Skip to content

Commit

Permalink
Fix zero downtime deployment bug & improve graceful shutdown (#109)
Browse files Browse the repository at this point in the history
* feat: add entity instance id namespacing

* fix: relationship type resolver

* feat: improve graceful shutdown

* test: update tests with new entity name scheme

* chore: changeset
  • Loading branch information
0xOlias authored Feb 26, 2023
1 parent 73bd492 commit 1563946
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 215 deletions.
5 changes: 5 additions & 0 deletions .changeset/shy-wombats-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Fixed zero downtime deployment bug
9 changes: 7 additions & 2 deletions packages/core/src/Ponder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,15 @@ export class Ponder extends EventEmitter<PonderEvents> {
unmount();
clearInterval(this.renderInterval);
clearInterval(this.etaInterval);
this.handlerQueue?.kill();

this.killFrontfillQueues?.();
this.killBackfillQueues?.();

this.handlerQueue?.kill();
delete this.handlerQueue;

await this.server.teardown();
await this.entityStore.teardown();
await this.killWatchers?.();
}

Expand Down Expand Up @@ -224,7 +229,7 @@ export class Ponder extends EventEmitter<PonderEvents> {
}

async resetEntityStore() {
await this.entityStore.migrate(this.schema);
await this.entityStore.load(this.schema);
}

async reload() {
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/bin/ponder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { cac } from "cac";
import dotenv from "dotenv";

import { buildOptions } from "@/common/options";
import { registerKilledProcessListener } from "@/common/utils";
import { buildPonderConfig } from "@/config/buildPonderConfig";
import { Ponder } from "@/Ponder";

Expand Down Expand Up @@ -43,6 +44,7 @@ cli
const options = buildOptions({ ...cliOptions, logType: "dev" });
const config = await buildPonderConfig(options);
const ponder = new Ponder({ options, config });
registerKilledProcessListener(() => ponder.kill());
await ponder.dev();
});

Expand All @@ -54,6 +56,7 @@ cli
const options = buildOptions({ ...cliOptions, logType: "start" });
const config = await buildPonderConfig(options);
const ponder = new Ponder({ options, config });
registerKilledProcessListener(() => ponder.kill());
await ponder.start();
});

Expand All @@ -65,6 +68,7 @@ cli
const options = buildOptions({ ...cliOptions, logType: "codegen" });
const config = await buildPonderConfig(options);
const ponder = new Ponder({ options, config });
registerKilledProcessListener(() => ponder.kill());
ponder.codegen();
});

Expand Down
15 changes: 15 additions & 0 deletions packages/core/src/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ export const groupBy = <T>(array: T[], fn: (item: T) => string | number) => {
}, {});
};

export const registerKilledProcessListener = (fn: () => Promise<unknown>) => {
let calledCount = 0;

const listener = async () => {
calledCount++;
if (calledCount > 1) return;
await fn();
process.exit(0);
};

process.on("SIGINT", listener); // CTRL+C
process.on("SIGQUIT", listener); // Keyboard quit
process.on("SIGTERM", listener); // `kill` command
};

export const startBenchmark = () => process.hrtime();
export const endBenchmark = (hrt: [number, number]) => {
const diffHrt = process.hrtime(hrt);
Expand Down
20 changes: 9 additions & 11 deletions packages/core/src/db/entity/entityStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,37 +26,35 @@ type Entity = Record<string, unknown>;
type MaybePromise<T> = T | Promise<T>;

export interface EntityStore {
migrate(schema?: Schema): MaybePromise<void>;
load(schema?: Schema): MaybePromise<void>;
teardown(): MaybePromise<void>;

getEntity(entityName: string, id: string): MaybePromise<Entity | null>;
getEntity(entityId: string, id: string): MaybePromise<Entity | null>;

insertEntity(
entityName: string,
entityId: string,
id: string,
instance: Entity
): MaybePromise<Entity>;

upsertEntity(
entityName: string,
entityId: string,
id: string,
instance: Entity
): MaybePromise<Entity>;

updateEntity(
entityName: string,
entityId: string,
id: string,
instance: Partial<Entity>
): MaybePromise<Entity>;

deleteEntity(entityName: string, id: string): MaybePromise<boolean>;
deleteEntity(entityId: string, id: string): MaybePromise<boolean>;

getEntities(
entityName: string,
filter?: EntityFilter
): MaybePromise<Entity[]>;
getEntities(entityId: string, filter?: EntityFilter): MaybePromise<Entity[]>;

getEntityDerivedField(
entityName: string,
entityId: string,
id: string,
derivedFieldName: string
): MaybePromise<unknown[]>;
Expand Down
130 changes: 71 additions & 59 deletions packages/core/src/db/entity/postgresEntityStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,55 @@ export class PostgresEntityStore implements EntityStore {
};
};

async migrate(schema?: Schema) {
if (!schema) return;
this.schema = schema;

this.schema.entities.forEach(async (entity) => {
// Drop the table if it already exists
await this.pool.query(`DROP TABLE IF EXISTS "${entity.name}"`);

// Build the create table statement using field migration fragments.
// TODO: Update this so the generation of the field migration fragments happens here
// instead of when the Schema gets built.
const columnStatements = entity.fields
.filter(
// This type guard is wrong, could actually be any FieldKind that's not derived (obvs)
(field): field is ScalarField => field.kind !== FieldKind.DERIVED
)
.map((field) => field.migrateUpStatement);
async teardown() {
if (!this.schema) return;

await Promise.all(
this.schema.entities.map(async (entity) => {
await this.pool.query(`DROP TABLE IF EXISTS "${entity.id}"`);
})
);
}

await this.pool.query(
`CREATE TABLE "${entity.name}" (${columnStatements.join(", ")})`
);
});
async load(newSchema?: Schema) {
if (!newSchema) return;

// If there is an existing schema, this is a hot reload and the existing entity tables should be dropped.
if (this.schema) {
await this.teardown();
}

this.schema = newSchema;

await Promise.all(
this.schema.entities.map(async (entity) => {
// Build the create table statement using field migration fragments.
// TODO: Update this so the generation of the field migration fragments happens here
// instead of when the Schema gets built.
const columnStatements = entity.fields
.filter(
// This type guard is wrong, could actually be any FieldKind that's not derived (obvs)
(field): field is ScalarField => field.kind !== FieldKind.DERIVED
)
.map((field) => field.migrateUpStatement);

await this.pool.query(
`CREATE TABLE "${entity.id}" (${columnStatements.join(", ")})`
);
})
);
}

getEntity = this.errorWrapper(async (entityName: string, id: string) => {
const statement = `SELECT "${entityName}".* FROM "${entityName}" WHERE "${entityName}"."id" = $1`;
getEntity = this.errorWrapper(async (entityId: string, id: string) => {
const statement = `SELECT "${entityId}".* FROM "${entityId}" WHERE "${entityId}"."id" = $1`;
const { rows, rowCount } = await this.pool.query(statement, [id]);

if (rowCount === 0) return null;
return this.deserialize(entityName, rows[0]);
return this.deserialize(entityId, rows[0]);
});

insertEntity = this.errorWrapper(
async (
entityName: string,
id: string,
instance: Record<string, unknown>
) => {
async (entityId: string, id: string, instance: Record<string, unknown>) => {
// If `instance.id` is defined, replace it with the id passed as a parameter.
// Should also log a warning here.
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
Expand All @@ -88,19 +99,15 @@ export class PostgresEntityStore implements EntityStore {
.map((_, idx) => `$${idx + 1}`)
.join(", ")})`;

const statement = `INSERT INTO "${entityName}" ${insertFragment} RETURNING *`;
const statement = `INSERT INTO "${entityId}" ${insertFragment} RETURNING *`;
const { rows } = await this.pool.query(statement, insertValues);

return this.deserialize(entityName, rows[0]);
return this.deserialize(entityId, rows[0]);
}
);

updateEntity = this.errorWrapper(
async (
entityName: string,
id: string,
instance: Record<string, unknown>
) => {
async (entityId: string, id: string, instance: Record<string, unknown>) => {
const pairs = getColumnValuePairs(instance);

const updatePairs = pairs.filter(({ column }) => column !== "id");
Expand All @@ -109,22 +116,18 @@ export class PostgresEntityStore implements EntityStore {
.map(({ column }, idx) => `${column} = $${idx + 1}`)
.join(", ");

const statement = `UPDATE "${entityName}" SET ${updateFragment} WHERE "id" = $${
const statement = `UPDATE "${entityId}" SET ${updateFragment} WHERE "id" = $${
updatePairs.length + 1
} RETURNING *`;
updateValues.push(id);
const { rows } = await this.pool.query(statement, updateValues);

return this.deserialize(entityName, rows[0]);
return this.deserialize(entityId, rows[0]);
}
);

upsertEntity = this.errorWrapper(
async (
entityName: string,
id: string,
instance: Record<string, unknown>
) => {
async (entityId: string, id: string, instance: Record<string, unknown>) => {
// If `instance.id` is defined, replace it with the id passed as a parameter.
// Should also log a warning here.
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
Expand All @@ -148,25 +151,25 @@ export class PostgresEntityStore implements EntityStore {
)
.join(", ");

const statement = `INSERT INTO "${entityName}" ${insertFragment} ON CONFLICT("id") DO UPDATE SET ${updateFragment} RETURNING *`;
const statement = `INSERT INTO "${entityId}" ${insertFragment} ON CONFLICT("id") DO UPDATE SET ${updateFragment} RETURNING *`;
const { rows } = await this.pool.query(statement, [
...insertValues,
...updateValues,
]);

return this.deserialize(entityName, rows[0]);
return this.deserialize(entityId, rows[0]);
}
);

deleteEntity = this.errorWrapper(async (entityName: string, id: string) => {
const statement = `DELETE FROM "${entityName}" WHERE "id" = $1`;
deleteEntity = this.errorWrapper(async (entityId: string, id: string) => {
const statement = `DELETE FROM "${entityId}" WHERE "id" = $1`;
const { rowCount } = await this.pool.query(statement, [id]);

return rowCount === 1;
});

getEntities = this.errorWrapper(
async (entityName: string, filter?: EntityFilter) => {
async (entityId: string, filter?: EntityFilter) => {
const where = filter?.where;
const first = filter?.first;
const skip = filter?.skip;
Expand Down Expand Up @@ -214,18 +217,18 @@ export class PostgresEntityStore implements EntityStore {
fragments.push(`OFFSET ${skip}`);
}

const statement = `SELECT * FROM "${entityName}" ${fragments.join(" ")}`;
const statement = `SELECT * FROM "${entityId}" ${fragments.join(" ")}`;
const { rows } = await this.pool.query(statement);

return rows.map((instance) => this.deserialize(entityName, instance));
return rows.map((instance) => this.deserialize(entityId, instance));
}
);

getEntityDerivedField = this.errorWrapper(
async (entityName: string, id: string, derivedFieldName: string) => {
const entity = this.schema?.entityByName[entityName];
async (entityId: string, instanceId: string, derivedFieldName: string) => {
const entity = this.schema?.entities.find((e) => e.id === entityId);
if (!entity) {
throw new Error(`Entity not found in schema: ${entityName}`);
throw new Error(`Entity not found in schema for ID: ${entityId}`);
}

const derivedField = entity.fields.find(
Expand All @@ -235,15 +238,24 @@ export class PostgresEntityStore implements EntityStore {

if (!derivedField) {
throw new Error(
`Derived field not found: ${entityName}.${derivedFieldName}`
`Derived field not found: ${entity.name}.${derivedFieldName}`
);
}

const derivedFromEntity = this.schema?.entities.find(
(e) => e.name === derivedField.derivedFromEntityName
);
if (!derivedFromEntity) {
throw new Error(
`Entity not found in schema for name: ${derivedField.derivedFromEntityName}`
);
}

const derivedFieldInstances = await this.getEntities(
derivedField.derivedFromEntityName,
derivedFromEntity.id,
{
where: {
[derivedField.derivedFromFieldName]: id,
[derivedField.derivedFromFieldName]: instanceId,
},
}
);
Expand All @@ -252,14 +264,14 @@ export class PostgresEntityStore implements EntityStore {
}
);

deserialize = (entityName: string, instance: Record<string, unknown>) => {
deserialize = (entityId: string, instance: Record<string, unknown>) => {
if (!this.schema) {
throw new Error(`EntityStore has not been initialized with a schema yet`);
}

const entity = this.schema.entityByName[entityName];
const entity = this.schema?.entities.find((e) => e.id === entityId);
if (!entity) {
throw new Error(`Entity not found in schema: ${entityName}`);
throw new Error(`Entity not found in schema for ID: ${entityId}`);
}

const deserializedInstance = { ...instance };
Expand Down
Loading

1 comment on commit 1563946

@vercel
Copy link

@vercel vercel bot commented on 1563946 Feb 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.