Skip to content

Commit

Permalink
Checkpoint -- testing new schemas for reading and writing
Browse files Browse the repository at this point in the history
  • Loading branch information
bankisan committed Dec 11, 2023
1 parent c869ebd commit 8efb06e
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 107 deletions.
49 changes: 38 additions & 11 deletions packages/core/src/_test/setup.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { randomBytes } from "crypto";
import { beforeEach, type TestContext } from "vitest";

import { buildOptions } from "@/config/options.js";
Expand Down Expand Up @@ -106,27 +107,53 @@ export async function setupSyncStore(
*/
export async function setupIndexingStore(context: TestContext) {
if (process.env.DATABASE_URL) {
const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
const testClient = new pg.Client({
connectionString: process.env.DATABASE_URL,
});
await testClient.connect();
// Create a random database to isolate the tests.
const databaseSchema = `vitest_pool_${process.pid}_${randomBytes(
10,
).toString("hex")}`;

const dbURL = new URL(process.env.DATABASE_URL);
dbURL.pathname = `/${databaseSchema}`;
const pool = new pg.Pool({
connectionString: dbURL.toString(),
});

await testClient.query(`CREATE DATABASE "${databaseSchema}"`);
context.indexingStore = new PostgresIndexingStore({
common: context.common,
pool,
});

return async () => {
try {
await context.indexingStore.kill();
await testClient.query(`DROP DATABASE "${databaseSchema}"`);
await testClient.end();
} catch (e) {
// This fails in end-to-end tests where the pool has
// already been shut down during the Ponder instance kill() method.
// It's fine to ignore the error.
}
};
} else {
context.indexingStore = new SqliteIndexingStore({
common: context.common,
file: ":memory:",
});
return async () => {
try {
await context.indexingStore.kill();
} catch (e) {
// This fails in end-to-end tests where the pool has
// already been shut down during the Ponder instance kill() method.
// It's fine to ignore the error.
}
};
}

return async () => {
try {
await context.indexingStore.kill();
} catch (e) {
// This fails in end-to-end tests where the pool has
// already been shut down during the Ponder instance kill() method.
// It's fine to ignore the error.
}
};
}

/**
Expand Down
182 changes: 86 additions & 96 deletions packages/core/src/indexing-store/postgres/store.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
import {
CompiledQuery,
Kysely,
PostgresDialect,
sql,
WithSchemaPlugin,
} from "kysely";
import { Kysely, PostgresDialect, sql, WithSchemaPlugin } from "kysely";

import type { Common } from "@/Ponder.js";
import type { Scalar, Schema } from "@/schema/types.js";
Expand Down Expand Up @@ -41,89 +35,42 @@ export class PostgresIndexingStore implements IndexingStore {
private common: Common;

db: Kysely<any>;
readerDB: Kysely<any>;
writerDB: Kysely<any>;

schema?: Schema;
indexNamespace: string;
hasConnected: boolean = false;
isReadOnly: boolean = false;
namespaceVersion: string;

constructor({
common,
pool,
isReadOnly = false,
}: {
common: Common;
pool: Pool;
isReadOnly?: boolean;
}) {
this.indexNamespace = `ponder_index_${new Date().getTime()}`;
this.namespaceVersion = `ponder_index_${new Date().getTime()}`;
this.common = common;
this.isReadOnly = isReadOnly;
this.db = new Kysely({
dialect: new PostgresDialect({
pool,
onCreateConnection: async (connection) => {
if (this.hasConnected) {
return;
}

if (isReadOnly) {
const result = await connection.executeQuery(
// Finds the latest namespace which should have been created by the indexer writer.
CompiledQuery.raw(
`SELECT nspname FROM pg_namespace WHERE nspname LIKE 'ponder_index_%' ORDER BY nspname DESC LIMIT 1`,
),
);
if (result.rows.length === 0) {
throw new Error("No namespace found");
}
const namespace = (result.rows as { nspname: string }[])[0]
.nspname as string;

this.indexNamespace = namespace;
// This is only safe in a readonly context. Setting this modifies the session's search path which can
// modify the behavior of non-indexing queries.
await connection.executeQuery(
CompiledQuery.raw(`SET search_path = ${this.indexNamespace}`),
);
} else {
await connection.executeQuery(
CompiledQuery.raw(
`CREATE SCHEMA IF NOT EXISTS ${this.indexNamespace}`,
),
);
}

this.common.logger.debug({
msg: `Connected to namespace: ${this.indexNamespace}`,
service: "indexing",
});
this.hasConnected = true;
},
}),
});
if (isReadOnly) {
// This is a hack to stop readonly connections from connecting to a namespace using the Kysely plugin as readonly
// connections set their namespace using the search_path setter in the onCreateConnection hook.
return;
}

this.db = this.db.withPlugin(new WithSchemaPlugin(this.indexNamespace));
this.readerDB = this.db.withPlugin(new WithSchemaPlugin("public"));
this.writerDB = this.db.withPlugin(
new WithSchemaPlugin(this.namespaceVersion),
);
}

async kill() {
return this.wrap({ method: "kill" }, async () => {
if (!this.isReadOnly) {
// Clean up all the tables and data created by this instance by dropping the schema that was created.
await this.db.executeQuery(
CompiledQuery.raw(
`DROP SCHEMA IF EXISTS ${this.indexNamespace} CASCADE`,
),
);
}

try {
await this.db.destroy();
await Promise.all([
this.writerDB.destroy(),
this.readerDB.destroy(),
this.db.destroy(),
]);
} catch (e) {
const error = e as Error;
if (error.message !== "Called end on pool more than once") {
Expand All @@ -133,6 +80,32 @@ export class PostgresIndexingStore implements IndexingStore {
});
}

publish = async () => {
return this.wrap({ method: "publish" }, async () => {
if (!this.schema) return;

await this.readerDB.transaction().execute(async (tx) => {
// Drop all previous schemas. This will delete all previous views.
// TODO: Test this by creating an older schema and ensure it is removed.

// Create all the view tables.
await Promise.all(
Object.entries(this.schema!.tables).map(async ([tableName]) => {
const viewBuilder = tx.schema
.createView(tableName)
.as(
tx
.withSchema(this.namespaceVersion)
.selectFrom(tableName)
.selectAll(),
);
await viewBuilder.execute();
}),
);
});
});
};

/**
* Resets the database by dropping existing tables and creating new tables.
* If no new schema is provided, the existing schema is used.
Expand All @@ -147,7 +120,13 @@ export class PostgresIndexingStore implements IndexingStore {
// Set the new schema.
if (schema) this.schema = schema;

await this.db.transaction().execute(async (tx) => {
await this.writerDB.transaction().execute(async (tx) => {
await tx.schema.dropSchema(this.namespaceVersion).ifExists().execute();
await tx.schema
.createSchema(this.namespaceVersion)
.ifNotExists()
.execute();

// Create tables for new schema.
await Promise.all(
Object.entries(this.schema!.tables).map(
Expand Down Expand Up @@ -217,12 +196,17 @@ export class PostgresIndexingStore implements IndexingStore {
),
);
});

this.common.logger.debug({
msg: `Connected to namespace: ${this.namespaceVersion}`,
service: "indexing",
});
});
};

revert = async ({ safeCheckpoint }: { safeCheckpoint: Checkpoint }) => {
return this.wrap({ method: "revert" }, async () => {
await this.db.transaction().execute(async (tx) => {
await this.writerDB.transaction().execute(async (tx) => {
await Promise.all(
Object.keys(this.schema?.tables ?? {}).map(async (tableName) => {
const encodedCheckpoint = encodeCheckpoint(safeCheckpoint);
Expand Down Expand Up @@ -261,7 +245,7 @@ export class PostgresIndexingStore implements IndexingStore {
encodeBigInts: false,
});

let query = this.db
let query = this.readerDB
.selectFrom(tableName)
.selectAll()
.where("id", "=", formattedId);
Expand Down Expand Up @@ -303,7 +287,7 @@ export class PostgresIndexingStore implements IndexingStore {
orderBy?: OrderByInput<any>;
}) => {
return this.wrap({ method: "findMany", tableName }, async () => {
let query = this.db.selectFrom(tableName).selectAll();
let query = this.readerDB.selectFrom(tableName).selectAll();

if (checkpoint === "latest") {
query = query.where("effectiveToCheckpoint", "=", "latest");
Expand Down Expand Up @@ -371,7 +355,7 @@ export class PostgresIndexingStore implements IndexingStore {
const createRow = formatRow({ id, ...data }, false);
const encodedCheckpoint = encodeCheckpoint(checkpoint);

const row = await this.db
const row = await this.writerDB
.insertInto(tableName)
.values({
...createRow,
Expand Down Expand Up @@ -409,7 +393,11 @@ export class PostgresIndexingStore implements IndexingStore {

const rows = await Promise.all(
chunkedRows.map((c) =>
this.db.insertInto(tableName).values(c).returningAll().execute(),
this.writerDB
.insertInto(tableName)
.values(c)
.returningAll()
.execute(),
),
);

Expand Down Expand Up @@ -437,7 +425,7 @@ export class PostgresIndexingStore implements IndexingStore {
});
const encodedCheckpoint = encodeCheckpoint(checkpoint);

const row = await this.db.transaction().execute(async (tx) => {
const row = await this.writerDB.transaction().execute(async (tx) => {
// Find the latest version of this instance.
const latestRow = await tx
.selectFrom(tableName)
Expand Down Expand Up @@ -520,7 +508,7 @@ export class PostgresIndexingStore implements IndexingStore {
return this.wrap({ method: "updateMany", tableName }, async () => {
const encodedCheckpoint = encodeCheckpoint(checkpoint);

const rows = await this.db.transaction().execute(async (tx) => {
const rows = await this.writerDB.transaction().execute(async (tx) => {
// Get all IDs that match the filter.
let latestRowsQuery = tx
.selectFrom(tableName)
Expand Down Expand Up @@ -628,7 +616,7 @@ export class PostgresIndexingStore implements IndexingStore {
const createRow = formatRow({ id, ...create }, false);
const encodedCheckpoint = encodeCheckpoint(checkpoint);

const row = await this.db.transaction().execute(async (tx) => {
const row = await this.writerDB.transaction().execute(async (tx) => {
// Find the latest version of this instance.
const latestRow = await tx
.selectFrom(tableName)
Expand Down Expand Up @@ -722,31 +710,33 @@ export class PostgresIndexingStore implements IndexingStore {
});
const encodedCheckpoint = encodeCheckpoint(checkpoint);

const isDeleted = await this.db.transaction().execute(async (tx) => {
// If the latest version has effectiveFromCheckpoint equal to current checkpoint,
// this row was created within the same indexing function, and we can delete it.
let deletedRow = await tx
.deleteFrom(tableName)
.where("id", "=", formattedId)
.where("effectiveFromCheckpoint", "=", encodedCheckpoint)
.where("effectiveToCheckpoint", "=", "latest")
.returning(["id"])
.executeTakeFirst();

// If we did not take the shortcut above, update the latest record
// setting effectiveToCheckpoint to the current checkpoint.
if (!deletedRow) {
deletedRow = await tx
.updateTable(tableName)
.set({ effectiveToCheckpoint: encodedCheckpoint })
const isDeleted = await this.writerDB
.transaction()
.execute(async (tx) => {
// If the latest version has effectiveFromCheckpoint equal to current checkpoint,
// this row was created within the same indexing function, and we can delete it.
let deletedRow = await tx
.deleteFrom(tableName)
.where("id", "=", formattedId)
.where("effectiveFromCheckpoint", "=", encodedCheckpoint)
.where("effectiveToCheckpoint", "=", "latest")
.returning(["id"])
.executeTakeFirst();
}

return !!deletedRow;
});
// If we did not take the shortcut above, update the latest record
// setting effectiveToCheckpoint to the current checkpoint.
if (!deletedRow) {
deletedRow = await tx
.updateTable(tableName)
.set({ effectiveToCheckpoint: encodedCheckpoint })
.where("id", "=", formattedId)
.where("effectiveToCheckpoint", "=", "latest")
.returning(["id"])
.executeTakeFirst();
}

return !!deletedRow;
});

return isDeleted;
});
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/indexing-store/sqlite/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export class SqliteIndexingStore implements IndexingStore {
});
}

async publish() {
// Implements the interface.
}

/**
* Resets the database by dropping existing tables and creating new tables.
* If no new schema is provided, the existing schema is used.
Expand Down
Loading

0 comments on commit 8efb06e

Please sign in to comment.