Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix historical sync race condition #241

Merged
merged 5 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/metal-boats-protect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Fixed a race condition bug in the historical sync service
12 changes: 6 additions & 6 deletions packages/core/src/_test/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,17 @@ export async function setupEventStore(context: TestContext) {
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const databaseSchema = `vitest_pool_${process.pid}_${poolId}`;
context.eventStore = new PostgresEventStore({ pool, databaseSchema });
await context.eventStore.migrateUp();

return async () => {
await pool.query(`DROP SCHEMA IF EXISTS "${databaseSchema}" CASCADE`);
};
} else {
const rawSqliteDb = new SqliteDatabase(":memory:");
const db = patchSqliteDatabase({ db: rawSqliteDb });
context.eventStore = new SqliteEventStore({ db });
await context.eventStore.migrateUp();
}

await context.eventStore.migrateUp();

return async () => {
await context.eventStore.migrateDown();
};
}

/**
Expand Down
48 changes: 24 additions & 24 deletions packages/core/src/event-store/postgres/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,6 @@ export class PostgresEventStore implements EventStore {
});
};

getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => {
const results = await this.db
.selectFrom("logFilterCachedRanges")
.select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"])
.where("filterKey", "=", filterKey)
.execute();

return results.map((range) => ({
...range,
startBlock: blobToBigInt(range.startBlock),
endBlock: blobToBigInt(range.endBlock),
endBlockTimestamp: blobToBigInt(range.endBlockTimestamp),
}));
};

insertFinalizedLogs = async ({
chainId,
logs: rpcLogs,
Expand Down Expand Up @@ -247,19 +232,14 @@ export class PostgresEventStore implements EventStore {
chainId,
block: rpcBlock,
transactions: rpcTransactions,
logFilterRange: {
logFilterKey,
blockNumberToCacheFrom,
logFilterStartBlockNumber,
},
logFilterRange: { logFilterKey, blockNumberToCacheFrom },
}: {
chainId: number;
block: RpcBlock;
transactions: RpcTransaction[];
logFilterRange: {
logFilterKey: string;
blockNumberToCacheFrom: number;
logFilterStartBlockNumber: number;
};
}) => {
const block: InsertableBlock = {
Expand Down Expand Up @@ -301,10 +281,15 @@ export class PostgresEventStore implements EventStore {
.values(logFilterCachedRange)
.execute();
});
};

// Executes a transaction that merges all adjacent cached ranges for the given
// log filter key, then returns the end block timestamp of the merged interval
// that contains the log filter's start block number. Callers should invoke this
// after inserting new cached range records (e.g. via insertFinalizedBlock).
mergeLogFilterCachedRanges = async ({
logFilterKey,
logFilterStartBlockNumber,
}: {
logFilterKey: string;
logFilterStartBlockNumber: number;
}) => {
const startingRangeEndTimestamp = await this.db
.transaction()
.execute(async (tx) => {
Expand Down Expand Up @@ -366,6 +351,21 @@ export class PostgresEventStore implements EventStore {
return { startingRangeEndTimestamp };
};

getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => {
const results = await this.db
.selectFrom("logFilterCachedRanges")
.select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"])
.where("filterKey", "=", filterKey)
.execute();

return results.map((range) => ({
...range,
startBlock: blobToBigInt(range.startBlock),
endBlock: blobToBigInt(range.endBlock),
endBlockTimestamp: blobToBigInt(range.endBlockTimestamp),
}));
};

insertContractReadResult = async ({
address,
blockNumber,
Expand Down
53 changes: 25 additions & 28 deletions packages/core/src/event-store/sqlite/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,6 @@ export class SqliteEventStore implements EventStore {
});
};

getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => {
const results = await this.db
.selectFrom("logFilterCachedRanges")
.select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"])
.where("filterKey", "=", filterKey)
.execute();

return results.map((range) => ({
...range,
startBlock: blobToBigInt(range.startBlock),
endBlock: blobToBigInt(range.endBlock),
endBlockTimestamp: blobToBigInt(range.endBlockTimestamp),
}));
};

insertFinalizedLogs = async ({
chainId,
logs: rpcLogs,
Expand Down Expand Up @@ -216,20 +201,12 @@ export class SqliteEventStore implements EventStore {
chainId,
block: rpcBlock,
transactions: rpcTransactions,
logFilterRange: {
logFilterKey,
blockNumberToCacheFrom,
logFilterStartBlockNumber,
},
logFilterRange: { logFilterKey, blockNumberToCacheFrom },
}: {
chainId: number;
block: RpcBlock;
transactions: RpcTransaction[];
logFilterRange: {
logFilterKey: string;
blockNumberToCacheFrom: number;
logFilterStartBlockNumber: number;
};
logFilterRange: { logFilterKey: string; blockNumberToCacheFrom: number };
}) => {
const block: InsertableBlock = {
...rpcToSqliteBlock(rpcBlock),
Expand Down Expand Up @@ -272,10 +249,15 @@ export class SqliteEventStore implements EventStore {
.execute(),
]);
});
};

// Executes a transaction that merges all adjacent cached ranges for the given
// log filter key, then returns the end block timestamp of the merged interval
// that contains the log filter's start block number. Callers should invoke this
// after inserting new cached range records (e.g. via insertFinalizedBlock).
mergeLogFilterCachedRanges = async ({
logFilterKey,
logFilterStartBlockNumber,
}: {
logFilterKey: string;
logFilterStartBlockNumber: number;
}) => {
const startingRangeEndTimestamp = await this.db
.transaction()
.execute(async (tx) => {
Expand Down Expand Up @@ -336,6 +318,21 @@ export class SqliteEventStore implements EventStore {
return { startingRangeEndTimestamp };
};

getLogFilterCachedRanges = async ({ filterKey }: { filterKey: string }) => {
const results = await this.db
.selectFrom("logFilterCachedRanges")
.select(["filterKey", "startBlock", "endBlock", "endBlockTimestamp"])
.where("filterKey", "=", filterKey)
.execute();

return results.map((range) => ({
...range,
startBlock: blobToBigInt(range.startBlock),
endBlock: blobToBigInt(range.endBlock),
endBlockTimestamp: blobToBigInt(range.endBlockTimestamp),
}));
};

insertContractReadResult = async ({
address,
blockNumber,
Expand Down
57 changes: 33 additions & 24 deletions packages/core/src/event-store/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,8 @@ test("insertFinalizedBlock inserts block as finalized", async (context) => {
block: blockOne,
transactions: blockOneTransactions,
logFilterRange: {
blockNumberToCacheFrom: 15131900,
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
blockNumberToCacheFrom: 15131900,
},
});

Expand All @@ -301,9 +300,8 @@ test("insertFinalizedBlock inserts transactions as finalized", async (context) =
block: blockOne,
transactions: blockOneTransactions,
logFilterRange: {
blockNumberToCacheFrom: 15131900,
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
blockNumberToCacheFrom: 15131900,
},
});

Expand Down Expand Up @@ -334,9 +332,8 @@ test("insertFinalizedBlock inserts a log filter cached interval", async (context
block: blockOne,
transactions: blockOneTransactions,
logFilterRange: {
blockNumberToCacheFrom: 15131900,
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
blockNumberToCacheFrom: 15131900,
},
});

Expand All @@ -353,17 +350,16 @@ test("insertFinalizedBlock inserts a log filter cached interval", async (context
expect(logFilterCachedRanges).toHaveLength(1);
});

test("insertFinalizedBlock merges cached intervals", async (context) => {
test("mergeLogFilterCachedRanges merges cached intervals", async (context) => {
const { eventStore } = context;

await eventStore.insertFinalizedBlock({
chainId: 1,
block: blockOne,
transactions: blockOneTransactions,
logFilterRange: {
blockNumberToCacheFrom: 15131900,
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
blockNumberToCacheFrom: 15131900,
},
});

Expand All @@ -372,12 +368,16 @@ test("insertFinalizedBlock merges cached intervals", async (context) => {
block: blockTwo,
transactions: blockTwoTransactions,
logFilterRange: {
blockNumberToCacheFrom: 15495110,
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
blockNumberToCacheFrom: 15495110,
},
});

await eventStore.mergeLogFilterCachedRanges({
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
});

const logFilterCachedRanges = await eventStore.getLogFilterCachedRanges({
filterKey: "test-filter-key",
});
Expand All @@ -391,32 +391,41 @@ test("insertFinalizedBlock merges cached intervals", async (context) => {
expect(logFilterCachedRanges).toHaveLength(1);
});

test("insertFinalizedBlock returns the startingRangeEndTimestamp", async (context) => {
test("mergeLogFilterCachedRanges returns the startingRangeEndTimestamp", async (context) => {
const { eventStore } = context;

const { startingRangeEndTimestamp } = await eventStore.insertFinalizedBlock({
await eventStore.insertFinalizedBlock({
chainId: 1,
block: blockOne,
transactions: blockOneTransactions,
logFilterRange: {
blockNumberToCacheFrom: 15131900,
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
blockNumberToCacheFrom: 15131900,
},
});

const { startingRangeEndTimestamp } =
await eventStore.mergeLogFilterCachedRanges({
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
});

expect(startingRangeEndTimestamp).toBe(hexToNumber(blockOne.timestamp));

await eventStore.insertFinalizedBlock({
chainId: 1,
block: blockTwo,
transactions: blockTwoTransactions,
logFilterRange: {
logFilterKey: "test-filter-key",
blockNumberToCacheFrom: 15495110,
},
});

const { startingRangeEndTimestamp: startingRangeEndTimestamp2 } =
await eventStore.insertFinalizedBlock({
chainId: 1,
block: blockTwo,
transactions: blockTwoTransactions,
logFilterRange: {
blockNumberToCacheFrom: 15495110,
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
},
await eventStore.mergeLogFilterCachedRanges({
logFilterKey: "test-filter-key",
logFilterStartBlockNumber: 15131900,
});

expect(startingRangeEndTimestamp2).toBe(hexToNumber(blockTwo.timestamp));
Expand Down
14 changes: 9 additions & 5 deletions packages/core/src/event-store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ export interface EventStore {
logFilterRange: {
logFilterKey: string;
blockNumberToCacheFrom: number;
logFilterStartBlockNumber: number;
};
}): Promise<void>;

mergeLogFilterCachedRanges(options: {
logFilterKey: string;
logFilterStartBlockNumber: number;
}): Promise<{ startingRangeEndTimestamp: number }>;

getLogFilterCachedRanges(options: {
filterKey: string;
}): Promise<LogFilterCachedRange[]>;

insertUnfinalizedBlock(options: {
chainId: number;
block: RpcBlock;
Expand All @@ -67,10 +75,6 @@ export interface EventStore {
toBlockNumber: number;
}): Promise<void>;

getLogFilterCachedRanges(options: {
filterKey: string;
}): Promise<LogFilterCachedRange[]>;

insertContractReadResult(options: {
address: string;
blockNumber: bigint;
Expand Down
9 changes: 0 additions & 9 deletions packages/core/src/historical-sync/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { publicClient, testResources } from "@/_test/utils";
import { encodeLogFilterKey } from "@/config/logFilterKey";
import { LogFilter } from "@/config/logFilters";
import { Network } from "@/config/networks";
import { wait } from "@/utils/wait";

import { HistoricalSyncService } from "./service";

Expand Down Expand Up @@ -317,14 +316,6 @@ test("start() emits historicalCheckpoint event", async (context) => {

await service.onIdle();

// TODO: Remove this. It's just a test to see if there's indeed a
// a race condition happening here.
await wait(300);
const logFilterCachedRanges = await eventStore.getLogFilterCachedRanges({
filterKey: logFilters[0].filter.key,
});
console.log(logFilterCachedRanges);

expect(emitSpy).toHaveBeenCalledWith("historicalCheckpoint", {
timestamp: 1673275859, // Block timestamp of block 16369955
});
Expand Down
Loading