diff --git a/packages/cardano-services/src/Program/loadHttpServer.ts b/packages/cardano-services/src/Program/loadHttpServer.ts index 787fe75a935..5b5c50fa554 100644 --- a/packages/cardano-services/src/Program/loadHttpServer.ts +++ b/packages/cardano-services/src/Program/loadHttpServer.ts @@ -81,7 +81,7 @@ const serviceMapFactory = (args: ProgramArgs, logger: Logger, cache: InMemoryCac logger, txSubmitProvider: args.options?.useQueue && args.options?.rabbitmqUrl - ? new RabbitMqTxSubmitProvider(args.options.rabbitmqUrl) + ? new RabbitMqTxSubmitProvider({ dummyTxId: args.options.dummyTxId, rabbitmqUrl: args.options.rabbitmqUrl }) : ogmiosTxSubmitProvider(urlToConnectionConfig(args.options?.ogmiosUrl)) }), [ServiceNames.Rewards]: async () => { diff --git a/packages/cardano-services/src/ProgramsCommon/CommonOptionDescriptions.ts b/packages/cardano-services/src/ProgramsCommon/CommonOptionDescriptions.ts index 33dbc11e72b..4a3d0ebd9bb 100644 --- a/packages/cardano-services/src/ProgramsCommon/CommonOptionDescriptions.ts +++ b/packages/cardano-services/src/ProgramsCommon/CommonOptionDescriptions.ts @@ -1,4 +1,5 @@ export enum CommonOptionDescriptions { + DummyTxId = 'Dummy txID algorithm', LoggerMinSeverity = 'Log level', OgmiosUrl = 'Ogmios URL', RabbitMQUrl = 'RabbitMQ URL' diff --git a/packages/cardano-services/src/ProgramsCommon/defaults.ts b/packages/cardano-services/src/ProgramsCommon/defaults.ts index 032f86d6ffb..afb6bbbbf20 100644 --- a/packages/cardano-services/src/ProgramsCommon/defaults.ts +++ b/packages/cardano-services/src/ProgramsCommon/defaults.ts @@ -1,8 +1,12 @@ import { createConnectionObject } from '@cardano-sdk/ogmios'; +export const DUMMY_TX_ID_DEFAULT = false; + export const OGMIOS_URL_DEFAULT = (() => { const connection = createConnectionObject(); return connection.address.webSocket; })(); export const RABBITMQ_URL_DEFAULT = 'amqp://localhost:5672'; + +export const USE_QUEUE_DEFAULT = false; diff --git a/packages/cardano-services/src/ProgramsCommon/errors/WrongOption.ts b/packages/cardano-services/src/ProgramsCommon/errors/WrongOption.ts new file mode 100644 index 00000000000..bdb84468434 --- /dev/null +++ b/packages/cardano-services/src/ProgramsCommon/errors/WrongOption.ts @@ -0,0 +1,9 @@ +import { CustomError } from 'ts-custom-error'; +import { Programs } from '../programs'; + +export class WrongOption extends CustomError { + public constructor(program: Programs, option: string, expected: string[]) { + super(); + this.message = `${program} requires a valid ${option} program option. Expected: ${expected.join(', ')}`; + } +} diff --git a/packages/cardano-services/src/ProgramsCommon/errors/index.ts b/packages/cardano-services/src/ProgramsCommon/errors/index.ts new file mode 100644 index 00000000000..e959285fefc --- /dev/null +++ b/packages/cardano-services/src/ProgramsCommon/errors/index.ts @@ -0,0 +1 @@ +export * from './WrongOption'; diff --git a/packages/cardano-services/src/ProgramsCommon/index.ts b/packages/cardano-services/src/ProgramsCommon/index.ts index 88e7f0fcf14..265645b31e5 100644 --- a/packages/cardano-services/src/ProgramsCommon/index.ts +++ b/packages/cardano-services/src/ProgramsCommon/index.ts @@ -1,3 +1,5 @@ export * from './CommonOptionDescriptions'; export * from './defaults'; +export * from './errors'; export * from './options'; +export * from './programs'; diff --git a/packages/cardano-services/src/ProgramsCommon/options.ts b/packages/cardano-services/src/ProgramsCommon/options.ts index 84f9302ca20..575bf29522b 100644 --- a/packages/cardano-services/src/ProgramsCommon/options.ts +++ b/packages/cardano-services/src/ProgramsCommon/options.ts @@ -6,6 +6,7 @@ import { LogLevel } from 'bunyan'; * - RabbitMQ worker */ export interface CommonProgramOptions { + dummyTxId?: boolean; loggerMinSeverity?: LogLevel; ogmiosUrl?: URL; rabbitmqUrl?: URL; diff --git a/packages/cardano-services/src/ProgramsCommon/programs.ts b/packages/cardano-services/src/ProgramsCommon/programs.ts new file mode 100644 index 00000000000..f21ea298cc1 --- /dev/null +++ b/packages/cardano-services/src/ProgramsCommon/programs.ts @@ -0,0 +1,7 @@ +/** + * cardano-services programs + */ +export enum Programs { + HttpServer = 'HTTP server', + RabbitmqWorker = 'RabbitMQ worker' +} diff --git a/packages/cardano-services/src/TxWorker/errors/WrongTxWorkerOption.ts b/packages/cardano-services/src/TxWorker/errors/WrongTxWorkerOption.ts deleted file mode 100644 index 192d9ee48e9..00000000000 --- a/packages/cardano-services/src/TxWorker/errors/WrongTxWorkerOption.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { CustomError } from 'ts-custom-error'; -import { ServiceNames } from '../../Program'; -import { TxWorkerOptionDescriptions } from '../TxWorkerOptionDescriptions'; - -export class WrongProgramOption extends CustomError { - public constructor(service: ServiceNames, option: TxWorkerOptionDescriptions, expected: string[]) { - super(); - this.message = `${service} requires a valid ${option} program option. Expected: ${expected.join(', ')}`; - } -} diff --git a/packages/cardano-services/src/TxWorker/errors/index.ts b/packages/cardano-services/src/TxWorker/errors/index.ts deleted file mode 100644 index b49e5286e82..00000000000 --- a/packages/cardano-services/src/TxWorker/errors/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './WrongTxWorkerOption'; diff --git a/packages/cardano-services/src/TxWorker/index.ts b/packages/cardano-services/src/TxWorker/index.ts index 68664e767c4..c063b356c69 100644 --- a/packages/cardano-services/src/TxWorker/index.ts +++ b/packages/cardano-services/src/TxWorker/index.ts @@ -1,4 +1,3 @@ export * from './defaults'; -export * from './errors'; export * from './loadTxWorker'; export * from './TxWorkerOptionDescriptions'; diff --git a/packages/cardano-services/src/cli.ts b/packages/cardano-services/src/cli.ts index 183769e60be..d0a9c9ad7aa 100755 --- a/packages/cardano-services/src/cli.ts +++ b/packages/cardano-services/src/cli.ts @@ -10,7 +10,13 @@ import { } from './Program'; import { CACHE_TTL_DEFAULT } from './InMemoryCache'; import { Command } from 'commander'; -import { CommonOptionDescriptions } from './ProgramsCommon'; +import { + CommonOptionDescriptions, + DUMMY_TX_ID_DEFAULT, + Programs, + USE_QUEUE_DEFAULT, + WrongOption +} from './ProgramsCommon'; import { DB_POLL_INTERVAL_DEFAULT } from './NetworkInfo'; import { InvalidLoggerLevel } from './errors'; import { @@ -19,7 +25,6 @@ import { POLLING_CYCLE_DEFAULT, TxWorkerOptionDescriptions, TxWorkerOptions, - WrongProgramOption, loadTxWorker } from './TxWorker'; import { URL } from 'url'; @@ -41,8 +46,21 @@ clear(); // eslint-disable-next-line no-console console.log('Cardano Services CLI'); -const commonOptions = (command: Command) => +const stringToBoolean = (value: string, program: Programs, option: string) => { + // for compatibility: accepting same values as envalid in startWorker.ts + if (['0', 'f', 'false'].includes(value)) return false; + if (['1', 't', 'true'].includes(value)) return true; + throw new WrongOption(program, option, ['false', 'true']); +}; + +const commonOptions = (command: Command, program: Programs) => command + .option( + '--dummy-tx-id [dummyTxId]', + CommonOptionDescriptions.DummyTxId, + (dummyTxId) => stringToBoolean(dummyTxId, program, CommonOptionDescriptions.DummyTxId), + DUMMY_TX_ID_DEFAULT + ) .option( '--logger-min-severity ', CommonOptionDescriptions.LoggerMinSeverity, @@ -75,7 +93,8 @@ commonOptions( program .command('start-server') .description('Start the HTTP server') - .argument('', `List of services to attach: ${Object.values(ServiceNames).toString()}`) + .argument('', `List of services to attach: ${Object.values(ServiceNames).toString()}`), + Programs.HttpServer ) .option('--api-url ', ProgramOptionDescriptions.ApiUrl, (url) => new URL(url), new URL(API_URL_DEFAULT)) .option('--enable-metrics ', ProgramOptionDescriptions.MetricsEnabled, false) @@ -95,7 +114,7 @@ commonOptions( (interval) => Number.parseInt(interval, 10), DB_POLL_INTERVAL_DEFAULT ) - .option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, false) + .option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, USE_QUEUE_DEFAULT) .action(async (serviceNames: ServiceNames[], options: { apiUrl: URL } & HttpServerOptions) => { const { apiUrl, ...rest } = options; const server = await loadHttpServer({ apiUrl: apiUrl || API_URL_DEFAULT, options: rest, serviceNames }); @@ -107,16 +126,11 @@ commonOptions( }); }); -commonOptions(program.command('start-worker').description('Start RabbitMQ worker')) +commonOptions(program.command('start-worker').description('Start RabbitMQ worker'), Programs.RabbitmqWorker) .option( '--parallel [parallel]', TxWorkerOptionDescriptions.Parallel, - (parallel) => { - // for compatibility: accepting same values as envalid in startWorker.ts - if (['0', 'f', 'false'].includes(parallel)) return false; - if (['1', 't', 'true'].includes(parallel)) return true; - throw new WrongProgramOption(ServiceNames.TxSubmit, TxWorkerOptionDescriptions.Parallel, ['false', 'true']); - }, + (parallel) => stringToBoolean(parallel, Programs.RabbitmqWorker, TxWorkerOptionDescriptions.Parallel), PARALLEL_MODE_DEFAULT ) .option( diff --git a/packages/cardano-services/src/run.ts b/packages/cardano-services/src/run.ts index fd6c60bf203..10395c20fcf 100755 --- a/packages/cardano-services/src/run.ts +++ b/packages/cardano-services/src/run.ts @@ -3,6 +3,7 @@ import * as envalid from 'envalid'; import { API_URL_DEFAULT, OGMIOS_URL_DEFAULT, RABBITMQ_URL_DEFAULT, ServiceNames, loadHttpServer } from './Program'; import { CACHE_TTL_DEFAULT } from './InMemoryCache'; import { DB_POLL_INTERVAL_DEFAULT } from './NetworkInfo'; +import { DUMMY_TX_ID_DEFAULT, USE_QUEUE_DEFAULT } from './ProgramsCommon'; import { LogLevel } from 'bunyan'; import { URL } from 'url'; import { cacheTtlValidator } from './util/validators'; @@ -16,17 +17,19 @@ const envSpecs = { DB_CONNECTION_STRING: envalid.str({ default: undefined }), DB_POLL_INTERVAL: envalid.num({ default: DB_POLL_INTERVAL_DEFAULT }), DB_QUERIES_CACHE_TTL: envalid.makeValidator(cacheTtlValidator)(envalid.num({ default: CACHE_TTL_DEFAULT })), + DUMMY_TX_ID: envalid.bool({ default: DUMMY_TX_ID_DEFAULT }), LOGGER_MIN_SEVERITY: envalid.str({ choices: loggerMethodNames as string[], default: 'info' }), OGMIOS_URL: envalid.url({ default: OGMIOS_URL_DEFAULT }), RABBITMQ_URL: envalid.url({ default: RABBITMQ_URL_DEFAULT }), SERVICE_NAMES: envalid.str({ example: Object.values(ServiceNames).toString() }), - USE_QUEUE: envalid.bool({ default: false }) + USE_QUEUE: envalid.bool({ default: USE_QUEUE_DEFAULT }) }; void (async () => { config(); const env = envalid.cleanEnv(process.env, envSpecs); const apiUrl = new URL(env.API_URL); + const dummyTxId = env.DUMMY_TX_ID; const ogmiosUrl = new URL(env.OGMIOS_URL); const rabbitmqUrl = new URL(env.RABBITMQ_URL); const cardanoNodeConfigPath = env.CARDANO_NODE_CONFIG_PATH; @@ -43,6 +46,7 @@ void (async () => { dbConnectionString, dbPollInterval, dbQueriesCacheTtl, + dummyTxId, loggerMinSeverity: env.LOGGER_MIN_SEVERITY as LogLevel, ogmiosUrl, rabbitmqUrl, diff --git a/packages/cardano-services/src/startWorker.ts b/packages/cardano-services/src/startWorker.ts index de9d76a1407..71d812a581c 100644 --- a/packages/cardano-services/src/startWorker.ts +++ b/packages/cardano-services/src/startWorker.ts @@ -1,5 +1,6 @@ #!/usr/bin/env node import * as envalid from 'envalid'; +import { DUMMY_TX_ID_DEFAULT } from './ProgramsCommon'; import { LogLevel } from 'bunyan'; import { OGMIOS_URL_DEFAULT, @@ -15,6 +16,7 @@ import { loggerMethodNames } from './util'; import onDeath from 'death'; const envSpecs = { + DUMMY_TX_ID: envalid.bool({ default: DUMMY_TX_ID_DEFAULT }), LOGGER_MIN_SEVERITY: envalid.str({ choices: loggerMethodNames as string[], default: 'info' }), OGMIOS_URL: envalid.url({ default: OGMIOS_URL_DEFAULT }), PARALLEL: envalid.bool({ default: PARALLEL_MODE_DEFAULT }), @@ -30,6 +32,7 @@ void (async () => { try { const worker = await loadTxWorker({ options: { + dummyTxId: env.DUMMY_TX_ID, loggerMinSeverity: env.LOGGER_MIN_SEVERITY as LogLevel, ogmiosUrl: new URL(env.OGMIOS_URL), parallel: env.PARALLEL, diff --git a/packages/cardano-services/test/entrypoints.txWorker.test.ts b/packages/cardano-services/test/entrypoints.txWorker.test.ts index f9bd72831ab..79613f9a0de 100644 --- a/packages/cardano-services/test/entrypoints.txWorker.test.ts +++ b/packages/cardano-services/test/entrypoints.txWorker.test.ts @@ -1,5 +1,5 @@ /* eslint-disable sonarjs/no-duplicate-string */ -import { BAD_CONNECTION_URL, enqueueFakeTx, removeAllMessagesFromQueue } from '../../rabbitmq/test/utils'; +import { BAD_CONNECTION_URL, enqueueFakeTx, removeAllQueues } from '../../rabbitmq/test/utils'; import { ChildProcess, fork } from 'child_process'; import { createConnectionObject } from '@cardano-ogmios/client'; import { createHealthyMockOgmiosServer, ogmiosServerReady } from './util'; @@ -45,55 +45,48 @@ describe('tx-worker entrypoints', () => { ogmiosServer = createHealthyMockOgmiosServer(() => hook()); await listenPromise(ogmiosServer, { port }); await ogmiosServerReady(ogmiosConnection); - commonArgs = ['start-worker', '--logger-min-severity', 'error', '--ogmios-url', ogmiosConnection.address.webSocket]; - commonEnv = { LOGGER_MIN_SEVERITY: 'error', OGMIOS_URL: ogmiosConnection.address.webSocket }; + commonArgs = [ + 'start-worker', + '--dummy-tx-id', + '--logger-min-severity', + 'error', + '--ogmios-url', + ogmiosConnection.address.webSocket + ]; + commonEnv = { DUMMY_TX_ID: 'true', LOGGER_MIN_SEVERITY: 'debug', OGMIOS_URL: ogmiosConnection.address.webSocket }; }); afterAll(async () => await serverClosePromise(ogmiosServer)); beforeEach(async () => { - await removeAllMessagesFromQueue(); - await enqueueFakeTx(); - await enqueueFakeTx(); + await removeAllQueues(); hookLogs = []; loggerHookCounter = 0; }); afterEach((done) => { resetHook(); - if (proc?.kill()) proc.on('close', done); + if (proc?.kill()) proc.on('close', () => done()); else done(); }); // Tests without any assertion fail if they get timeout // eslint-disable-next-line sonarjs/cognitive-complexity describe('with a working RabbitMQ server', () => { - describe('worker starts', () => { - it('cli:start-worker', (done) => { + describe('transaction are actually submitted', () => { + it('cli:start-worker submits transactions', async () => { + hookPromise = new Promise((resolve) => (hook = resolve)); proc = fork(exePath('cli'), commonArgs, { stdio: 'pipe' }); - proc.stdout!.on('data', (data) => (data.toString().match('RabbitMQ transactions worker') ? done() : null)); + await Promise.all([hookPromise, enqueueFakeTx()]); }); - it('startWorker', (done) => { - hook = done; + it('startWorker submits transactions', async () => { + hookPromise = new Promise((resolve) => (hook = resolve)); proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' }); + await Promise.all([hookPromise, enqueueFakeTx()]); }); }); - describe('transaction are actually submitted', () => { - it('cli:start-worker submits transactions', () => - new Promise(async (resolve) => { - hook = resolve; - proc = fork(exePath('cli'), commonArgs, { stdio: 'pipe' }); - })); - - it('startWorker submits transactions', () => - new Promise(async (resolve) => { - hook = resolve; - proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' }); - })); - }); - describe('parallel option', () => { describe('without parallel option', () => { it('cli:start-worker starts in serial mode', (done) => { @@ -104,7 +97,9 @@ describe('tx-worker entrypoints', () => { it('startWorker starts in serial mode', async () => { [hook, hookPromise] = loggerHook(); proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' }); + const txPromises = [enqueueFakeTx([1, 2, 3]), enqueueFakeTx([4, 5, 6])]; await hookPromise; + await Promise.all(txPromises); expect(hookLogs).toEqual(['Processing tx 1', 'Processed tx 1', 'Processing tx 2', 'Processed tx 2']); }); }); @@ -114,7 +109,7 @@ describe('tx-worker entrypoints', () => { expect.assertions(2); proc = fork(exePath('cli'), [...commonArgs, '--parallel', 'test'], { stdio: 'pipe' }); proc.stderr!.on('data', (data) => - expect(data.toString()).toMatch('tx-submit requires a valid Parallel mode') + expect(data.toString()).toMatch('RabbitMQ worker requires a valid Parallel mode') ); proc.on('exit', (code) => { expect(code).toBe(1); @@ -141,7 +136,9 @@ describe('tx-worker entrypoints', () => { it('startWorker starts in serial mode', async () => { [hook, hookPromise] = loggerHook(); proc = fork(exePath('startWorker'), { env: { ...commonEnv, PARALLEL: 'false' }, stdio: 'pipe' }); + const txPromises = [enqueueFakeTx([1, 2, 3]), enqueueFakeTx([4, 5, 6])]; await hookPromise; + await Promise.all(txPromises); expect(hookLogs).toEqual(['Processing tx 1', 'Processed tx 1', 'Processing tx 2', 'Processed tx 2']); }); }); @@ -155,7 +152,9 @@ describe('tx-worker entrypoints', () => { it('startWorker starts in parallel mode', async () => { [hook, hookPromise] = loggerHook(); proc = fork(exePath('startWorker'), { env: { ...commonEnv, PARALLEL: 'true' }, stdio: 'pipe' }); + const txPromises = [enqueueFakeTx([1, 2, 3]), enqueueFakeTx([4, 5, 6])]; await hookPromise; + await Promise.all(txPromises); expect(hookLogs).toEqual(['Processing tx 1', 'Processing tx 2', 'Processed tx 1', 'Processed tx 2']); }); }); diff --git a/packages/cardano-services/test/load/load.test.ts b/packages/cardano-services/test/load/load.test.ts index ee13ed03752..93c82cb52f4 100644 --- a/packages/cardano-services/test/load/load.test.ts +++ b/packages/cardano-services/test/load/load.test.ts @@ -29,7 +29,7 @@ interface TestOptions { interface TestReport extends TestOptions { timeBeforeSubmitTxs: number; timeAfterWorkerStarted: number; - timeAfterTxsInMempool: number; // TODO: will work after https://input-output.atlassian.net/browse/ADP-1823 + timeAfterTxsInMempool: number; timeAfterTxsInBlockchain: number; }