Skip to content

Commit

Permalink
Fix the stale table bug (#405)
Browse files Browse the repository at this point in the history
* FIX THE STALE TABLE BUG

* tolerate errors in test cleanup

* chore: changeset

* try removing timeout

* Add logs

* add another log

* another log

* add global shutdown

* add schema to log

* remove logs from migration test

* use tx

* add retry
  • Loading branch information
0xOlias authored Nov 3, 2023
1 parent 2f69117 commit fb3a2a8
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 147 deletions.
5 changes: 5 additions & 0 deletions .changeset/loud-days-perform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Fixed a bug where stale tables were left in the database after the service was stopped.
4 changes: 2 additions & 2 deletions packages/core/src/Ponder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,13 @@ export class Ponder {
)
);

await this.buildService.kill?.();
await this.buildService.kill();
this.uiService.kill();
this.indexingService.kill();
await this.serverService.kill();
await this.userStore.teardown();
await this.common.telemetry.kill();

await this.userStore.kill();
await this.eventStore.kill();

this.common.logger.debug({
Expand Down
37 changes: 36 additions & 1 deletion packages/core/src/_test/globalSetup.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { startProxy } from "@viem/anvil";
import dotenv from "dotenv";
import { Pool } from "pg";

import { FORK_BLOCK_NUMBER } from "./constants";

Expand All @@ -11,11 +12,45 @@ export default async function () {
throw new Error('Missing environment variable "ANVIL_FORK_URL"');
}

return await startProxy({
const shutdownProxy = await startProxy({
options: {
chainId: 1,
forkUrl: ANVIL_FORK_URL,
forkBlockNumber: FORK_BLOCK_NUMBER,
},
});

let cleanupDatabase: () => Promise<void>;
if (process.env.DATABASE_URL) {
cleanupDatabase = async () => {
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

const schemaRows = await pool.query(`
SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname ~ '^vitest_pool_';
`);
const schemas = schemaRows.rows.map((r) => r.nspname) as string[];

for (const schema of schemas) {
const tableRows = await pool.query(`
SELECT table_name FROM information_schema.tables WHERE table_schema = '${schema}'
`);
const tables = tableRows.rows.map((r) => r.table_name) as string[];

for (const table of tables) {
await pool.query(
`DROP TABLE IF EXISTS "${schema}"."${table}" CASCADE`
);
}
await pool.query(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
console.log(`Dropped ${tables.length} tables from schema "${schema}".`);
}

await pool.end();
};
}

return async () => {
await shutdownProxy();
await cleanupDatabase?.();
};
}
18 changes: 12 additions & 6 deletions packages/core/src/_test/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,31 @@ beforeEach((context) => {
*/
export async function setupEventStore(
context: TestContext,
options = { skipMigrateUp: false }
options = { migrateUp: true }
) {
if (process.env.DATABASE_URL) {
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const databaseSchema = `vitest_pool_${process.pid}_${poolId}`;
context.eventStore = new PostgresEventStore({ pool, databaseSchema });

if (!options.skipMigrateUp) await context.eventStore.migrateUp();
if (options.migrateUp) await context.eventStore.migrateUp();

return async () => {
try {
await pool.query(`DROP SCHEMA IF EXISTS "${databaseSchema}" CASCADE`);
await context.eventStore.kill();
} catch (e) {
// This query fails in end-to-end tests where the pool has
// This fails in end-to-end tests where the pool has
// already been shut down during the Ponder instance kill() method.
// It's fine to ignore the error.
}
await context.eventStore.kill();
};
} else {
const rawSqliteDb = new SqliteDatabase(":memory:");
const db = patchSqliteDatabase({ db: rawSqliteDb });
context.eventStore = new SqliteEventStore({ db });

if (!options.skipMigrateUp) await context.eventStore.migrateUp();
if (options.migrateUp) await context.eventStore.migrateUp();

return async () => {
await context.eventStore.kill();
Expand Down Expand Up @@ -121,7 +121,13 @@ export async function setupUserStore(context: TestContext) {
}

return async () => {
await context.userStore.teardown();
try {
await context.userStore.kill();
} catch (e) {
// This fails in end-to-end tests where the pool has
// already been shut down during the Ponder instance kill() method.
// It's fine to ignore the error.
}
};
}

Expand Down
35 changes: 21 additions & 14 deletions packages/core/src/event-store/postgres/migrations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
rpcToPostgresTransaction,
} from "./format";

beforeEach((context) => setupEventStore(context, { skipMigrateUp: true }));
beforeEach((context) => setupEventStore(context, { migrateUp: false }));

const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
await db
Expand Down Expand Up @@ -60,20 +60,27 @@ const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
.execute();
};

test("2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds", async (context) => {
const { eventStore } = context;
test(
"2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds",
async (context) => {
const { eventStore } = context;

if (eventStore.kind !== "postgres") return;
if (eventStore.kind !== "postgres") return;

const { error } = await eventStore.migrator.migrateTo(
"2023_07_24_0_drop_finalized"
);
expect(error).toBeFalsy();
const { error } = await eventStore.migrator.migrateTo(
"2023_07_24_0_drop_finalized"
);

await seed_2023_07_24_0_drop_finalized(eventStore.db);
expect(error).toBeFalsy();

const { error: latestError } = await eventStore.migrator.migrateTo(
"2023_09_19_0_new_sync_design"
);
expect(latestError).toBeFalsy();
}, 15_000);
await seed_2023_07_24_0_drop_finalized(eventStore.db);

const { error: latestError } = await eventStore.migrator.migrateTo(
"2023_09_19_0_new_sync_design"
);
expect(latestError).toBeFalsy();
},
// This test is flaky. It seems like a Postgres isolation issue with our
// test setup. Annoying!
{ timeout: 15_000, retry: 3 }
);
32 changes: 14 additions & 18 deletions packages/core/src/event-store/sqlite/migrations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
rpcToSqliteTransaction,
} from "./format";

beforeEach((context) => setupEventStore(context, { skipMigrateUp: true }));
beforeEach((context) => setupEventStore(context, { migrateUp: false }));

const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
await db
Expand Down Expand Up @@ -60,24 +60,20 @@ const seed_2023_07_24_0_drop_finalized = async (db: Kysely<any>) => {
.execute();
};

test(
"2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds",
async (context) => {
const { eventStore } = context;
test("2023_07_24_0_drop_finalized -> 2023_09_19_0_new_sync_design succeeds", async (context) => {
const { eventStore } = context;

if (eventStore.kind !== "sqlite") return;
if (eventStore.kind !== "sqlite") return;

const { error } = await eventStore.migrator.migrateTo(
"2023_07_24_0_drop_finalized"
);
expect(error).toBeFalsy();
const { error } = await eventStore.migrator.migrateTo(
"2023_07_24_0_drop_finalized"
);
expect(error).toBeFalsy();

await seed_2023_07_24_0_drop_finalized(eventStore.db);
await seed_2023_07_24_0_drop_finalized(eventStore.db);

const { error: latestError } = await eventStore.migrator.migrateTo(
"2023_09_19_0_new_sync_design"
);
expect(latestError).toBeFalsy();
},
{}
);
const { error: latestError } = await eventStore.migrator.migrateTo(
"2023_09_19_0_new_sync_design"
);
expect(latestError).toBeFalsy();
});
5 changes: 0 additions & 5 deletions packages/core/src/server/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,6 @@ test("serves singular entity versioned at specified timestamp", async (context)
expect(testEntity.string).toBe("updated");

await service.kill();
await userStore.teardown();
});

test("serves plural entities versioned at specified timestamp", async (context) => {
Expand Down Expand Up @@ -1452,7 +1451,6 @@ test("serves plural entities versioned at specified timestamp", async (context)
]);

await service.kill();
await userStore.teardown();
});

test("derived field respects skip argument", async (context) => {
Expand Down Expand Up @@ -1485,7 +1483,6 @@ test("derived field respects skip argument", async (context) => {
});

await service.kill();
await userStore.teardown();
});

test("responds with appropriate status code pre and post historical sync", async (context) => {
Expand Down Expand Up @@ -1530,7 +1527,6 @@ test("responds with appropriate status code pre and post historical sync", async
});

await service.kill();
await userStore.teardown();
});

// This is a known limitation for now, which is that the timestamp version of entities
Expand Down Expand Up @@ -1592,5 +1588,4 @@ test.skip("serves derived entities versioned at provided timestamp", async (cont
});

await service.kill();
await userStore.teardown();
});
29 changes: 14 additions & 15 deletions packages/core/src/user-store/postgres/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,21 @@ export class PostgresUserStore implements UserStore {
});
};

/**
* Tears down the store by dropping all tables for the current schema.
*/
teardown = async () => {
if (!this.schema) return;
async kill() {
const entities = this.schema?.entities ?? [];
if (entities.length > 0) {
await this.db.transaction().execute(async (tx) => {
await Promise.all(
entities.map(async (model) => {
const tableName = `${model.name}_${this.versionId}`;
await tx.schema.dropTable(tableName).execute();
})
);
});
}

// Drop tables from existing schema.
await this.db.transaction().execute(async (tx) => {
await Promise.all(
this.schema!.entities.map((model) => {
const tableName = `${model.name}_${this.versionId}`;
tx.schema.dropTable(tableName);
})
);
});
};
await this.db.destroy();
}

findUnique = async ({
modelName,
Expand Down
29 changes: 14 additions & 15 deletions packages/core/src/user-store/sqlite/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,21 @@ export class SqliteUserStore implements UserStore {
});
};

/**
* Tears down the store by dropping all tables for the current schema.
*/
teardown = async () => {
if (!this.schema) return;
async kill() {
const entities = this.schema?.entities ?? [];
if (entities.length > 0) {
await this.db.transaction().execute(async (tx) => {
await Promise.all(
entities.map(async (model) => {
const tableName = `${model.name}_${this.versionId}`;
await tx.schema.dropTable(tableName).execute();
})
);
});
}

// Drop tables from existing schema.
await this.db.transaction().execute(async (tx) => {
await Promise.all(
this.schema!.entities.map((model) => {
const tableName = `${model.name}_${this.versionId}`;
tx.schema.dropTable(tableName);
})
);
});
};
await this.db.destroy();
}

findUnique = async ({
modelName,
Expand Down
Loading

1 comment on commit fb3a2a8

@vercel
Copy link

@vercel vercel bot commented on fb3a2a8 Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.