From 8efb06ea5ad074766da4c92c8af82327e51cc3ee Mon Sep 17 00:00:00 2001 From: bankisan Date: Mon, 11 Dec 2023 15:43:19 +0000 Subject: [PATCH] Checkpoint -- testing new schemas for reading and writing --- packages/core/src/_test/setup.ts | 49 +++-- .../core/src/indexing-store/postgres/store.ts | 182 +++++++++--------- .../core/src/indexing-store/sqlite/store.ts | 4 + .../core/src/indexing-store/store.test.ts | 35 ++++ packages/core/src/indexing-store/store.ts | 1 + packages/core/vite.config.ts | 3 + 6 files changed, 167 insertions(+), 107 deletions(-) diff --git a/packages/core/src/_test/setup.ts b/packages/core/src/_test/setup.ts index 138222c19..40a163328 100644 --- a/packages/core/src/_test/setup.ts +++ b/packages/core/src/_test/setup.ts @@ -1,3 +1,4 @@ +import { randomBytes } from "crypto"; import { beforeEach, type TestContext } from "vitest"; import { buildOptions } from "@/config/options.js"; @@ -106,27 +107,53 @@ export async function setupSyncStore( */ export async function setupIndexingStore(context: TestContext) { if (process.env.DATABASE_URL) { - const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL }); + const testClient = new pg.Client({ + connectionString: process.env.DATABASE_URL, + }); + await testClient.connect(); + // Create a random database to isolate the tests. + const databaseSchema = `vitest_pool_${process.pid}_${randomBytes( + 10, + ).toString("hex")}`; + + const dbURL = new URL(process.env.DATABASE_URL); + dbURL.pathname = `/${databaseSchema}`; + const pool = new pg.Pool({ + connectionString: dbURL.toString(), + }); + + await testClient.query(`CREATE DATABASE "${databaseSchema}"`); context.indexingStore = new PostgresIndexingStore({ common: context.common, pool, }); + + return async () => { + try { + await context.indexingStore.kill(); + await testClient.query(`DROP DATABASE "${databaseSchema}"`); + await testClient.end(); + } catch (e) { + // This fails in end-to-end tests where the pool has + // already been shut down during the Ponder instance kill() method. + // It's fine to ignore the error. + } + }; } else { context.indexingStore = new SqliteIndexingStore({ common: context.common, file: ":memory:", }); + return async () => { + try { + await context.indexingStore.kill(); + } catch (e) { + // This fails in end-to-end tests where the pool has + // already been shut down during the Ponder instance kill() method. + // It's fine to ignore the error. + } + }; } - - return async () => { - try { - await context.indexingStore.kill(); - } catch (e) { - // This fails in end-to-end tests where the pool has - // already been shut down during the Ponder instance kill() method. - // It's fine to ignore the error. - } - }; } /** diff --git a/packages/core/src/indexing-store/postgres/store.ts b/packages/core/src/indexing-store/postgres/store.ts index 62aa2ab91..61eefed29 100644 --- a/packages/core/src/indexing-store/postgres/store.ts +++ b/packages/core/src/indexing-store/postgres/store.ts @@ -1,10 +1,4 @@ -import { - CompiledQuery, - Kysely, - PostgresDialect, - sql, - WithSchemaPlugin, -} from "kysely"; +import { Kysely, PostgresDialect, sql, WithSchemaPlugin } from "kysely"; import type { Common } from "@/Ponder.js"; import type { Scalar, Schema } from "@/schema/types.js"; @@ -41,89 +35,42 @@ export class PostgresIndexingStore implements IndexingStore { private common: Common; db: Kysely; + readerDB: Kysely; + writerDB: Kysely; schema?: Schema; - indexNamespace: string; - hasConnected: boolean = false; - isReadOnly: boolean = false; + namespaceVersion: string; constructor({ common, pool, - isReadOnly = false, }: { common: Common; pool: Pool; isReadOnly?: boolean; }) { - this.indexNamespace = `ponder_index_${new Date().getTime()}`; + this.namespaceVersion = `ponder_index_${new Date().getTime()}`; this.common = common; - this.isReadOnly = isReadOnly; this.db = new Kysely({ dialect: new PostgresDialect({ pool, - onCreateConnection: async (connection) => { - if (this.hasConnected) { - return; - } - - if (isReadOnly) { - const result = await connection.executeQuery( - // Finds the latest namespace which should have been created by the indexer writer. - CompiledQuery.raw( - `SELECT nspname FROM pg_namespace WHERE nspname LIKE 'ponder_index_%' ORDER BY nspname DESC LIMIT 1`, - ), - ); - if (result.rows.length === 0) { - throw new Error("No namespace found"); - } - const namespace = (result.rows as { nspname: string }[])[0] - .nspname as string; - - this.indexNamespace = namespace; - // This is only safe in a readonly context. Setting this modifies the session's search path which can - // modify the behavior of non-indexing queries. - await connection.executeQuery( - CompiledQuery.raw(`SET search_path = ${this.indexNamespace}`), - ); - } else { - await connection.executeQuery( - CompiledQuery.raw( - `CREATE SCHEMA IF NOT EXISTS ${this.indexNamespace}`, - ), - ); - } - - this.common.logger.debug({ - msg: `Connected to namespace: ${this.indexNamespace}`, - service: "indexing", - }); - this.hasConnected = true; - }, }), }); - if (isReadOnly) { - // This is a hack to stop readonly connections from connecting to a namespace using the Kysely plugin as readonly - // connections set their namespace using the search_path setter in the onCreateConnection hook. - return; - } - this.db = this.db.withPlugin(new WithSchemaPlugin(this.indexNamespace)); + this.readerDB = this.db.withPlugin(new WithSchemaPlugin("public")); + this.writerDB = this.db.withPlugin( + new WithSchemaPlugin(this.namespaceVersion), + ); } async kill() { return this.wrap({ method: "kill" }, async () => { - if (!this.isReadOnly) { - // Clean up all the tables and data created by this instance by dropping the schema that was created. - await this.db.executeQuery( - CompiledQuery.raw( - `DROP SCHEMA IF EXISTS ${this.indexNamespace} CASCADE`, - ), - ); - } - try { - await this.db.destroy(); + await Promise.all([ + this.writerDB.destroy(), + this.readerDB.destroy(), + this.db.destroy(), + ]); } catch (e) { const error = e as Error; if (error.message !== "Called end on pool more than once") { @@ -133,6 +80,32 @@ export class PostgresIndexingStore implements IndexingStore { }); } + publish = async () => { + return this.wrap({ method: "publish" }, async () => { + if (!this.schema) return; + + await this.readerDB.transaction().execute(async (tx) => { + // Drop all previous schemas. This will delete all previous views. + // TODO: Test this by creating an older schema and ensure it is removed. + + // Create all the view tables. + await Promise.all( + Object.entries(this.schema!.tables).map(async ([tableName]) => { + const viewBuilder = tx.schema + .createView(tableName) + .as( + tx + .withSchema(this.namespaceVersion) + .selectFrom(tableName) + .selectAll(), + ); + await viewBuilder.execute(); + }), + ); + }); + }); + }; + /** * Resets the database by dropping existing tables and creating new tables. * If no new schema is provided, the existing schema is used. @@ -147,7 +120,13 @@ export class PostgresIndexingStore implements IndexingStore { // Set the new schema. if (schema) this.schema = schema; - await this.db.transaction().execute(async (tx) => { + await this.writerDB.transaction().execute(async (tx) => { + await tx.schema.dropSchema(this.namespaceVersion).ifExists().execute(); + await tx.schema + .createSchema(this.namespaceVersion) + .ifNotExists() + .execute(); + // Create tables for new schema. await Promise.all( Object.entries(this.schema!.tables).map( @@ -217,12 +196,17 @@ export class PostgresIndexingStore implements IndexingStore { ), ); }); + + this.common.logger.debug({ + msg: `Connected to namespace: ${this.namespaceVersion}`, + service: "indexing", + }); }); }; revert = async ({ safeCheckpoint }: { safeCheckpoint: Checkpoint }) => { return this.wrap({ method: "revert" }, async () => { - await this.db.transaction().execute(async (tx) => { + await this.writerDB.transaction().execute(async (tx) => { await Promise.all( Object.keys(this.schema?.tables ?? {}).map(async (tableName) => { const encodedCheckpoint = encodeCheckpoint(safeCheckpoint); @@ -261,7 +245,7 @@ export class PostgresIndexingStore implements IndexingStore { encodeBigInts: false, }); - let query = this.db + let query = this.readerDB .selectFrom(tableName) .selectAll() .where("id", "=", formattedId); @@ -303,7 +287,7 @@ export class PostgresIndexingStore implements IndexingStore { orderBy?: OrderByInput; }) => { return this.wrap({ method: "findMany", tableName }, async () => { - let query = this.db.selectFrom(tableName).selectAll(); + let query = this.readerDB.selectFrom(tableName).selectAll(); if (checkpoint === "latest") { query = query.where("effectiveToCheckpoint", "=", "latest"); @@ -371,7 +355,7 @@ export class PostgresIndexingStore implements IndexingStore { const createRow = formatRow({ id, ...data }, false); const encodedCheckpoint = encodeCheckpoint(checkpoint); - const row = await this.db + const row = await this.writerDB .insertInto(tableName) .values({ ...createRow, @@ -409,7 +393,11 @@ export class PostgresIndexingStore implements IndexingStore { const rows = await Promise.all( chunkedRows.map((c) => - this.db.insertInto(tableName).values(c).returningAll().execute(), + this.writerDB + .insertInto(tableName) + .values(c) + .returningAll() + .execute(), ), ); @@ -437,7 +425,7 @@ export class PostgresIndexingStore implements IndexingStore { }); const encodedCheckpoint = encodeCheckpoint(checkpoint); - const row = await this.db.transaction().execute(async (tx) => { + const row = await this.writerDB.transaction().execute(async (tx) => { // Find the latest version of this instance. const latestRow = await tx .selectFrom(tableName) @@ -520,7 +508,7 @@ export class PostgresIndexingStore implements IndexingStore { return this.wrap({ method: "updateMany", tableName }, async () => { const encodedCheckpoint = encodeCheckpoint(checkpoint); - const rows = await this.db.transaction().execute(async (tx) => { + const rows = await this.writerDB.transaction().execute(async (tx) => { // Get all IDs that match the filter. let latestRowsQuery = tx .selectFrom(tableName) @@ -628,7 +616,7 @@ export class PostgresIndexingStore implements IndexingStore { const createRow = formatRow({ id, ...create }, false); const encodedCheckpoint = encodeCheckpoint(checkpoint); - const row = await this.db.transaction().execute(async (tx) => { + const row = await this.writerDB.transaction().execute(async (tx) => { // Find the latest version of this instance. const latestRow = await tx .selectFrom(tableName) @@ -722,31 +710,33 @@ export class PostgresIndexingStore implements IndexingStore { }); const encodedCheckpoint = encodeCheckpoint(checkpoint); - const isDeleted = await this.db.transaction().execute(async (tx) => { - // If the latest version has effectiveFromCheckpoint equal to current checkpoint, - // this row was created within the same indexing function, and we can delete it. - let deletedRow = await tx - .deleteFrom(tableName) - .where("id", "=", formattedId) - .where("effectiveFromCheckpoint", "=", encodedCheckpoint) - .where("effectiveToCheckpoint", "=", "latest") - .returning(["id"]) - .executeTakeFirst(); - - // If we did not take the shortcut above, update the latest record - // setting effectiveToCheckpoint to the current checkpoint. - if (!deletedRow) { - deletedRow = await tx - .updateTable(tableName) - .set({ effectiveToCheckpoint: encodedCheckpoint }) + const isDeleted = await this.writerDB + .transaction() + .execute(async (tx) => { + // If the latest version has effectiveFromCheckpoint equal to current checkpoint, + // this row was created within the same indexing function, and we can delete it. + let deletedRow = await tx + .deleteFrom(tableName) .where("id", "=", formattedId) + .where("effectiveFromCheckpoint", "=", encodedCheckpoint) .where("effectiveToCheckpoint", "=", "latest") .returning(["id"]) .executeTakeFirst(); - } - return !!deletedRow; - }); + // If we did not take the shortcut above, update the latest record + // setting effectiveToCheckpoint to the current checkpoint. + if (!deletedRow) { + deletedRow = await tx + .updateTable(tableName) + .set({ effectiveToCheckpoint: encodedCheckpoint }) + .where("id", "=", formattedId) + .where("effectiveToCheckpoint", "=", "latest") + .returning(["id"]) + .executeTakeFirst(); + } + + return !!deletedRow; + }); return isDeleted; }); diff --git a/packages/core/src/indexing-store/sqlite/store.ts b/packages/core/src/indexing-store/sqlite/store.ts index e14fb03d1..06b6216b7 100644 --- a/packages/core/src/indexing-store/sqlite/store.ts +++ b/packages/core/src/indexing-store/sqlite/store.ts @@ -76,6 +76,10 @@ export class SqliteIndexingStore implements IndexingStore { }); } + async publish() { + // Implements the interface. + } + /** * Resets the database by dropping existing tables and creating new tables. * If no new schema is provided, the existing schema is used. diff --git a/packages/core/src/indexing-store/store.test.ts b/packages/core/src/indexing-store/store.test.ts index 5cc303a59..28ca31d0f 100644 --- a/packages/core/src/indexing-store/store.test.ts +++ b/packages/core/src/indexing-store/store.test.ts @@ -28,6 +28,7 @@ function createCheckpoint(index: number): Checkpoint { test("reload() binds the schema", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); expect(indexingStore.schema).toBe(schema); }); @@ -35,6 +36,7 @@ test("reload() binds the schema", async (context) => { test("create() inserts a record that is effective after specified checkpoint", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -54,6 +56,7 @@ test("create() inserts a record that is effective after specified checkpoint", a test("create() inserts a record that is effective at timestamp", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -73,6 +76,7 @@ test("create() inserts a record that is effective at timestamp", async (context) test("create() inserts a record that is not effective before timestamp", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -92,6 +96,7 @@ test("create() inserts a record that is not effective before timestamp", async ( test("create() throws on unique constraint violation even if checkpoint is different", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -113,6 +118,7 @@ test("create() throws on unique constraint violation even if checkpoint is diffe test("create() respects optional fields", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -133,6 +139,7 @@ test("create() respects optional fields", async (context) => { test("create() accepts enums", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -153,6 +160,7 @@ test("create() accepts enums", async (context) => { test("create() throws on invalid enum value", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await expect(() => indexingStore.create({ @@ -167,6 +175,7 @@ test("create() throws on invalid enum value", async (context) => { test("create() accepts BigInt fields as bigint and returns as bigint", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -187,6 +196,7 @@ test("create() accepts BigInt fields as bigint and returns as bigint", async (co test("update() updates a record", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -218,6 +228,7 @@ test("update() updates a record", async (context) => { test("update() updates a record using an update function", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -254,6 +265,7 @@ test("update() updates a record using an update function", async (context) => { test("update() updates a record and maintains older version", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -284,6 +296,7 @@ test("update() updates a record and maintains older version", async (context) => test("update() throws if trying to update an instance in the past", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -305,6 +318,7 @@ test("update() throws if trying to update an instance in the past", async (conte test("update() updates a record in-place within the same timestamp", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -330,6 +344,7 @@ test("update() updates a record in-place within the same timestamp", async (cont test("upsert() inserts a new record", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.upsert({ tableName: "Pet", @@ -348,6 +363,7 @@ test("upsert() inserts a new record", async (context) => { test("upsert() updates a record", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -379,6 +395,7 @@ test("upsert() updates a record", async (context) => { test("upsert() updates a record using an update function", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -412,6 +429,7 @@ test("upsert() updates a record using an update function", async (context) => { test("upsert() throws if trying to update an instance in the past", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -434,6 +452,7 @@ test("upsert() throws if trying to update an instance in the past", async (conte test("upsert() updates a record in-place within the same timestamp", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -460,6 +479,7 @@ test("upsert() updates a record in-place within the same timestamp", async (cont test("delete() removes a record", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -489,6 +509,7 @@ test("delete() removes a record", async (context) => { test("delete() retains older version of record", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -514,6 +535,7 @@ test("delete() retains older version of record", async (context) => { test("delete() removes a record entirely if only present for one timestamp", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -544,6 +566,7 @@ test("delete() removes a record entirely if only present for one timestamp", asy test("delete() removes a record entirely if only present for one timestamp after update()", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -590,6 +613,7 @@ test("delete() removes a record entirely if only present for one timestamp after test("delete() deletes versions effective in the delete timestamp", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -623,6 +647,7 @@ test("delete() deletes versions effective in the delete timestamp", async (conte test("findMany() returns current versions of all records", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -661,6 +686,7 @@ test("findMany() returns current versions of all records", async (context) => { test("findMany() sorts on bigint field", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -697,6 +723,7 @@ test("findMany() sorts on bigint field", async (context) => { test("findMany() filters on bigint gt", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -734,6 +761,7 @@ test("findMany() filters on bigint gt", async (context) => { test("findMany() sorts and filters together", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -772,6 +800,7 @@ test("findMany() sorts and filters together", async (context) => { test("findMany() errors on invalid filter condition", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); expect(() => indexingStore.findMany({ @@ -784,6 +813,7 @@ test("findMany() errors on invalid filter condition", async (context) => { test("findMany() errors on orderBy object with multiple keys", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); expect(() => indexingStore.findMany({ @@ -796,6 +826,7 @@ test("findMany() errors on orderBy object with multiple keys", async (context) = test("createMany() inserts multiple entities", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); const createdInstances = await indexingStore.createMany({ tableName: "Pet", @@ -815,6 +846,7 @@ test("createMany() inserts multiple entities", async (context) => { test("createMany() inserts a large number of entities", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); const ENTITY_COUNT = 100_000; @@ -836,6 +868,7 @@ test("createMany() inserts a large number of entities", async (context) => { test("updateMany() updates multiple entities", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.createMany({ tableName: "Pet", @@ -864,6 +897,7 @@ test("updateMany() updates multiple entities", async (context) => { test("revert() deletes versions newer than the safe timestamp", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", @@ -910,6 +944,7 @@ test("revert() deletes versions newer than the safe timestamp", async (context) test("revert() updates versions that only existed during the safe timestamp to latest", async (context) => { const { indexingStore } = context; await indexingStore.reload({ schema }); + await indexingStore.publish(); await indexingStore.create({ tableName: "Pet", diff --git a/packages/core/src/indexing-store/store.ts b/packages/core/src/indexing-store/store.ts index 1dfe31cdb..e2cf62942 100644 --- a/packages/core/src/indexing-store/store.ts +++ b/packages/core/src/indexing-store/store.ts @@ -86,6 +86,7 @@ export interface IndexingStore { kill(): Promise; revert(options: { safeCheckpoint: Checkpoint }): Promise; + publish(): Promise; findUnique(options: { tableName: string; diff --git a/packages/core/vite.config.ts b/packages/core/vite.config.ts index d0778072f..53af994dc 100644 --- a/packages/core/vite.config.ts +++ b/packages/core/vite.config.ts @@ -11,5 +11,8 @@ export default defineConfig({ test: { globalSetup: ["src/_test/globalSetup.ts"], setupFiles: ["src/_test/setup.ts"], + sequence: { + concurrent: true, + }, }, });