From 1e423a1c4f5eb303711842cc6389f9e13cfeecde Mon Sep 17 00:00:00 2001 From: kyscott18 <43524469+kyscott18@users.noreply.github.com> Date: Wed, 29 May 2024 11:58:09 -0400 Subject: [PATCH] Support future end block (#917) * fix: unfinalized start block in historical sync * support future end block * support future end block * remove validateHistoricalBlockRange * chore: changeset * nits * more nit --- .changeset/giant-spoons-build.md | 5 + .../src/build/configAndIndexingFunctions.ts | 20 +++ .../core/src/sync-historical/service.test.ts | 1 - packages/core/src/sync-historical/service.ts | 163 ++++++------------ .../validateHistoricalBlockRange.test.ts | 63 ------- .../validateHistoricalBlockRange.ts | 59 ------- packages/core/src/sync-realtime/service.ts | 6 + packages/core/src/sync/service.test.ts | 38 ++++ packages/core/src/sync/service.ts | 67 ++++++- 9 files changed, 178 insertions(+), 244 deletions(-) create mode 100644 .changeset/giant-spoons-build.md delete mode 100644 packages/core/src/sync-historical/validateHistoricalBlockRange.test.ts delete mode 100644 packages/core/src/sync-historical/validateHistoricalBlockRange.ts diff --git a/.changeset/giant-spoons-build.md b/.changeset/giant-spoons-build.md new file mode 100644 index 000000000..79edfdebe --- /dev/null +++ b/.changeset/giant-spoons-build.md @@ -0,0 +1,5 @@ +--- +"@ponder/core": patch +--- + +Added support for `startBlock` or `endBlock` to be greater than the finalized or latest block. diff --git a/packages/core/src/build/configAndIndexingFunctions.ts b/packages/core/src/build/configAndIndexingFunctions.ts index 8a071e114..03aeb6b2f 100644 --- a/packages/core/src/build/configAndIndexingFunctions.ts +++ b/packages/core/src/build/configAndIndexingFunctions.ts @@ -342,6 +342,10 @@ export async function buildConfigAndIndexingFunctions({ ? undefined : endBlockMaybeNan; + if (endBlock !== undefined && endBlock < startBlock) { + throw new Error(`Validation failed: Start block for contract '${contractName}' is after end block (${startBlock} > ${endBlock}).`) + } + // Single network case. if (typeof contract.network === "string") { return { @@ -382,6 +386,10 @@ export async function buildConfigAndIndexingFunctions({ ? undefined : endBlockMaybeNan; + if (endBlock !== undefined && endBlock < startBlock) { + throw new Error(`Validation failed: Start block for contract '${contractName}' is after end block (${startBlock} > ${endBlock}).`) + } + return { contractName, networkName, @@ -703,6 +711,12 @@ export async function buildConfigAndIndexingFunctions({ ? undefined : endBlockMaybeNan; + if (endBlock !== undefined && endBlock < startBlock) { + throw new Error( + `Validation failed: Start block for block source '${sourceName}' is after end block (${startBlock} > ${endBlock}).`, + ); + } + const intervalMaybeNan = blockSourceConfig.interval; const interval = Number.isNaN(intervalMaybeNan) ? 0 : intervalMaybeNan; @@ -768,6 +782,12 @@ export async function buildConfigAndIndexingFunctions({ ? undefined : endBlockMaybeNan; + if (endBlock !== undefined && endBlock < startBlock) { + throw new Error( + `Validation failed: Start block for block source '${sourceName}' is after end block (${startBlock} > ${endBlock}).`, + ); + } + const intervalMaybeNan = overrides.interval ?? blockSourceConfig.interval; const interval = Number.isNaN(intervalMaybeNan) diff --git a/packages/core/src/sync-historical/service.test.ts b/packages/core/src/sync-historical/service.test.ts index 650477d7e..b0d1d42e8 100644 --- a/packages/core/src/sync-historical/service.test.ts +++ b/packages/core/src/sync-historical/service.test.ts @@ -21,7 +21,6 @@ beforeEach(setupIsolatedDatabase); const getBlockNumbers = () => publicClient.getBlockNumber().then((b) => ({ - latestBlockNumber: Number(b) + 5, finalizedBlockNumber: Number(b), })); diff --git a/packages/core/src/sync-historical/service.ts b/packages/core/src/sync-historical/service.ts index a00c9d0b6..c8d61fad5 100644 --- a/packages/core/src/sync-historical/service.ts +++ b/packages/core/src/sync-historical/service.ts @@ -40,7 +40,6 @@ import { numberToHex, toHex, } from "viem"; -import { validateHistoricalBlockRange } from "./validateHistoricalBlockRange.js"; const HISTORICAL_CHECKPOINT_EMIT_INTERVAL = 500; const TRACE_FILTER_CHUNK_SIZE = 10; @@ -188,10 +187,8 @@ export class HistoricalSyncService extends Emittery { } async setup({ - latestBlockNumber, finalizedBlockNumber, }: { - latestBlockNumber: number; finalizedBlockNumber: number; }) { // Initialize state variables. Required when restarting the service. @@ -200,21 +197,13 @@ export class HistoricalSyncService extends Emittery { await Promise.all( this.sources.map(async (source) => { - const { isHistoricalSyncRequired, startBlock, endBlock } = - validateHistoricalBlockRange({ - startBlock: source.startBlock, - endBlock: source.endBlock, - finalizedBlockNumber, - latestBlockNumber, - }); + const startBlock = source.startBlock; + const endBlock = source.endBlock ?? finalizedBlockNumber; - switch (source.type) { - case "log": { - if (!isHistoricalSyncRequired) { - this.logFilterProgressTrackers[source.id] = new ProgressTracker({ - target: [startBlock, finalizedBlockNumber], - completed: [[startBlock, finalizedBlockNumber]], - }); + if (source.startBlock > finalizedBlockNumber) { + switch (source.type) { + case "log": + case "factoryLog": { this.common.metrics.ponder_historical_total_blocks.set( { network: this.network.name, @@ -227,9 +216,51 @@ export class HistoricalSyncService extends Emittery { service: "historical", msg: `Skipped syncing '${this.network.name}' logs for '${source.contractName}' because the start block is not finalized`, }); - return; + break; + } + + case "callTrace": + case "factoryCallTrace": { + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: source.contractName, + type: "trace", + }, + 0, + ); + this.common.logger.warn({ + service: "historical", + msg: `Skipped syncing '${this.network.name}' call traces for '${source.contractName}' because the start block is not finalized`, + }); + break; + } + + case "block": { + this.common.metrics.ponder_historical_total_blocks.set( + { + network: this.network.name, + source: source.sourceName, + type: "block", + }, + 0, + ); + this.common.logger.warn({ + service: "historical", + msg: `Skipped syncing '${this.network.name}' blocks for '${source.sourceName}' because the start block is not finalized`, + }); + break; } + default: + never(source); + } + + return; + } + + switch (source.type) { + case "log": { const completedLogFilterIntervals = await this.syncStore.getLogFilterIntervals({ chainId: source.chainId, @@ -303,32 +334,6 @@ export class HistoricalSyncService extends Emittery { break; case "factoryLog": { - if (!isHistoricalSyncRequired) { - this.factoryChildAddressProgressTrackers[source.id] = - new ProgressTracker({ - target: [startBlock, finalizedBlockNumber], - completed: [[startBlock, finalizedBlockNumber]], - }); - this.factoryLogFilterProgressTrackers[source.id] = - new ProgressTracker({ - target: [startBlock, finalizedBlockNumber], - completed: [[startBlock, finalizedBlockNumber]], - }); - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.contractName, - type: "log", - }, - 0, - ); - this.common.logger.warn({ - service: "historical", - msg: `Skipped syncing '${this.network.name}' logs for '${source.contractName}' because the start block is not finalized`, - }); - return; - } - // Note that factory child address progress is stored using // log intervals for the factory log. const completedFactoryChildAddressIntervals = @@ -486,28 +491,6 @@ export class HistoricalSyncService extends Emittery { break; case "callTrace": { - if (!isHistoricalSyncRequired) { - this.traceFilterProgressTrackers[source.id] = new ProgressTracker( - { - target: [startBlock, finalizedBlockNumber], - completed: [[startBlock, finalizedBlockNumber]], - }, - ); - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.contractName, - type: "trace", - }, - 0, - ); - this.common.logger.warn({ - service: "historical", - msg: `Skipped syncing '${this.network.name}' call traces for '${source.contractName}' because the start block is not finalized`, - }); - return; - } - const completedTraceFilterIntervals = await this.syncStore.getTraceFilterIntervals({ chainId: source.chainId, @@ -580,32 +563,6 @@ export class HistoricalSyncService extends Emittery { break; case "factoryCallTrace": { - if (!isHistoricalSyncRequired) { - this.factoryChildAddressProgressTrackers[source.id] = - new ProgressTracker({ - target: [startBlock, finalizedBlockNumber], - completed: [[startBlock, finalizedBlockNumber]], - }); - this.factoryTraceFilterProgressTrackers[source.id] = - new ProgressTracker({ - target: [startBlock, finalizedBlockNumber], - completed: [[startBlock, finalizedBlockNumber]], - }); - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.contractName, - type: "trace", - }, - 0, - ); - this.common.logger.warn({ - service: "historical", - msg: `Skipped syncing '${this.network.name}' call traces for '${source.contractName}' because the start block is not finalized`, - }); - return; - } - // Note that factory child address progress is stored using // log intervals for the factory log. const completedFactoryChildAddressIntervals = @@ -763,28 +720,6 @@ export class HistoricalSyncService extends Emittery { break; case "block": { - if (!isHistoricalSyncRequired) { - this.blockFilterProgressTrackers[source.id] = new ProgressTracker( - { - target: [startBlock, finalizedBlockNumber], - completed: [[startBlock, finalizedBlockNumber]], - }, - ); - this.common.metrics.ponder_historical_total_blocks.set( - { - network: this.network.name, - source: source.sourceName, - type: "block", - }, - 0, - ); - this.common.logger.warn({ - service: "historical", - msg: `Skipped syncing '${this.network.name}' blocks for '${source.sourceName}' because the start block is not finalized`, - }); - return; - } - const completedBlockFilterIntervals = await this.syncStore.getBlockFilterIntervals({ chainId: source.chainId, diff --git a/packages/core/src/sync-historical/validateHistoricalBlockRange.test.ts b/packages/core/src/sync-historical/validateHistoricalBlockRange.test.ts deleted file mode 100644 index 48c60b669..000000000 --- a/packages/core/src/sync-historical/validateHistoricalBlockRange.test.ts +++ /dev/null @@ -1,63 +0,0 @@ -import { expect, test } from "vitest"; -import { validateHistoricalBlockRange } from "./validateHistoricalBlockRange.js"; - -test("validateHistoricalBlockRange throws if start block is greater than latest block", () => { - expect(() => - validateHistoricalBlockRange({ - finalizedBlockNumber: 50, - latestBlockNumber: 100, - startBlock: 120, - }), - ).toThrowError( - "Start block number (120) cannot be greater than latest block number (100)", - ); -}); - -test("validateHistoricalBlockRange returns not required if startBlock is between finalized and latest", () => { - const result = validateHistoricalBlockRange({ - finalizedBlockNumber: 50, - latestBlockNumber: 100, - startBlock: 75, - }); - - expect(result.isHistoricalSyncRequired).toBe(false); -}); - -test("validateHistoricalBlockRange throws if end block is greater than start block", () => { - expect(() => - validateHistoricalBlockRange({ - finalizedBlockNumber: 50, - latestBlockNumber: 100, - startBlock: 40, - endBlock: 20, - }), - ).toThrowError( - "End block number (20) cannot be less than start block number (40)", - ); -}); - -test("validateHistoricalBlockRange throws if end block is greater than finalized block", () => { - expect(() => - validateHistoricalBlockRange({ - finalizedBlockNumber: 50, - latestBlockNumber: 100, - startBlock: 20, - endBlock: 75, - }), - ).toThrowError( - "End block number (75) cannot be greater than finalized block number (50)", - ); -}); - -test("validateHistoricalBlockRange throws if end block is greater than latest block", () => { - expect(() => - validateHistoricalBlockRange({ - finalizedBlockNumber: 50, - latestBlockNumber: 100, - startBlock: 20, - endBlock: 150, - }), - ).toThrowError( - "End block number (150) cannot be greater than latest block number (100)", - ); -}); diff --git a/packages/core/src/sync-historical/validateHistoricalBlockRange.ts b/packages/core/src/sync-historical/validateHistoricalBlockRange.ts deleted file mode 100644 index 0b49b125d..000000000 --- a/packages/core/src/sync-historical/validateHistoricalBlockRange.ts +++ /dev/null @@ -1,59 +0,0 @@ -export function validateHistoricalBlockRange({ - startBlock, - endBlock: userDefinedEndBlock, - finalizedBlockNumber, - latestBlockNumber, -}: { - startBlock: number; - endBlock?: number; - finalizedBlockNumber: number; - latestBlockNumber: number; -}) { - if (startBlock > latestBlockNumber) { - throw new Error( - `Start block number (${startBlock}) cannot be greater than latest block number (${latestBlockNumber}). - Are you sure the RPC endpoint is for the correct network?`, - ); - } - - if (startBlock > finalizedBlockNumber) { - // If the start block is in the unfinalized range, the historical sync is not needed. - // Set the checkpoint to the current timestamp, then return (don't create the queue). - return { - isHistoricalSyncRequired: false, - startBlock, - endBlock: userDefinedEndBlock, - } as const; - } - - if (userDefinedEndBlock) { - if (userDefinedEndBlock < startBlock) { - throw new Error( - `End block number (${userDefinedEndBlock}) cannot be less than start block number (${startBlock}). - Are you sure the RPC endpoint is for the correct network?`, - ); - } - - if (userDefinedEndBlock > latestBlockNumber) { - throw new Error( - `End block number (${userDefinedEndBlock}) cannot be greater than latest block number (${latestBlockNumber}). - Are you sure the RPC endpoint is for the correct network?`, - ); - } - - if (userDefinedEndBlock > finalizedBlockNumber) { - throw new Error( - `End block number (${userDefinedEndBlock}) cannot be greater than finalized block number (${finalizedBlockNumber}). - Are you sure the RPC endpoint is for the correct network?`, - ); - } - } - - const resolvedEndBlock = userDefinedEndBlock ?? finalizedBlockNumber; - - return { - isHistoricalSyncRequired: true, - startBlock, - endBlock: resolvedEndBlock, - } as const; -} diff --git a/packages/core/src/sync-realtime/service.ts b/packages/core/src/sync-realtime/service.ts index f4c288389..1195a5e3f 100644 --- a/packages/core/src/sync-realtime/service.ts +++ b/packages/core/src/sync-realtime/service.ts @@ -305,6 +305,12 @@ export const kill = async (service: Service) => { service.isKilled = true; service.queue?.pause(); service.queue?.clear(); + + service.common.logger.debug({ + service: "realtime", + msg: `Killed '${service.network.name}' realtime sync`, + }); + await service.queue?.onIdle(); }; diff --git a/packages/core/src/sync/service.test.ts b/packages/core/src/sync/service.test.ts index fe429ede8..f1c34ae3d 100644 --- a/packages/core/src/sync/service.test.ts +++ b/packages/core/src/sync/service.test.ts @@ -365,3 +365,41 @@ test("onRealtimeSyncEvent finalize", async (context) => { await kill(syncService); await cleanup(); }); + +test("onRealtimeSyncEvent unfinalized end block", async (context) => { + const { common } = context; + const { syncStore, cleanup } = await setupDatabaseServices(context); + const { networks, sources } = getMultichainNetworksAndSources(context); + + const syncService = await create({ + common, + syncStore, + networks, + sources: [sources[0], { ...sources[1], endBlock: 4 }], + onRealtimeEvent: vi.fn(), + onFatalError: vi.fn(), + initialCheckpoint: zeroCheckpoint, + }); + + const killSpy = vi.spyOn( + syncService.networkServices[1].realtime!.realtimeSync, + "kill", + ); + + syncService.networkServices[0].realtime!.realtimeSync.onEvent({ + type: "finalize", + chainId: networks[0].chainId, + checkpoint: createCheckpoint({ blockNumber: 5n }), + }); + + syncService.networkServices[1].realtime!.realtimeSync.onEvent({ + type: "finalize", + chainId: networks[1].chainId, + checkpoint: createCheckpoint({ blockNumber: 5n }), + }); + + expect(killSpy).toHaveBeenCalledOnce(); + + await kill(syncService); + await cleanup(); +}); diff --git a/packages/core/src/sync/service.ts b/packages/core/src/sync/service.ts index 07b586e1b..78df231ce 100644 --- a/packages/core/src/sync/service.ts +++ b/packages/core/src/sync/service.ts @@ -48,6 +48,7 @@ export type Service = { checkpoint: Checkpoint; finalizedCheckpoint: Checkpoint; finalizedBlock: SyncBlock; + endBlock: number | undefined; } | undefined; @@ -89,9 +90,15 @@ export const create = async ({ const onRealtimeSyncEvent = (realtimeSyncEvent: RealtimeSyncEvent) => { switch (realtimeSyncEvent.type) { case "checkpoint": { - syncService.networkServices.find( + const networkService = syncService.networkServices.find( (ns) => ns.network.chainId === realtimeSyncEvent.chainId, - )!.realtime!.checkpoint = realtimeSyncEvent.checkpoint; + )!; + + // "realtime" property may be undefined when `kill()` has been + // invoked but hasn't completed. + if (networkService.realtime === undefined) return; + + networkService.realtime.checkpoint = realtimeSyncEvent.checkpoint; // `realtime` can be undefined if no contracts for that network require a realtime // service. Those networks can be left out of the checkpoint calculation. @@ -123,9 +130,15 @@ export const create = async ({ } case "reorg": { - syncService.networkServices.find( + const networkService = syncService.networkServices.find( (ns) => ns.network.chainId === realtimeSyncEvent.chainId, - )!.realtime!.checkpoint = realtimeSyncEvent.safeCheckpoint; + )!; + + // "realtime" property may be undefined when `kill()` has been + // invoked but hasn't completed. + if (networkService.realtime === undefined) return; + + networkService.realtime!.checkpoint = realtimeSyncEvent.safeCheckpoint; if ( isCheckpointGreaterThan( @@ -142,9 +155,32 @@ export const create = async ({ } case "finalize": { - syncService.networkServices.find( + const networkService = syncService.networkServices.find( (ns) => ns.network.chainId === realtimeSyncEvent.chainId, - )!.realtime!.finalizedCheckpoint = realtimeSyncEvent.checkpoint; + )!; + + // "realtime" property may be undefined when `kill()` has been + // invoked but hasn't completed. + if (networkService.realtime === undefined) return; + + networkService.realtime!.finalizedCheckpoint = + realtimeSyncEvent.checkpoint; + + // Check if the finalized blockNumber is greater than the end block of all + // sources for the network. Potentially kill the realtime sync and remove the + // network from checkpoint calculations. + if ( + networkService.realtime.endBlock !== undefined && + realtimeSyncEvent.checkpoint.blockNumber > + networkService.realtime.endBlock + ) { + common.logger.info({ + service: "sync", + msg: `Synced final end block for '${networkService.network.name}' (${networkService.realtime.endBlock}), killing realtime sync service`, + }); + networkService.realtime.realtimeSync.kill(); + networkService.realtime = undefined; + } const newFinalizedCheckpoint = checkpointMin( ...syncService.networkServices @@ -200,6 +236,19 @@ export const create = async ({ }); } + for (const source of networkSources) { + if (source.startBlock > hexToNumber(latestBlock.number)) { + common.logger.warn({ + service: "sync", + msg: `Start block ${ + source.startBlock + } is greater than the latest block ${hexToNumber( + latestBlock.number, + )} for '${network.name}'.`, + }); + } + } + const historicalSync = new HistoricalSyncService({ common, syncStore, @@ -209,7 +258,6 @@ export const create = async ({ }); await historicalSync.setup({ - latestBlockNumber: hexToNumber(latestBlock.number), finalizedBlockNumber: hexToNumber(finalizedBlock.number), }); @@ -262,6 +310,11 @@ export const create = async ({ checkpoint: initialFinalizedCheckpoint, finalizedCheckpoint: initialFinalizedCheckpoint, finalizedBlock, + endBlock: networkSources.every( + (source) => source.endBlock !== undefined, + ) + ? Math.max(...networkSources.map((source) => source.endBlock!)) + : undefined, }, historical: { historicalSync,