From 08aa0cd9f65c2c832ce44f9cf9413abaf9a852e2 Mon Sep 17 00:00:00 2001 From: typedarray <90073088+0xOlias@users.noreply.github.com> Date: Fri, 30 Jun 2023 18:40:46 -0400 Subject: [PATCH 1/5] fix: historical sync race condition --- .../core/src/event-store/postgres/store.ts | 48 +++++------ packages/core/src/event-store/sqlite/store.ts | 53 ++++++------ packages/core/src/event-store/store.ts | 14 +-- .../core/src/historical-sync/service.test.ts | 9 -- packages/core/src/historical-sync/service.ts | 85 ++++++++++++------- 5 files changed, 110 insertions(+), 99 deletions(-) diff --git a/packages/core/src/event-store/postgres/store.ts b/packages/core/src/event-store/postgres/store.ts index a2a97bd94..3febfcf1c 100644 --- a/packages/core/src/event-store/postgres/store.ts +++ b/packages/core/src/event-store/postgres/store.ts @@ -199,21 +199,6 @@ export class PostgresEventStore implements EventStore { }); }; - getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => { - const results = await this.db - .selectFrom("logFilterCachedRanges") - .select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"]) - .where("filterKey", "=", filterKey) - .execute(); - - return results.map((range) => ({ - ...range, - startBlock: blobToBigInt(range.startBlock), - endBlock: blobToBigInt(range.endBlock), - endBlockTimestamp: blobToBigInt(range.endBlockTimestamp), - })); - }; - insertFinalizedLogs = async ({ chainId, logs: rpcLogs, @@ -247,11 +232,7 @@ export class PostgresEventStore implements EventStore { chainId, block: rpcBlock, transactions: rpcTransactions, - logFilterRange: { - logFilterKey, - blockNumberToCacheFrom, - logFilterStartBlockNumber, - }, + logFilterRange: { logFilterKey, blockNumberToCacheFrom }, }: { chainId: number; block: RpcBlock; @@ -259,7 +240,6 @@ export class PostgresEventStore implements EventStore { logFilterRange: { logFilterKey: string; blockNumberToCacheFrom: number; - logFilterStartBlockNumber: number; }; }) => { const block: InsertableBlock = { @@ -301,10 +281,15 @@ export class PostgresEventStore implements EventStore { .values(logFilterCachedRange) .execute(); }); + }; - // After inserting the new cached range record, execute a transaction to merge - // all adjacent cached ranges. Return the end block timestamp of the cached interval - // that contains the start block number of the log filter. + mergeLogFilterCachedRanges = async ({ + logFilterKey, + logFilterStartBlockNumber, + }: { + logFilterKey: string; + logFilterStartBlockNumber: number; + }) => { const startingRangeEndTimestamp = await this.db .transaction() .execute(async (tx) => { @@ -366,6 +351,21 @@ export class PostgresEventStore implements EventStore { return { startingRangeEndTimestamp }; }; + getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => { + const results = await this.db + .selectFrom("logFilterCachedRanges") + .select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"]) + .where("filterKey", "=", filterKey) + .execute(); + + return results.map((range) => ({ + ...range, + startBlock: blobToBigInt(range.startBlock), + endBlock: blobToBigInt(range.endBlock), + endBlockTimestamp: blobToBigInt(range.endBlockTimestamp), + })); + }; + insertContractReadResult = async ({ address, blockNumber, diff --git a/packages/core/src/event-store/sqlite/store.ts b/packages/core/src/event-store/sqlite/store.ts index 42695e11b..c632cf7b1 100644 --- a/packages/core/src/event-store/sqlite/store.ts +++ b/packages/core/src/event-store/sqlite/store.ts @@ -173,21 +173,6 @@ export class SqliteEventStore implements EventStore { }); }; - getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => { - const results = await this.db - .selectFrom("logFilterCachedRanges") - .select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"]) - .where("filterKey", "=", filterKey) - .execute(); - - return results.map((range) => ({ - ...range, - startBlock: blobToBigInt(range.startBlock), - endBlock: blobToBigInt(range.endBlock), - endBlockTimestamp: blobToBigInt(range.endBlockTimestamp), - })); - }; - insertFinalizedLogs = async ({ chainId, logs: rpcLogs, @@ -216,20 +201,12 @@ export class SqliteEventStore implements EventStore { chainId, block: rpcBlock, transactions: rpcTransactions, - logFilterRange: { - logFilterKey, - blockNumberToCacheFrom, - logFilterStartBlockNumber, - }, + logFilterRange: { logFilterKey, blockNumberToCacheFrom }, }: { chainId: number; block: RpcBlock; transactions: RpcTransaction[]; - logFilterRange: { - logFilterKey: string; - blockNumberToCacheFrom: number; - logFilterStartBlockNumber: number; - }; + logFilterRange: { logFilterKey: string; blockNumberToCacheFrom: number }; }) => { const block: InsertableBlock = { ...rpcToSqliteBlock(rpcBlock), @@ -272,10 +249,15 @@ export class SqliteEventStore implements EventStore { .execute(), ]); }); + }; - // After inserting the new cached range record, execute a transaction to merge - // all adjacent cached ranges. Return the end block timestamp of the cached interval - // that contains the start block number of the log filter. + mergeLogFilterCachedRanges = async ({ + logFilterKey, + logFilterStartBlockNumber, + }: { + logFilterKey: string; + logFilterStartBlockNumber: number; + }) => { const startingRangeEndTimestamp = await this.db .transaction() .execute(async (tx) => { @@ -336,6 +318,21 @@ export class SqliteEventStore implements EventStore { return { startingRangeEndTimestamp }; }; + getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => { + const results = await this.db + .selectFrom("logFilterCachedRanges") + .select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"]) + .where("filterKey", "=", filterKey) + .execute(); + + return results.map((range) => ({ + ...range, + startBlock: blobToBigInt(range.startBlock), + endBlock: blobToBigInt(range.endBlock), + endBlockTimestamp: blobToBigInt(range.endBlockTimestamp), + })); + }; + insertContractReadResult = async ({ address, blockNumber, diff --git a/packages/core/src/event-store/store.ts b/packages/core/src/event-store/store.ts index 4b8ffdd57..7eed8dcb6 100644 --- a/packages/core/src/event-store/store.ts +++ b/packages/core/src/event-store/store.ts @@ -46,10 +46,18 @@ export interface EventStore { logFilterRange: { logFilterKey: string; blockNumberToCacheFrom: number; - logFilterStartBlockNumber: number; }; + }): Promise; + + mergeLogFilterCachedRanges(options: { + logFilterKey: string; + logFilterStartBlockNumber: number; }): Promise<{ startingRangeEndTimestamp: number }>; + getLogFilterCachedRanges(options: { + filterKey: string; + }): Promise; + insertUnfinalizedBlock(options: { chainId: number; block: RpcBlock; @@ -67,10 +75,6 @@ export interface EventStore { toBlockNumber: number; }): Promise; - getLogFilterCachedRanges(options: { - filterKey: string; - }): Promise; - insertContractReadResult(options: { address: string; blockNumber: bigint; diff --git a/packages/core/src/historical-sync/service.test.ts b/packages/core/src/historical-sync/service.test.ts index e62cb706d..43602bb1c 100644 --- a/packages/core/src/historical-sync/service.test.ts +++ b/packages/core/src/historical-sync/service.test.ts @@ -11,7 +11,6 @@ import { publicClient, testResources } from "@/_test/utils"; import { encodeLogFilterKey } from "@/config/logFilterKey"; import { LogFilter } from "@/config/logFilters"; import { Network } from "@/config/networks"; -import { wait } from "@/utils/wait"; import { HistoricalSyncService } from "./service"; @@ -317,14 +316,6 @@ test("start() emits historicalCheckpoint event", async (context) => { await service.onIdle(); - // TODO: Remove this. It's just a test to see if there's indeed a - // a race condition happening here. - await wait(300); - const logFilterCachedRanges = await eventStore.getLogFilterCachedRanges({ - filterKey: logFilters[0].filter.key, - }); - console.log(logFilterCachedRanges); - expect(emitSpy).toHaveBeenCalledWith("historicalCheckpoint", { timestamp: 1673275859, // Block timestamp of block 16369955 }); diff --git a/packages/core/src/historical-sync/service.ts b/packages/core/src/historical-sync/service.ts index 1a790f30a..21ea21c44 100644 --- a/packages/core/src/historical-sync/service.ts +++ b/packages/core/src/historical-sync/service.ts @@ -14,7 +14,7 @@ import type { EventStore } from "@/event-store/store"; import { LoggerService } from "@/logs/service"; import { MetricsService } from "@/metrics/service"; import { formatEta, formatPercentage } from "@/utils/format"; -import { type Queue, createQueue } from "@/utils/queue"; +import { type Queue, type Worker, createQueue } from "@/utils/queue"; import { startClock } from "@/utils/timer"; import { findMissingIntervals } from "./intervals"; @@ -273,18 +273,47 @@ export class HistoricalSyncService extends Emittery { }; private buildQueue = () => { - const worker = async ({ task }: { task: LogSyncTask | BlockSyncTask }) => { - switch (task.kind) { - case "LOG_SYNC": { - return this.logTaskWorker({ task }); - } - case "BLOCK_SYNC": { - return this.blockTaskWorker({ task }); - } + const worker: Worker = async ({ + task, + queue, + }) => { + if (task.kind === "LOG_SYNC") { + await this.logTaskWorker({ task }); + } else { + await this.blockTaskWorker({ task }); } + + // If this is not the final task, return. + if (queue.size > 0 || queue.pending > 1) return; + + // If this is the final task, run the cleanup/completion logic. + + // It's possible for multiple block sync tasks to run simultaneously, + // resulting in a scenario where cached ranges are not fully merged. + // Merge all cached ranges once last time before emitting the `syncComplete` event. + await Promise.all( + this.logFilters.map((logFilter) => + this.eventStore.mergeLogFilterCachedRanges({ + logFilterKey: logFilter.filter.key, + logFilterStartBlockNumber: logFilter.filter.startBlock, + }) + ) + ); + + this.stats.duration = this.stats.endDuration(); + this.stats.isComplete = true; + this.emit("syncComplete"); + this.logger.info({ + service: "historical", + msg: `Completed sync in ${formatEta(this.stats.duration)} (network=${ + this.network.name + })`, + network: this.network.name, + duration: this.stats.duration, + }); }; - const queue = createQueue({ + const queue = createQueue({ worker, options: { concurrency: 10, autoStart: false }, onAdd: ({ task }) => { @@ -471,20 +500,6 @@ export class HistoricalSyncService extends Emittery { : task.blockNumberToCacheFrom); queue.addTask(task, { priority, retry: true }); }, - onIdle: () => { - if (this.stats.isComplete) return; - this.stats.duration = this.stats.endDuration(); - this.stats.isComplete = true; - this.emit("syncComplete"); - this.logger.info({ - service: "historical", - msg: `Completed sync in ${formatEta(this.stats.duration)} (network=${ - this.network.name - })`, - network: this.network.name, - duration: this.stats.duration, - }); - }, }); return queue; @@ -598,16 +613,20 @@ export class HistoricalSyncService extends Emittery { requiredTxHashes.has(tx.hash) ); + await this.eventStore.insertFinalizedBlock({ + chainId: this.network.chainId, + block, + transactions, + logFilterRange: { + logFilterKey: logFilter.filter.key, + blockNumberToCacheFrom, + }, + }); + const { startingRangeEndTimestamp } = - await this.eventStore.insertFinalizedBlock({ - chainId: this.network.chainId, - block, - transactions, - logFilterRange: { - logFilterKey: logFilter.filter.key, - blockNumberToCacheFrom, - logFilterStartBlockNumber: logFilter.filter.startBlock, - }, + await this.eventStore.mergeLogFilterCachedRanges({ + logFilterKey: logFilter.filter.key, + logFilterStartBlockNumber: logFilter.filter.startBlock, }); this.logFilterCheckpoints[logFilter.name] = Math.max( From 3b98468f5149fa36b1db43ba6e3672a0ee813397 Mon Sep 17 00:00:00 2001 From: typedarray <90073088+0xOlias@users.noreply.github.com> Date: Fri, 30 Jun 2023 18:42:31 -0400 Subject: [PATCH 2/5] chore: changeset --- .changeset/metal-boats-protect.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/metal-boats-protect.md diff --git a/.changeset/metal-boats-protect.md b/.changeset/metal-boats-protect.md new file mode 100644 index 000000000..a9ed13611 --- /dev/null +++ b/.changeset/metal-boats-protect.md @@ -0,0 +1,5 @@ +--- +"@ponder/core": patch +--- + +Fixed a race condition bug in the historical sync service From 748cd85c2bc66c16934671671f92bddf6e794ed4 Mon Sep 17 00:00:00 2001 From: typedarray <90073088+0xOlias@users.noreply.github.com> Date: Fri, 30 Jun 2023 19:02:01 -0400 Subject: [PATCH 3/5] test: fix store tests --- packages/core/src/event-store/store.test.ts | 57 ++++++++++++--------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/packages/core/src/event-store/store.test.ts b/packages/core/src/event-store/store.test.ts index ef0631e2c..d01cd4546 100644 --- a/packages/core/src/event-store/store.test.ts +++ b/packages/core/src/event-store/store.test.ts @@ -272,9 +272,8 @@ test("insertFinalizedBlock inserts block as finalized", async (context) => { block: blockOne, transactions: blockOneTransactions, logFilterRange: { - blockNumberToCacheFrom: 15131900, logFilterKey: "test-filter-key", - logFilterStartBlockNumber: 15131900, + blockNumberToCacheFrom: 15131900, }, }); @@ -301,9 +300,8 @@ test("insertFinalizedBlock inserts transactions as finalized", async (context) = block: blockOne, transactions: blockOneTransactions, logFilterRange: { - blockNumberToCacheFrom: 15131900, logFilterKey: "test-filter-key", - logFilterStartBlockNumber: 15131900, + blockNumberToCacheFrom: 15131900, }, }); @@ -334,9 +332,8 @@ test("insertFinalizedBlock inserts a log filter cached interval", async (context block: blockOne, transactions: blockOneTransactions, logFilterRange: { - blockNumberToCacheFrom: 15131900, logFilterKey: "test-filter-key", - logFilterStartBlockNumber: 15131900, + blockNumberToCacheFrom: 15131900, }, }); @@ -353,7 +350,7 @@ test("insertFinalizedBlock inserts a log filter cached interval", async (context expect(logFilterCachedRanges).toHaveLength(1); }); -test("insertFinalizedBlock merges cached intervals", async (context) => { +test("mergeLogFilterCachedIntervals merges cached intervals", async (context) => { const { eventStore } = context; await eventStore.insertFinalizedBlock({ @@ -361,9 +358,8 @@ test("insertFinalizedBlock merges cached intervals", async (context) => { block: blockOne, transactions: blockOneTransactions, logFilterRange: { - blockNumberToCacheFrom: 15131900, logFilterKey: "test-filter-key", - logFilterStartBlockNumber: 15131900, + blockNumberToCacheFrom: 15131900, }, }); @@ -372,12 +368,16 @@ test("insertFinalizedBlock merges cached intervals", async (context) => { block: blockTwo, transactions: blockTwoTransactions, logFilterRange: { - blockNumberToCacheFrom: 15495110, logFilterKey: "test-filter-key", - logFilterStartBlockNumber: 15131900, + blockNumberToCacheFrom: 15495110, }, }); + await eventStore.mergeLogFilterCachedRanges({ + logFilterKey: "test-filter-key", + logFilterStartBlockNumber: 15131900, + }); + const logFilterCachedRanges = await eventStore.getLogFilterCachedRanges({ filterKey: "test-filter-key", }); @@ -391,32 +391,41 @@ test("insertFinalizedBlock merges cached intervals", async (context) => { expect(logFilterCachedRanges).toHaveLength(1); }); -test("insertFinalizedBlock returns the startingRangeEndTimestamp", async (context) => { +test("mergeLogFilterCachedIntervals returns the startingRangeEndTimestamp", async (context) => { const { eventStore } = context; - const { startingRangeEndTimestamp } = await eventStore.insertFinalizedBlock({ + await eventStore.insertFinalizedBlock({ chainId: 1, block: blockOne, transactions: blockOneTransactions, logFilterRange: { - blockNumberToCacheFrom: 15131900, logFilterKey: "test-filter-key", - logFilterStartBlockNumber: 15131900, + blockNumberToCacheFrom: 15131900, }, }); + const { startingRangeEndTimestamp } = + await eventStore.mergeLogFilterCachedRanges({ + logFilterKey: "test-filter-key", + logFilterStartBlockNumber: 15131900, + }); + expect(startingRangeEndTimestamp).toBe(hexToNumber(blockOne.timestamp)); + await eventStore.insertFinalizedBlock({ + chainId: 1, + block: blockTwo, + transactions: blockTwoTransactions, + logFilterRange: { + logFilterKey: "test-filter-key", + blockNumberToCacheFrom: 15495110, + }, + }); + const { startingRangeEndTimestamp: startingRangeEndTimestamp2 } = - await eventStore.insertFinalizedBlock({ - chainId: 1, - block: blockTwo, - transactions: blockTwoTransactions, - logFilterRange: { - blockNumberToCacheFrom: 15495110, - logFilterKey: "test-filter-key", - logFilterStartBlockNumber: 15131900, - }, + await eventStore.mergeLogFilterCachedRanges({ + logFilterKey: "test-filter-key", + logFilterStartBlockNumber: 15131900, }); expect(startingRangeEndTimestamp2).toBe(hexToNumber(blockTwo.timestamp)); From 35c111320c88cf322b01e586e0bd341e00bd15de Mon Sep 17 00:00:00 2001 From: typedarray <90073088+0xOlias@users.noreply.github.com> Date: Fri, 30 Jun 2023 19:02:30 -0400 Subject: [PATCH 4/5] chore: speed up postgres tests by dropping entire schema rather than migrating down (way more SQL queries) --- packages/core/src/_test/setup.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/core/src/_test/setup.ts b/packages/core/src/_test/setup.ts index f58edb303..0f063ba1f 100644 --- a/packages/core/src/_test/setup.ts +++ b/packages/core/src/_test/setup.ts @@ -53,17 +53,17 @@ export async function setupEventStore(context: TestContext) { const pool = new Pool({ connectionString: process.env.DATABASE_URL }); const databaseSchema = `vitest_pool_${process.pid}_${poolId}`; context.eventStore = new PostgresEventStore({ pool, databaseSchema }); + await context.eventStore.migrateUp(); + + return async () => { + await pool.query(`DROP SCHEMA IF EXISTS "${databaseSchema}" CASCADE`); + }; } else { const rawSqliteDb = new SqliteDatabase(":memory:"); const db = patchSqliteDatabase({ db: rawSqliteDb }); context.eventStore = new SqliteEventStore({ db }); + await context.eventStore.migrateUp(); } - - await context.eventStore.migrateUp(); - - return async () => { - await context.eventStore.migrateDown(); - }; } /** From ef6f9d613f7fae45a98c51d3f558aa34afa75012 Mon Sep 17 00:00:00 2001 From: typedarray <90073088+0xOlias@users.noreply.github.com> Date: Fri, 30 Jun 2023 19:11:28 -0400 Subject: [PATCH 5/5] fix: also emit event --- packages/core/src/historical-sync/service.ts | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/packages/core/src/historical-sync/service.ts b/packages/core/src/historical-sync/service.ts index 21ea21c44..ef3a1f6e3 100644 --- a/packages/core/src/historical-sync/service.ts +++ b/packages/core/src/historical-sync/service.ts @@ -293,10 +293,7 @@ export class HistoricalSyncService extends Emittery { // Merge all cached ranges once last time before emitting the `syncComplete` event. await Promise.all( this.logFilters.map((logFilter) => - this.eventStore.mergeLogFilterCachedRanges({ - logFilterKey: logFilter.filter.key, - logFilterStartBlockNumber: logFilter.filter.startBlock, - }) + this.updateHistoricalCheckpoint({ logFilter }) ) ); @@ -623,6 +620,14 @@ export class HistoricalSyncService extends Emittery { }, }); + await this.updateHistoricalCheckpoint({ logFilter }); + }; + + updateHistoricalCheckpoint = async ({ + logFilter, + }: { + logFilter: LogFilter; + }) => { const { startingRangeEndTimestamp } = await this.eventStore.mergeLogFilterCachedRanges({ logFilterKey: logFilter.filter.key,