Skip to content

Commit

Permalink
feat: checkpoint selecting strategy on sync-store getEvents (#907)
Browse files Browse the repository at this point in the history
* implements checkpoint selecting strategy and improved sql generation

* simplify max checkpoint selection

* use offset instead of limit to improve performance

* Revert "use offset instead of limit to improve performance"

This reverts commit 705ff40.

* checkpoint limit adjustment

* Revert "checkpoint limit adjustment"

This reverts commit 6fe6e81.

* reintroduces offset selection

* add sqlite and tweaks

* changeset

---------

Co-authored-by: typedarray <[email protected]>
  • Loading branch information
erensanlier and 0xOlias authored May 30, 2024
1 parent 1e423a1 commit c9886d9
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/lemon-dingos-leave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Improved the performance an important internal SQL query (`getEvents`) for large apps. An app with ~5M rows in the `ponder_sync.logs` table saw a ~20x reduction in execution time for this query. Smaller apps will see a more modest improvement.
60 changes: 59 additions & 1 deletion packages/core/src/sync-store/postgres/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,58 @@ export class PostgresSyncStore implements SyncStore {
)
} )`,
)
.with("log_checkpoints", (db) =>
db
.selectFrom("logs")
.where("logs.checkpoint", ">", cursor)
.where("logs.checkpoint", "<=", encodedToCheckpoint)
.orderBy("logs.checkpoint", "asc")
.offset(limit)
.limit(1)
.select("logs.checkpoint"),
)
.with("block_checkpoints", (db) =>
db
.selectFrom("blocks")
.where("blocks.checkpoint", ">", cursor)
.where("blocks.checkpoint", "<=", encodedToCheckpoint)
.orderBy("blocks.checkpoint", "asc")
.offset(limit)
.limit(1)
.select("blocks.checkpoint"),
)
.with("call_trace_checkpoints", (db) =>
db
.selectFrom("callTraces")
.where("callTraces.checkpoint", ">", cursor)
.where("callTraces.checkpoint", "<=", encodedToCheckpoint)
.orderBy("callTraces.checkpoint", "asc")
.offset(limit)
.limit(1)
.select("callTraces.checkpoint"),
)
.with("max_checkpoint", (db) =>
db
.selectFrom(
db
.selectFrom("log_checkpoints")
.select("checkpoint")
.unionAll(
db.selectFrom("block_checkpoints").select("checkpoint"),
)
.unionAll(
db
.selectFrom("call_trace_checkpoints")
.select("checkpoint"),
)
.as("all_checkpoints"),
)
.select(
sql`coalesce(max(checkpoint), ${encodedToCheckpoint})`.as(
"max_checkpoint",
),
),
)
.with("events", (db) =>
db
.selectFrom("logs")
Expand Down Expand Up @@ -1931,7 +1983,13 @@ export class PostgresSyncStore implements SyncStore {
]),
)
.where("events.checkpoint", ">", cursor)
.where("events.checkpoint", "<=", encodedToCheckpoint)
.where(
"events.checkpoint",
"<=",
// Use the theoretical max checkpoint across all source types
// For details, read https://github.com/ponder-sh/ponder/pull/907/files
sql`( select max_checkpoint from max_checkpoint )`,
)
.orderBy("events.checkpoint", "asc")
.limit(limit + 1)
.execute();
Expand Down
60 changes: 59 additions & 1 deletion packages/core/src/sync-store/sqlite/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1728,6 +1728,58 @@ export class SqliteSyncStore implements SyncStore {
)
} )`,
)
.with("log_checkpoints", (db) =>
db
.selectFrom("logs")
.where("logs.checkpoint", ">", cursor)
.where("logs.checkpoint", "<=", encodedToCheckpoint)
.orderBy("logs.checkpoint", "asc")
.offset(limit)
.limit(1)
.select("logs.checkpoint"),
)
.with("block_checkpoints", (db) =>
db
.selectFrom("blocks")
.where("blocks.checkpoint", ">", cursor)
.where("blocks.checkpoint", "<=", encodedToCheckpoint)
.orderBy("blocks.checkpoint", "asc")
.offset(limit)
.limit(1)
.select("blocks.checkpoint"),
)
.with("call_trace_checkpoints", (db) =>
db
.selectFrom("callTraces")
.where("callTraces.checkpoint", ">", cursor)
.where("callTraces.checkpoint", "<=", encodedToCheckpoint)
.orderBy("callTraces.checkpoint", "asc")
.offset(limit)
.limit(1)
.select("callTraces.checkpoint"),
)
.with("max_checkpoint", (db) =>
db
.selectFrom(
db
.selectFrom("log_checkpoints")
.select("checkpoint")
.unionAll(
db.selectFrom("block_checkpoints").select("checkpoint"),
)
.unionAll(
db
.selectFrom("call_trace_checkpoints")
.select("checkpoint"),
)
.as("all_checkpoints"),
)
.select(
sql`coalesce(max(checkpoint), ${encodedToCheckpoint})`.as(
"max_checkpoint",
),
),
)
.with("events", (db) =>
db
.selectFrom("logs")
Expand Down Expand Up @@ -1970,7 +2022,13 @@ export class SqliteSyncStore implements SyncStore {
]),
)
.where("events.checkpoint", ">", cursor)
.where("events.checkpoint", "<=", encodedToCheckpoint)
.where(
"events.checkpoint",
"<=",
// Use the theoretical max checkpoint across all source types
// For details, read https://github.com/ponder-sh/ponder/pull/907/files
sql`( select max_checkpoint from max_checkpoint )`,
)
.orderBy("events.checkpoint", "asc")
.limit(limit + 1)
.execute();
Expand Down

0 comments on commit c9886d9

Please sign in to comment.