Skip to content

Commit

Permalink
fix: worker add for transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed May 3, 2024
1 parent adea4db commit feddc26
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 129 deletions.
32 changes: 32 additions & 0 deletions packages/graphql/ecosystem-dev.config.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
const IS_DEBUG = process.env.DEBUG;
const APP = process.env.APP;

const commonOptions = {
interpreter: 'tsx',
node_args: IS_DEBUG ? ['--inspect-brk'] : [],
watch: true,
ignore_watch: ['node_modules'],
};

module.exports = {
apps: [
...(APP === 'graphql'
? [
{
...commonOptions,
name: 'graphql',
script: './src/app.ts',
},
]
: []),
...(APP === 'syncer'
? [
{
...commonOptions,
name: 'syncer',
script: './src/syncer.ts',
},
]
: []),
],
};
8 changes: 3 additions & 5 deletions packages/graphql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@
"db:push": "drizzle-kit push:pg",
"db:setup": "run-s codegen db:generate",
"db:clean": "tsx ./src/cli.ts clean-db",
"dev:graphql": "tsx watch src/app.ts --inspect",
"dev:graphql:inspect": "tsx --inspect-brk src/app.ts",
"dev:syncer": "tsx watch src/syncer.ts",
"dev:syncer:inspect": "tsx --inspect-brk src/syncer.ts",
"dev:graphql": "APP=graphql pm2 start ecosystem-dev.config.cjs",
"dev:syncer": "APP=syncer pm2 start ecosystem-dev.config.cjs",
"pm2:start": "pm2 start ecosystem.config.cjs --attach",
"pm2:restart": "pm2 restart ecosystem.config.cjs --attach --update-env",
"pm2:restart": "pm2 restart ecosystem.config.cjs --attach",
"pm2:reset": "pm2 delete ecosystem.config.cjs && pnpm server:start",
"pm2:stop": "pm2 stop ecosystem.config.cjs",
"pm2:delete": "pm2 delete ecosystem.config.cjs",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,92 +10,100 @@ import { PredicateRepository } from '~/domain/Predicate/PredicateRepository';
import type { TransactionEntity } from '~/domain/Transaction/TransactionEntity';
import { TransactionRepository } from '~/domain/Transaction/TransactionRepository';
import type { GQLBlock } from '~/graphql/generated/sdk';
import type { QueueData, QueueInputs, QueueNames } from '~/infra/queue/Queue';
import { db } from '~/infra/database/Db';
import type { QueueData } from '~/infra/queue/Queue';
import type { SyncTransactionEvent } from './SyncTransactions';

type Input = QueueInputs[QueueNames.SYNC_TRANSACTION];
type Input = SyncTransactionEvent[];

export class RunTransactionWorker {
constructor(
readonly txAddr: Address,
readonly block: GQLBlock,
) {}
export class RunTransactionsWorker {
async execute(input: Input) {
await Promise.all(input.map((event) => this.syncTransaction(event)));
}

async execute({ txHash, block, index }: Input) {
private async syncTransaction({
txHash,
block,
index,
}: SyncTransactionEvent) {
const repository = new TransactionRepository();
const added = await repository.insertOne(txHash, block, index);
const hash = this.txAddr.short();
const txAddr = new Address(txHash);
const hash = txAddr.short();
if (!added) {
console.log(
c.red(`${printBlock(block)} Transaction ${hash} already exists`),
);
return;
}
await Promise.all([
this.syncInputs(added),
this.syncOutputs(added),
this.syncContracts(added),
this.syncOperations(added),
this.syncInputs(added, block),
this.syncOutputs(added, block),
this.syncContracts(added, block),
this.syncOperations(added, block),
]);
}

private async syncInputs(transaction: TransactionEntity) {
const block = this.block;
private async syncInputs(transaction: TransactionEntity, block: GQLBlock) {
const txAddr = new Address(transaction.txHash);
const hash = txAddr.short();
const inputs = transaction.data.inputs;
const transactionId = transaction._id.value();
const txHash = this.txAddr.short();
if (!inputs?.length) return;

console.log(
`${printBlock(block)} -- Syncing inputs on transaction ${txHash}`,
`${printBlock(block)} -- Syncing inputs on transaction ${hash}`,
);
const repository = new InputRepository();
const created = await repository.insertMany(inputs, transactionId);
await this.syncPredicates(created);
await this.syncPredicates(created, block);
}

private async syncOutputs(transaction: TransactionEntity) {
const block = this.block;
private async syncOutputs(transaction: TransactionEntity, block: GQLBlock) {
const txAddr = new Address(transaction.txHash);
const hash = txAddr.short();
const outputs = transaction.data.outputs;
const transactionId = transaction._id.value();
const txHash = this.txAddr.short();
if (!outputs?.length) return;

console.log(
`${printBlock(block)} -- Syncing outputs on transaction ${txHash}`,
`${printBlock(block)} -- Syncing outputs on transaction ${hash}`,
);
const repository = new OutputRepository();
await repository.insertMany(outputs, transactionId);
}

private async syncContracts(transaction: TransactionEntity) {
const block = this.block;
const txHash = this.txAddr.short();
private async syncContracts(transaction: TransactionEntity, block: GQLBlock) {
const txAddr = new Address(transaction.txHash);
const hash = txAddr.short();
const contracts = transaction.getContractsCreated();
const repository = new ContractRepository();
if (!contracts.length) return;

console.log(
`${printBlock(block)} -- Syncing contracts on transaction ${txHash}`,
`${printBlock(block)} -- Syncing contracts on transaction ${hash}`,
);
await repository.insertMany(contracts);
}

private async syncOperations(transaction: TransactionEntity) {
const block = this.block;
const txHash = this.txAddr.short();
private async syncOperations(
transaction: TransactionEntity,
block: GQLBlock,
) {
const txAddr = new Address(transaction.txHash);
const hash = txAddr.short();
const repository = new OperationRepository();
const operations = OperationsFactory.create(transaction).value();
if (!operations?.length) return;

console.log(
`${printBlock(block)} -- Syncing operations on transaction ${txHash}`,
`${printBlock(block)} -- Syncing operations on transaction ${hash}`,
);
const transactionId = transaction._id.value();
await repository.insertMany(operations, transactionId);
}

private async syncPredicates(inputs: InputEntity[]) {
const block = this.block;
private async syncPredicates(inputs: InputEntity[], block: GQLBlock) {
const predicates = inputs
.map((input) => input.predicateData)
.filter(Boolean);
Expand Down Expand Up @@ -127,16 +135,18 @@ export class RunTransactionWorker {
}
}

export const runTransactionWorker = async ({ data }: QueueData<Input>) => {
export const runTransactionsWorker = async ({ data }: QueueData<Input>) => {
await db.connect();
try {
const txAddr = new Address(data.txHash);
const syncTransactions = new RunTransactionWorker(txAddr, data.block);
return syncTransactions.execute(data);
const syncTransactions = new RunTransactionsWorker();
await syncTransactions.execute(data);
} catch (error) {
console.error(error);
throw new Error(`Sync transactions ${data.txHash}`, {
throw new Error('Sync transactions', {
cause: error,
});
} finally {
await db.close();
}
};

Expand Down
43 changes: 12 additions & 31 deletions packages/graphql/src/application/uc/SyncBlocks.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import c from 'chalk';
import { uniqBy } from 'lodash';
import { assign, createActor, fromPromise, setup } from 'xstate';
import { env } from '~/config';
import type { BlockEntity } from '~/domain/Block/BlockEntity';
import { BlockRepository } from '~/domain/Block/BlockRepository';
import type { GQLBlock } from '~/graphql/generated/sdk';
import { db } from '~/infra/database/Db';
import {
type QueueData,
type QueueInputs,
Expand Down Expand Up @@ -90,13 +89,17 @@ class Syncer {
const repo = new BlockRepository();
const { blocks, endCursor } = await repo.blocksFromNode(to - from, from);
const hasBlocks = blocks.length > 0;
const created = await repo.insertMany(blocks);
await this.syncTransactions(created);

return {
endCursor,
hasBlocks,
};
return db.connection().transaction(async (trx) => {
await repo.insertMany(blocks, trx);
await queue.push(QueueNames.SYNC_TRANSACTIONS, {
blocks: blocks.filter(Boolean),
});

return {
endCursor,
hasBlocks,
};
});
}
return {
endCursor: to,
Expand All @@ -113,28 +116,6 @@ class Syncer {
to: height,
});
}

private async syncTransactions(blocks: (BlockEntity | null)[]) {
const filtered = blocks.filter(Boolean) as BlockEntity[];
const events = filtered.flatMap((block) => {
const txs = uniqBy(block.data.transactions, 'id');
return txs.map<QueueInputs[QueueNames.SYNC_TRANSACTION]>(
(transaction, idx) => ({
index: idx,
block: block.data,
txHash: transaction.id,
}),
);
});

if (events.length) {
const fromBlock = filtered[0].data.header.height;
const toBlock = filtered[filtered.length - 1].data.header.height;
const msg = `# Syncing ${events.length} transactions from ${fromBlock} to ${toBlock}`;
console.log(c.gray(msg));
await queue.pushBatch(QueueNames.SYNC_TRANSACTION, events);
}
}
}

const syncer = new Syncer();
Expand Down
23 changes: 0 additions & 23 deletions packages/graphql/src/application/uc/SyncTransaction.ts

This file was deleted.

46 changes: 46 additions & 0 deletions packages/graphql/src/application/uc/SyncTransactions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import c from 'chalk';
import { uniqBy } from 'lodash';
import type { GQLBlock } from '~/graphql/generated/sdk';
import type { QueueData, QueueInputs, QueueNames } from '~/infra/queue/Queue';
import { pool } from '~/infra/worker/WorkerPool';

type Input = QueueInputs[QueueNames.SYNC_TRANSACTIONS];
export type SyncTransactionEvent = {
index: number;
block: GQLBlock;
txHash: string;
};

export class SyncTransactions {
async execute({ data }: QueueData<Input>) {
const { blocks } = data;
const events = blocks.flatMap((block) => {
const txs = uniqBy(block.transactions, 'id');
return txs.map<SyncTransactionEvent>((transaction, idx) => ({
index: idx,
block: block,
txHash: transaction.id,
}));
});

if (events.length) {
const fromBlock = blocks[0].header.height;
const toBlock = blocks[blocks.length - 1].header.height;
const msg = `# Syncing ${events.length} transactions from ${fromBlock} to ${toBlock}`;
console.log(c.gray(msg));
await pool.run({ data: events });
}
}
}

export const syncTransactions = async (input: QueueData<Input>) => {
try {
const syncTransactions = new SyncTransactions();
await syncTransactions.execute(input);
} catch (error) {
console.error(error);
throw new Error('Sync transactions', {
cause: error,
});
}
};
Loading

0 comments on commit feddc26

Please sign in to comment.