diff --git a/.changeset/loud-days-perform.md b/.changeset/loud-days-perform.md
new file mode 100644
index 000000000..6ea1d31db
--- /dev/null
+++ b/.changeset/loud-days-perform.md
@@ -0,0 +1,5 @@
+---
+"@ponder/core": patch
+---
+
+Fixed a bug where stale tables were left in the database after the service was stopped.
diff --git a/packages/core/src/Ponder.ts b/packages/core/src/Ponder.ts
index 7e5b452c5..f5f13ab65 100644
--- a/packages/core/src/Ponder.ts
+++ b/packages/core/src/Ponder.ts
@@ -293,13 +293,13 @@ export class Ponder {
       )
     );
 
-    await this.buildService.kill?.();
+    await this.buildService.kill();
     this.uiService.kill();
     this.indexingService.kill();
     await this.serverService.kill();
 
-    await this.userStore.teardown();
     await this.common.telemetry.kill();
+    await this.userStore.kill();
     await this.eventStore.kill();
 
     this.common.logger.debug({
diff --git a/packages/core/src/_test/globalSetup.ts b/packages/core/src/_test/globalSetup.ts
index 6fded9655..0b413ee17 100644
--- a/packages/core/src/_test/globalSetup.ts
+++ b/packages/core/src/_test/globalSetup.ts
@@ -1,5 +1,6 @@
 import { startProxy } from "@viem/anvil";
 import dotenv from "dotenv";
+import { Pool } from "pg";
 
 import { FORK_BLOCK_NUMBER } from "./constants";
 
@@ -11,11 +12,45 @@ export default async function () {
     throw new Error('Missing environment variable "ANVIL_FORK_URL"');
   }
 
-  return await startProxy({
+  const shutdownProxy = await startProxy({
     options: {
       chainId: 1,
       forkUrl: ANVIL_FORK_URL,
       forkBlockNumber: FORK_BLOCK_NUMBER,
     },
   });
+
+  let cleanupDatabase: () => Promise<void>;
+  if (process.env.DATABASE_URL) {
+    cleanupDatabase = async () => {
+      const pool = new Pool({ connectionString: process.env.DATABASE_URL });
+
+      const schemaRows = await pool.query(`
+        SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname ~ '^vitest_pool_';
+      `);
+      const schemas = schemaRows.rows.map((r) => r.nspname) as string[];
+
+      for (const schema of schemas) {
+        const tableRows = await pool.query(`
+          SELECT table_name FROM information_schema.tables WHERE table_schema = '${schema}'
+        `);
+        const tables = tableRows.rows.map((r) => r.table_name) as string[];
+
+        for (const table of tables) {
+          await pool.query(
+            `DROP TABLE IF EXISTS "${schema}"."${table}" CASCADE`
+          );
+        }
+        await pool.query(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+        console.log(`Dropped ${tables.length} tables from schema "${schema}".`);
+      }
+
+      await pool.end();
+    };
+  }
+
+  return async () => {
+    await shutdownProxy();
+    await cleanupDatabase?.();
+  };
 }
diff --git a/packages/core/src/_test/setup.ts b/packages/core/src/_test/setup.ts
index 08c4d0eca..6af8cddf1 100644
--- a/packages/core/src/_test/setup.ts
+++ b/packages/core/src/_test/setup.ts
@@ -69,31 +69,31 @@ beforeEach((context) => {
  */
 export async function setupEventStore(
   context: TestContext,
-  options = { skipMigrateUp: false }
+  options = { migrateUp: true }
 ) {
   if (process.env.DATABASE_URL) {
     const pool = new Pool({ connectionString: process.env.DATABASE_URL });
     const databaseSchema = `vitest_pool_${process.pid}_${poolId}`;
     context.eventStore = new PostgresEventStore({ pool, databaseSchema });
 
-    if (!options.skipMigrateUp) await context.eventStore.migrateUp();
+    if (options.migrateUp) await context.eventStore.migrateUp();
 
     return async () => {
       try {
         await pool.query(`DROP SCHEMA IF EXISTS "${databaseSchema}" CASCADE`);
+        await context.eventStore.kill();
       } catch (e) {
-        // This query fails in end-to-end tests where the pool has
+        // This fails in end-to-end tests where the pool has
         // already been shut down during the Ponder instance kill() method.
         // It's fine to ignore the error.
       }
-      await context.eventStore.kill();
     };
   } else {
     const rawSqliteDb = new SqliteDatabase(":memory:");
     const db = patchSqliteDatabase({ db: rawSqliteDb });
     context.eventStore = new SqliteEventStore({ db });
 
-    if (!options.skipMigrateUp) await context.eventStore.migrateUp();
+    if (options.migrateUp) await context.eventStore.migrateUp();
 
     return async () => {
       await context.eventStore.kill();
@@ -121,7 +121,13 @@ export async function setupUserStore(context: TestContext) {
   }
 
   return async () => {
-    await context.userStore.teardown();
+    try {
+      await context.userStore.kill();
+    } catch (e) {
+      // This fails in end-to-end tests where the pool has
+      // already been shut down during the Ponder instance kill() method.
+      // It's fine to ignore the error.
+    }
   };
 }
diff --git a/packages/core/src/event-store/postgres/migrations.test.ts b/packages/core/src/event-store/postgres/migrations.test.ts
index 708d1b8c7..b5dd9947b 100644
--- a/packages/core/src/event-store/postgres/migrations.test.ts
+++ b/packages/core/src/event-store/postgres/migrations.test.ts
@@ -15,7 +15,7 @@ import {
   rpcToPostgresTransaction,
 } from "./format";
 
-beforeEach((context) => setupEventStore(context, { skipMigrateUp: true }));
+beforeEach((context) => setupEventStore(context, { migrateUp: false }));
 
 const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
   await db
@@ -60,20 +60,27 @@ const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
     .execute();
 };
 
-test("2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds", async (context) => {
-  const { eventStore } = context;
+test(
+  "2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds",
+  async (context) => {
+    const { eventStore } = context;
 
-  if (eventStore.kind !== "postgres") return;
+    if (eventStore.kind !== "postgres") return;
 
-  const { error } = await eventStore.migrator.migrateTo(
-    "2023_07_24_0_drop_finalized"
-  );
-  expect(error).toBeFalsy();
+    const { error } = await eventStore.migrator.migrateTo(
+      "2023_07_24_0_drop_finalized"
+    );
 
-  await seed_2023_07_24_0_drop_finalized(eventStore.db);
+    expect(error).toBeFalsy();
 
-  const { error: latestError } = await eventStore.migrator.migrateTo(
-    "2023_09_19_0_new_sync_design"
-  );
-  expect(latestError).toBeFalsy();
-}, 15_000);
+    await seed_2023_07_24_0_drop_finalized(eventStore.db);
+
+    const { error: latestError } = await eventStore.migrator.migrateTo(
+      "2023_09_19_0_new_sync_design"
+    );
+    expect(latestError).toBeFalsy();
+  },
+  // This test is flaky. It seems like a Postgres isolation issue with our
+  // test setup. Annoying!
+  { timeout: 15_000, retry: 3 }
+);
diff --git a/packages/core/src/event-store/sqlite/migrations.test.ts b/packages/core/src/event-store/sqlite/migrations.test.ts
index 4a2cc939f..1b9d29b3e 100644
--- a/packages/core/src/event-store/sqlite/migrations.test.ts
+++ b/packages/core/src/event-store/sqlite/migrations.test.ts
@@ -15,7 +15,7 @@ import {
   rpcToSqliteTransaction,
 } from "./format";
 
-beforeEach((context) => setupEventStore(context, { skipMigrateUp: true }));
+beforeEach((context) => setupEventStore(context, { migrateUp: false }));
 
 const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
   await db
@@ -60,24 +60,20 @@ const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
     .execute();
 };
 
-test(
-  "2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds",
-  async (context) => {
-    const { eventStore } = context;
+test("2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds", async (context) => {
+  const { eventStore } = context;
 
-    if (eventStore.kind !== "sqlite") return;
+  if (eventStore.kind !== "sqlite") return;
 
-    const { error } = await eventStore.migrator.migrateTo(
-      "2023_07_24_0_drop_finalized"
-    );
-    expect(error).toBeFalsy();
+  const { error } = await eventStore.migrator.migrateTo(
+    "2023_07_24_0_drop_finalized"
+  );
+  expect(error).toBeFalsy();
 
-    await seed_2023_07_24_0_drop_finalized(eventStore.db);
+  await seed_2023_07_24_0_drop_finalized(eventStore.db);
 
-    const { error: latestError } = await eventStore.migrator.migrateTo(
-      "2023_09_19_0_new_sync_design"
-    );
-    expect(latestError).toBeFalsy();
-  },
-  {}
-);
+  const { error: latestError } = await eventStore.migrator.migrateTo(
+    "2023_09_19_0_new_sync_design"
+  );
+  expect(latestError).toBeFalsy();
+});
diff --git a/packages/core/src/server/service.test.ts b/packages/core/src/server/service.test.ts
index 569c1ec03..4d4ebf641 100644
--- a/packages/core/src/server/service.test.ts
+++ b/packages/core/src/server/service.test.ts
@@ -1393,7 +1393,6 @@ test("serves singular entity versioned at specified timestamp", async (context)
   expect(testEntity.string).toBe("updated");
 
   await service.kill();
-  await userStore.teardown();
 });
 
 test("serves plural entities versioned at specified timestamp", async (context) => {
@@ -1452,7 +1451,6 @@ test("serves plural entities versioned at specified timestamp", async (context)
   ]);
 
   await service.kill();
-  await userStore.teardown();
 });
 
 test("derived field respects skip argument", async (context) => {
@@ -1485,7 +1483,6 @@ test("derived field respects skip argument", async (context) => {
   });
 
   await service.kill();
-  await userStore.teardown();
 });
 
 test("responds with appropriate status code pre and post historical sync", async (context) => {
@@ -1530,7 +1527,6 @@ test("responds with appropriate status code pre and post historical sync", async
   });
 
   await service.kill();
-  await userStore.teardown();
 });
 
 // This is a known limitation for now, which is that the timestamp version of entities
@@ -1592,5 +1588,4 @@ test.skip("serves derived entities versioned at provided timestamp", async (cont
   });
 
   await service.kill();
-  await userStore.teardown();
 });
diff --git a/packages/core/src/user-store/postgres/store.ts b/packages/core/src/user-store/postgres/store.ts
index 859dc8c6c..0b3da3e7f 100644
--- a/packages/core/src/user-store/postgres/store.ts
+++ b/packages/core/src/user-store/postgres/store.ts
@@ -169,22 +169,21 @@ export class PostgresUserStore implements UserStore {
     });
   };
 
-  /**
-   * Tears down the store by dropping all tables for the current schema.
-   */
-  teardown = async () => {
-    if (!this.schema) return;
+  async kill() {
+    const entities = this.schema?.entities ?? [];
+    if (entities.length > 0) {
+      await this.db.transaction().execute(async (tx) => {
+        await Promise.all(
+          entities.map(async (model) => {
+            const tableName = `${model.name}_${this.versionId}`;
+            await tx.schema.dropTable(tableName).execute();
+          })
+        );
+      });
+    }
 
-    // Drop tables from existing schema.
-    await this.db.transaction().execute(async (tx) => {
-      await Promise.all(
-        this.schema!.entities.map((model) => {
-          const tableName = `${model.name}_${this.versionId}`;
-          tx.schema.dropTable(tableName);
-        })
-      );
-    });
-  };
+    await this.db.destroy();
+  }
 
   findUnique = async ({
     modelName,
diff --git a/packages/core/src/user-store/sqlite/store.ts b/packages/core/src/user-store/sqlite/store.ts
index 4092d5644..193749d2c 100644
--- a/packages/core/src/user-store/sqlite/store.ts
+++ b/packages/core/src/user-store/sqlite/store.ts
@@ -150,22 +150,21 @@ export class SqliteUserStore implements UserStore {
     });
   };
 
-  /**
-   * Tears down the store by dropping all tables for the current schema.
-   */
-  teardown = async () => {
-    if (!this.schema) return;
+  async kill() {
+    const entities = this.schema?.entities ?? [];
+    if (entities.length > 0) {
+      await this.db.transaction().execute(async (tx) => {
+        await Promise.all(
+          entities.map(async (model) => {
+            const tableName = `${model.name}_${this.versionId}`;
+            await tx.schema.dropTable(tableName).execute();
+          })
+        );
+      });
+    }
 
-    // Drop tables from existing schema.
-    await this.db.transaction().execute(async (tx) => {
-      await Promise.all(
-        this.schema!.entities.map((model) => {
-          const tableName = `${model.name}_${this.versionId}`;
-          tx.schema.dropTable(tableName);
-        })
-      );
-    });
-  };
+    await this.db.destroy();
+  }
 
   findUnique = async ({
     modelName,
diff --git a/packages/core/src/user-store/store.test.ts b/packages/core/src/user-store/store.test.ts
index aff263080..c3828df45 100644
--- a/packages/core/src/user-store/store.test.ts
+++ b/packages/core/src/user-store/store.test.ts
@@ -35,8 +35,6 @@ test("reload() binds the schema", async (context) => {
 
   await userStore.reload({ schema });
   expect(userStore.schema).toBe(schema);
-
-  await userStore.teardown();
 });
 
 test("create() inserts a record that is effective after timestamp", async (context) => {
@@ -56,8 +54,6 @@ test("create() inserts a record that is effective after timestamp", async (conte
     id: "id1",
   });
   expect(instance).toMatchObject({ id: "id1", name: "Skip", age: 12 });
-
-  await userStore.teardown();
 });
 
 test("create() inserts a record that is effective at timestamp", async (context) => {
@@ -77,8 +73,6 @@ test("create() inserts a record that is effective at timestamp", async (context)
     id: "id1",
   });
   expect(instance).toMatchObject({ id: "id1", name: "Skip", age: 12 });
-
-  await userStore.teardown();
 });
 
 test("create() inserts a record that is not effective before timestamp", async (context) => {
@@ -98,8 +92,6 @@ test("create() inserts a record that is not effective before timestamp", async (
     id: "id1",
   });
   expect(instance).toBeNull();
-
-  await userStore.teardown();
 });
 
 test("create() throws on unique constraint violation", async (context) => {
@@ -121,8 +113,6 @@ test("create() throws on unique constraint violation", async (context) => {
       data: { name: "Skip", age: 13 },
     })
   ).rejects.toThrow();
-
-  await userStore.teardown();
 });
 
 test("create() respects optional fields", async (context) => {
@@ -143,8 +133,6 @@ test("create() respects optional fields", async (context) => {
   });
 
   expect(instance).toMatchObject({ id: "id1", name: "Skip", age: null });
-
-  await userStore.teardown();
 });
 
 test("create() accepts enums", async (context) => {
@@ -165,8 +153,6 @@ test("create() accepts enums", async (context) => {
   });
 
   expect(instance).toMatchObject({ id: "id1", name: "Skip", kind: "CAT" });
-
-  await userStore.teardown();
 });
 
 test("create() throws on invalid enum value", async (context) => {
@@ -181,8 +167,6 @@ test("create() throws on invalid enum value", async (context) => {
       data: { name: "Skip", kind: "NOTACAT" },
     })
   ).rejects.toThrow();
-
-  await userStore.teardown();
 });
 
 test("create() accepts BigInt fields as bigint and returns as bigint", async (context) => {
@@ -203,8 +187,6 @@ test("create() accepts BigInt fields as bigint and returns as bigint", async (co
   });
 
   expect(instance).toMatchObject({ id: "id1", name: "Skip", bigAge: 100n });
-
-  await userStore.teardown();
 });
 
 test("update() updates a record", async (context) => {
@@ -236,8 +218,6 @@ test("update() updates a record", async (context) => {
     id: "id1",
   });
   expect(updatedInstance).toMatchObject({ id: "id1", name: "Peanut Butter" });
-
-  await userStore.teardown();
 });
 
 test("update() updates a record using an update function", async (context) => {
@@ -274,8 +254,6 @@ test("update() updates a record using an update function", async (context) => {
     id: "id1",
     name: "Skip and Skipper",
   });
-
-  await userStore.teardown();
 });
 
 test("update() updates a record and maintains older version", async (context) => {
@@ -306,8 +284,6 @@ test("update() updates a record and maintains older version", async (context) =>
     name: "Skip",
     bigAge: 100n,
   });
-
-  await userStore.teardown();
 });
 
 test("update() throws if trying to update an instance in the past", async (context) => {
@@ -329,8 +305,6 @@ test("update() throws if trying to update an instance in the past", async (conte
       data: { name: "Peanut Butter" },
     })
   ).rejects.toThrow();
-
-  await userStore.teardown();
 });
 
 test("update() updates a record in-place within the same timestamp", async (context) => {
@@ -356,8 +330,6 @@ test("update() updates a record in-place within the same timestamp", async (cont
     id: "id1",
   });
   expect(updatedInstance).toMatchObject({ id: "id1", name: "Peanut Butter" });
-
-  await userStore.teardown();
 });
 
 test("upsert() inserts a new record", async (context) => {
@@ -373,8 +345,6 @@ test("upsert() inserts a new record", async (context) => {
 
   const instance = await userStore.findUnique({ modelName: "Pet", id: "id1" });
   expect(instance).toMatchObject({ id: "id1", name: "Skip", age: 12 });
-
-  await userStore.teardown();
 });
 
 test("upsert() updates a record", async (context) => {
@@ -403,8 +373,6 @@ test("upsert() updates a record", async (context) => {
     id: "id1",
   });
   expect(updatedInstance).toMatchObject({ id: "id1", name: "Jelly", age: 12 });
-
-  await userStore.teardown();
 });
 
 test("upsert() updates a record using an update function", async (context) => {
@@ -435,8 +403,6 @@ test("upsert() updates a record using an update function", async (context) => {
     id: "id1",
   });
   expect(updatedInstance).toMatchObject({ id: "id1", name: "Skip", age: 7 });
-
-  await userStore.teardown();
 });
 
 test("upsert() throws if trying to update an instance in the past", async (context) => {
@@ -459,8 +425,6 @@ test("upsert() throws if trying to update an instance in the past", async (conte
       update: { name: "Peanut Butter" },
     })
   ).rejects.toThrow();
-
-  await userStore.teardown();
 });
 
 test("upsert() updates a record in-place within the same timestamp", async (context) => {
@@ -487,8 +451,6 @@ test("upsert() updates a record in-place within the same timestamp", async (cont
     id: "id1",
   });
   expect(updatedInstance).toMatchObject({ id: "id1", name: "Peanut Butter" });
-
-  await userStore.teardown();
 });
 
 test("delete() removes a record", async (context) => {
@@ -511,8 +473,6 @@ test("delete() removes a record", async (context) => {
     id: "id1",
   });
   expect(deletedInstance).toBe(null);
-
-  await userStore.teardown();
 });
 
 test("delete() retains older version of record", async (context) => {
@@ -534,8 +494,6 @@ test("delete() retains older version of record", async (context) => {
     id: "id1",
   });
   expect(deletedInstance).toMatchObject({ id: "id1", name: "Skip", age: 12 });
-
-  await userStore.teardown();
 });
 
 test("delete() removes a record entirely if only present for one timestamp", async (context) => {
@@ -559,8 +517,6 @@ test("delete() removes a record entirely if only present for one timestamp", asy
     id: "id1",
   });
   expect(deletedInstance).toBe(null);
-
-  await userStore.teardown();
 });
 
 test("delete() removes a record entirely if only present for one timestamp after update()", async (context) => {
@@ -600,8 +556,6 @@ test("delete() removes a record entirely if only present for one timestamp after
     id: "id1",
   });
   expect(deletedInstance).toBe(null);
-
-  await userStore.teardown();
 });
 
 test("delete() deletes versions effective in the delete timestamp", async (context) => {
@@ -630,8 +584,6 @@ test("delete() deletes versions effective in the delete timestamp", async (conte
     id: "id1",
   });
   expect(instancePriorToDelete!.name).toBe("Skip");
-
-  await userStore.teardown();
 });
 
 test("findMany() returns current versions of all records", async (context) => {
@@ -670,8 +622,6 @@ test("findMany() returns current versions of all records", async (context) => {
     "Foo",
     "Bar",
   ]);
-
-  await userStore.teardown();
 });
 
 test("findMany() sorts on bigint field", async (context) => {
@@ -708,8 +658,6 @@ test("findMany() sorts on bigint field", async (context) => {
     orderBy: { bigAge: "asc" },
   });
   expect(instances.map((i) => i.bigAge)).toMatchObject([null, 10n, 105n, 190n]);
-
-  await userStore.teardown();
 });
 
 test("findMany() filters on bigint gt", async (context) => {
@@ -747,8 +695,6 @@ test("findMany() filters on bigint gt", async (context) => {
   });
 
   expect(instances.map((i) => i.bigAge)).toMatchObject([105n, 190n]);
-
-  await userStore.teardown();
 });
 
 test("findMany() sorts and filters together", async (context) => {
@@ -787,8 +733,6 @@ test("findMany() sorts and filters together", async (context) => {
   });
 
   expect(instances.map((i) => i.name)).toMatchObject(["Bar", "Zarbar"]);
-
-  await userStore.teardown();
 });
 
 test("findMany() errors on invalid filter condition", async (context) => {
@@ -801,8 +745,6 @@ test("findMany() errors on invalid filter condition", async (context) => {
       where: { name: { invalidWhereCondition: "ar" } },
     })
   ).rejects.toThrow("Invalid filter condition name: invalidWhereCondition");
-
-  await userStore.teardown();
 });
 
 test("findMany() errors on orderBy object with multiple keys", async (context) => {
@@ -815,8 +757,6 @@ test("findMany() errors on orderBy object with multiple keys", async (context) =
      orderBy: { name: "asc", bigAge: "desc" },
    })
  ).rejects.toThrow("Invalid sort condition: Must have exactly one property");
-
-  await userStore.teardown();
 });
 
 test("createMany() inserts multiple entities", async (context) => {
@@ -836,8 +776,6 @@ test("createMany() inserts multiple entities", async (context) => {
 
   const instances = await userStore.findMany({ modelName: "Pet" });
   expect(instances.length).toBe(3);
-
-  await userStore.teardown();
 });
 
 test("createMany() inserts a large number of entities", async (context) => {
@@ -859,8 +797,6 @@ test("createMany() inserts a large number of entities", async (context) => {
 
   const instances = await userStore.findMany({ modelName: "Pet" });
   expect(instances.length).toBe(ENTITY_COUNT);
-
-  await userStore.teardown();
 });
 
 test("updateMany() updates multiple entities", async (context) => {
@@ -889,8 +825,6 @@ test("updateMany() updates multiple entities", async (context) => {
 
   const instances = await userStore.findMany({ modelName: "Pet" });
   expect(instances.map((i) => i.bigAge)).toMatchObject([10n, 300n, 300n]);
-
-  await userStore.teardown();
 });
 
 test("revert() deletes versions newer than the safe timestamp", async (context) => {
@@ -937,8 +871,6 @@ test("revert() deletes versions newer than the safe timestamp", async (context)
   const persons = await userStore.findMany({ modelName: "Person" });
   expect(persons.length).toBe(1);
   expect(persons[0].name).toBe("Bobby");
-
-  await userStore.teardown();
 });
 
 test("revert() updates versions that only existed during the safe timestamp to latest", async (context) => {
@@ -962,6 +894,4 @@ test("revert() updates versions that only existed during the safe timestamp to l
   const pets = await userStore.findMany({ modelName: "Pet" });
   expect(pets.length).toBe(1);
   expect(pets[0].name).toBe("Skip");
-
-  await userStore.teardown();
 });
diff --git a/packages/core/src/user-store/store.ts b/packages/core/src/user-store/store.ts
index 85eaac974..c83361e1a 100644
--- a/packages/core/src/user-store/store.ts
+++ b/packages/core/src/user-store/store.ts
@@ -78,7 +78,7 @@ export interface UserStore {
   versionId?: string;
 
   reload(options?: { schema?: Schema }): Promise<void>;
-  teardown(): Promise<void>;
+  kill(): Promise<void>;
 
   revert(options: { safeTimestamp: number }): Promise<void>;