From 72fd3fda24919f14218cbbf02a552dfcf4d8ea52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Wed, 21 Feb 2024 15:48:00 -0600 Subject: [PATCH] feat: shutdown gracefully after finishing replaying blocks (#315) * feat: shutdown gracefully after finishing replaying blocks * test: shutdown --- package-lock.json | 14 +++---- package.json | 2 +- src/env.ts | 12 ++++++ src/ordhook/server.ts | 11 +++-- tests/ordhook/replay.test.ts | 78 ++++++++++++++++++++++++++++++++++++ tests/setup.ts | 1 + 6 files changed, 106 insertions(+), 12 deletions(-) create mode 100644 tests/ordhook/replay.test.ts diff --git a/package-lock.json b/package-lock.json index 869f446c..28f91c25 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,7 +14,7 @@ "@fastify/multipart": "^7.1.0", "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", - "@hirosystems/api-toolkit": "^1.3.1", + "@hirosystems/api-toolkit": "^1.4.0", "@hirosystems/chainhook-client": "^1.7.0", "@semantic-release/changelog": "^6.0.3", "@semantic-release/commit-analyzer": "^10.0.4", @@ -1278,9 +1278,9 @@ } }, "node_modules/@hirosystems/api-toolkit": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.3.1.tgz", - "integrity": "sha512-uUOqWcJlaxnlW30RyZ1UdidzFy29esd4bG0UxwnsJH+M+qtvV4V/NaHLRUFbnJkoF5b6vckZh1mldyBNY7aL1Q==", + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.4.0.tgz", + "integrity": "sha512-n1LF5roEQ7LkfAvKw0Wucmbo+XhvQBp4ED9N/AyD76/wHQeU59nocDkVAoKSJzWzzCfHa2+G32d3zB3m6oMbIQ==", "dependencies": { "@fastify/cors": "^8.0.0", "@fastify/swagger": "^8.3.1", @@ -19728,9 +19728,9 @@ "requires": {} }, "@hirosystems/api-toolkit": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.3.1.tgz", - "integrity": "sha512-uUOqWcJlaxnlW30RyZ1UdidzFy29esd4bG0UxwnsJH+M+qtvV4V/NaHLRUFbnJkoF5b6vckZh1mldyBNY7aL1Q==", + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.4.0.tgz", + "integrity": "sha512-n1LF5roEQ7LkfAvKw0Wucmbo+XhvQBp4ED9N/AyD76/wHQeU59nocDkVAoKSJzWzzCfHa2+G32d3zB3m6oMbIQ==", "requires": { "@fastify/cors": "^8.0.0", "@fastify/swagger": "^8.3.1", diff --git a/package.json b/package.json index 1ac289fb..08d55d38 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,7 @@ "@fastify/multipart": "^7.1.0", "@fastify/swagger": "^8.3.1", "@fastify/type-provider-typebox": "^3.2.0", - "@hirosystems/api-toolkit": "^1.3.1", + "@hirosystems/api-toolkit": "^1.4.0", "@hirosystems/chainhook-client": "^1.7.0", "@semantic-release/changelog": "^6.0.3", "@semantic-release/commit-analyzer": "^10.0.4", diff --git a/src/env.ts b/src/env.ts index 49638f54..21aeef65 100644 --- a/src/env.ts +++ b/src/env.ts @@ -41,6 +41,18 @@ const schema = Type.Object({ * if you're configuring your predicates manually for any reason. */ ORDHOOK_AUTO_PREDICATE_REGISTRATION: Type.Boolean({ default: true }), + /** + * Ordhook ingestion mode. Controls the API's Ordhook payload ingestion behavior: + * * `default`: The API will stay running and will listen for payloads indefinitely + * * `replay`: The API will stay running and listening only for payloads marked as "not streaming" + * by Ordhook (historical replays). Once Ordhook starts streaming recent blocks from its chain + * tip, the API will shut down. Recommended for deployments meant to sync the ordinals chain + * from genesis. + */ + ORDHOOK_INGESTION_MODE: Type.Enum( + { default: 'default', replay: 'replay' }, + { default: 'default' } + ), PGHOST: Type.String(), PGPORT: Type.Number({ default: 5432, minimum: 0, maximum: 65535 }), diff --git a/src/ordhook/server.ts b/src/ordhook/server.ts index 057efd20..a414da32 100644 --- a/src/ordhook/server.ts +++ b/src/ordhook/server.ts @@ -8,7 +8,7 @@ import { ServerOptions, ServerPredicate, } from '@hirosystems/chainhook-client'; -import { logger } from '@hirosystems/api-toolkit'; +import { logger, shutdown } from '@hirosystems/api-toolkit'; export const ORDHOOK_BASE_PATH = `http://${ENV.ORDHOOK_NODE_RPC_HOST}:${ENV.ORDHOOK_NODE_RPC_PORT}`; export const PREDICATE_UUID = randomUUID(); @@ -55,10 +55,13 @@ export async function startOrdhookServer(args: { db: PgStore }): Promise { + const streamed = payload.chainhook.is_streaming_blocks; + if (ENV.ORDHOOK_INGESTION_MODE === 'replay' && streamed) { + logger.info(`OrdhookServer finished replaying blocks, shutting down`); + return shutdown(); + } logger.info( - `OrdhookServer received ${ - payload.chainhook.is_streaming_blocks ? 'streamed' : 'replay' - } payload from predicate ${uuid}` + `OrdhookServer received ${streamed ? 'streamed' : 'replay'} payload from predicate ${uuid}` ); await args.db.updateInscriptions(payload); }); diff --git a/tests/ordhook/replay.test.ts b/tests/ordhook/replay.test.ts new file mode 100644 index 00000000..1e07d625 --- /dev/null +++ b/tests/ordhook/replay.test.ts @@ -0,0 +1,78 @@ +import { runMigrations } from '@hirosystems/api-toolkit'; +import { ChainhookEventObserver } from '@hirosystems/chainhook-client'; +import { buildApiServer } from '../../src/api/init'; +import { ENV } from '../../src/env'; +import { startOrdhookServer } from '../../src/ordhook/server'; +import { PgStore, MIGRATIONS_DIR } from '../../src/pg/pg-store'; +import { TestChainhookPayloadBuilder, TestFastifyServer } from '../helpers'; + +describe('Replay', () => { + let db: PgStore; + let server: ChainhookEventObserver; + let fastify: TestFastifyServer; + + beforeEach(async () => { + await runMigrations(MIGRATIONS_DIR, 'up'); + ENV.ORDHOOK_AUTO_PREDICATE_REGISTRATION = false; + ENV.ORDHOOK_INGESTION_MODE = 'replay'; + db = await PgStore.connect({ skipMigrations: true }); + server = await startOrdhookServer({ db }); + fastify = await buildApiServer({ db }); + }); + + test('shuts down when streaming on replay mode', async () => { + const payload1 = new TestChainhookPayloadBuilder() + .apply() + .block({ + height: 767430, + hash: '0x163de66dc9c0949905bfe8e148bde04600223cf88d19f26fdbeba1d6e6fa0f88', + timestamp: 1676913207, + }) + .transaction({ + hash: '0x0268dd9743c862d80ab02cb1d0228036cfe172522850eb96be60cfee14b31fb8', + }) + .inscriptionRevealed({ + content_bytes: '0x303030303030303030303030', + content_type: 'text/plain;charset=utf-8', + content_length: 12, + inscription_number: { classic: 0, jubilee: 0 }, + inscription_fee: 3425, + inscription_output_value: 10000, + inscription_id: '0268dd9743c862d80ab02cb1d0228036cfe172522850eb96be60cfee14b31fb8i0', + inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td', + ordinal_number: 125348773618236, + ordinal_block_height: 566462, + ordinal_offset: 0, + satpoint_post_inscription: + '0x0268dd9743c862d80ab02cb1d0228036cfe172522850eb96be60cfee14b31fb8:0:0', + inscription_input_index: 0, + transfers_pre_inscription: 0, + tx_index: 0, + curse_type: null, + inscription_pointer: null, + delegate: null, + metaprotocol: null, + metadata: null, + parent: null, + }) + .build(); + + const mockExit = jest.spyOn(process, 'exit').mockImplementation(); + const response = await server['fastify'].inject({ + method: 'POST', + url: `/payload`, + headers: { authorization: `Bearer ${ENV.ORDHOOK_NODE_AUTH_TOKEN}` }, + payload: payload1, + }); + expect(response.statusCode).toBe(200); + expect(mockExit).toHaveBeenCalled(); + mockExit.mockRestore(); + }); + + afterEach(async () => { + await server.close(); + await fastify.close(); + await db.close(); + await runMigrations(MIGRATIONS_DIR, 'down'); + }); +}); diff --git a/tests/setup.ts b/tests/setup.ts index b4abedb3..95438bc2 100644 --- a/tests/setup.ts +++ b/tests/setup.ts @@ -3,5 +3,6 @@ export default (): void => { process.env.ORDHOOK_NODE_AUTH_TOKEN = 'test'; process.env.ORDHOOK_NODE_RPC_HOST = 'test.chainhooks.com'; process.env.ORDHOOK_NODE_RPC_PORT = '13370'; + process.env.ORDHOOK_INGESTION_MODE = 'default'; process.env.PGDATABASE = 'postgres'; };