Skip to content

Commit

Permalink
fix: more onchainSchema() (#1247)
Browse files Browse the repository at this point in the history
* fix: more onchain schema

* test onchain schema

* chore: changeset

* cleanup
  • Loading branch information
kyscott18 authored Nov 15, 2024
1 parent 5a6de18 commit 214b7fa
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changeset/silver-owls-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Fixed `onchainSchema()` with pglite.
5 changes: 5 additions & 0 deletions .changeset/swift-oranges-wink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Fixed crash recovery, specifically build ID generation.
21 changes: 13 additions & 8 deletions packages/core/src/build/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ const executeConfig = async (
const executeSchema = async (
buildService: Service,
): Promise<
{ status: "success"; schema: Schema } | { status: "error"; error: Error }
| { status: "success"; schema: Schema; contentHash: string }
| { status: "error"; error: Error }
> => {
const executeResult = await executeFile(buildService, {
file: buildService.common.options.schemaFile,
Expand All @@ -556,7 +557,15 @@ const executeSchema = async (

const schema = executeResult.exports;

return { status: "success", schema };
const contents = fs.readFileSync(
buildService.common.options.schemaFile,
"utf-8",
);
return {
status: "success",
schema,
contentHash: createHash("sha256").update(contents).digest("hex"),
};
};

const executeIndexingFunctions = async (
Expand Down Expand Up @@ -665,7 +674,7 @@ const executeApiRoutes = async (
const validateAndBuild = async (
{ common }: Pick<Service, "common">,
config: { config: Config; contentHash: string },
schema: { schema: Schema },
schema: { schema: Schema; contentHash: string },
indexingFunctions: {
indexingFunctions: RawIndexingFunctions;
contentHash: string;
Expand Down Expand Up @@ -713,11 +722,7 @@ const validateAndBuild = async (
const buildId = createHash("sha256")
.update(BUILD_ID_VERSION)
.update(config.contentHash)
.update(
createHash("sha256")
.update(serialize(buildSchemaResult.statements))
.digest("hex"),
)
.update(schema.contentHash)
.update(indexingFunctions.contentHash)
.digest("hex")
.slice(0, 10);
Expand Down
37 changes: 36 additions & 1 deletion packages/core/src/database/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { setupCommon, setupIsolatedDatabase } from "@/_test/setup.js";
import { buildSchema } from "@/build/schema.js";
import { onchainEnum, onchainTable, primaryKey } from "@/drizzle/index.js";
import {
onchainEnum,
onchainSchema,
onchainTable,
primaryKey,
} from "@/drizzle/index.js";
import { createRealtimeIndexingStore } from "@/indexing-store/realtime.js";
import {
encodeCheckpoint,
Expand Down Expand Up @@ -107,6 +112,36 @@ test("setup() creates tables", async (context) => {
await database.kill();
});

test("setup() with onchainSchema", async (context) => {
const schema = onchainSchema("multichain");
const account = schema.table("account", (t) => ({
address: t.hex().primaryKey(),
balance: t.bigint(),
}));

const database = createDatabase({
common: context.common,
schema: { schema, account },
databaseConfig: context.databaseConfig,
instanceId: "1234",
buildId: "abc",
...buildSchema({
schema: { schema, account },
instanceId: "1234",
}),
});

await database.setup();

const tableNames = await getUserTableNames(database, "multichain");
expect(tableNames).toContain("1234__account");
expect(tableNames).toContain("1234_reorg__account");
expect(tableNames).toContain("_ponder_meta");

await database.unlock();
await database.kill();
});

test("setup() succeeds with a prior app in the same namespace", async (context) => {
const database = createDatabase({
common: context.common,
Expand Down
21 changes: 14 additions & 7 deletions packages/core/src/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ export const createDatabase = (args: {
});
}
},
plugins: [new WithSchemaPlugin(args.namespace)],
}),
user: new HeadlessKysely({
name: "user",
Expand All @@ -181,6 +182,7 @@ export const createDatabase = (args: {
});
}
},
plugins: [new WithSchemaPlugin(args.namespace)],
}),
readonly: new HeadlessKysely({
name: "readonly",
Expand All @@ -193,6 +195,7 @@ export const createDatabase = (args: {
});
}
},
plugins: [new WithSchemaPlugin(args.namespace)],
}),
sync: new HeadlessKysely<PonderSyncSchema>({
name: "sync",
Expand Down Expand Up @@ -496,11 +499,13 @@ export const createDatabase = (args: {
// @ts-ignore
.selectFrom("information_schema.tables")
// @ts-ignore
.select("table_name")
.select(["table_name", "table_schema"])
// @ts-ignore
.where("table_name", "=", "namespace_lock")
// @ts-ignore
.where("table_schema", "=", "ponder")
.executeTakeFirst()
.then((table) => table?.table_name === "namespace_lock");
.then((table) => table !== undefined);

if (hasNamespaceLockTable) {
await qb.internal.wrap({ method: "migrate" }, async () => {
Expand Down Expand Up @@ -564,11 +569,13 @@ export const createDatabase = (args: {
// @ts-ignore
.selectFrom("information_schema.tables")
// @ts-ignore
.select("table_name")
.select(["table_name", "table_schema"])
// @ts-ignore
.where("table_name", "=", "_ponder_meta")
// @ts-ignore
.where("table_schema", "=", args.namespace)
.executeTakeFirst()
.then((table) => table?.table_name === "_ponder_meta");
.then((table) => table !== undefined);

if (hasPonderMetaTable) {
await qb.internal.wrap({ method: "migrate" }, () =>
Expand Down Expand Up @@ -1157,13 +1164,13 @@ CREATE OR REPLACE FUNCTION ${tableName.triggerFn}
RETURNS TRIGGER AS $$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO "${tableName.reorg}" (${columnNames.join(",")}, operation, checkpoint)
INSERT INTO "${args.namespace}"."${tableName.reorg}" (${columnNames.join(",")}, operation, checkpoint)
VALUES (${columnNames.map((name) => `NEW.${name}`).join(",")}, 0, '${encodeCheckpoint(maxCheckpoint)}');
ELSIF TG_OP = 'UPDATE' THEN
INSERT INTO "${tableName.reorg}" (${columnNames.join(",")}, operation, checkpoint)
INSERT INTO "${args.namespace}"."${tableName.reorg}" (${columnNames.join(",")}, operation, checkpoint)
VALUES (${columnNames.map((name) => `OLD.${name}`).join(",")}, 1, '${encodeCheckpoint(maxCheckpoint)}');
ELSIF TG_OP = 'DELETE' THEN
INSERT INTO "${tableName.reorg}" (${columnNames.join(",")}, operation, checkpoint)
INSERT INTO "${args.namespace}"."${tableName.reorg}" (${columnNames.join(",")}, operation, checkpoint)
VALUES (${columnNames.map((name) => `OLD.${name}`).join(",")}, 2, '${encodeCheckpoint(maxCheckpoint)}');
END IF;
RETURN NULL;
Expand Down

0 comments on commit 214b7fa

Please sign in to comment.