Skip to content

Commit

Permalink
feat(cardano-services): implements rabbitmq new interface
Browse files Browse the repository at this point in the history
  • Loading branch information
iccicci committed Jun 16, 2022
1 parent 90d674d commit 4a9de5b
Show file tree
Hide file tree
Showing 16 changed files with 87 additions and 54 deletions.
2 changes: 1 addition & 1 deletion packages/cardano-services/src/Program/loadHttpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const serviceMapFactory = (args: ProgramArgs, logger: Logger, cache: InMemoryCac
logger,
txSubmitProvider:
args.options?.useQueue && args.options?.rabbitmqUrl
? new RabbitMqTxSubmitProvider(args.options.rabbitmqUrl)
? new RabbitMqTxSubmitProvider({ dummyTxId: args.options.dummyTxId, rabbitmqUrl: args.options.rabbitmqUrl })
: ogmiosTxSubmitProvider(urlToConnectionConfig(args.options?.ogmiosUrl))
}),
[ServiceNames.Rewards]: async () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export enum CommonOptionDescriptions {
DummyTxId = 'Dummy txID algorithm',
LoggerMinSeverity = 'Log level',
OgmiosUrl = 'Ogmios URL',
RabbitMQUrl = 'RabbitMQ URL'
Expand Down
4 changes: 4 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/defaults.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { createConnectionObject } from '@cardano-sdk/ogmios';

export const DUMMY_TX_ID_DEFAULT = false;

export const OGMIOS_URL_DEFAULT = (() => {
const connection = createConnectionObject();
return connection.address.webSocket;
})();

export const RABBITMQ_URL_DEFAULT = 'amqp://localhost:5672';

export const USE_QUEUE_DEFAULT = false;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { CustomError } from 'ts-custom-error';
import { Programs } from '../programs';

export class WrongOption extends CustomError {
public constructor(program: Programs, option: string, expected: string[]) {
super();
this.message = `${program} requires a valid ${option} program option. Expected: ${expected.join(', ')}`;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './WrongOption';
2 changes: 2 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './CommonOptionDescriptions';
export * from './defaults';
export * from './errors';
export * from './options';
export * from './programs';
1 change: 1 addition & 0 deletions packages/cardano-services/src/ProgramsCommon/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { LogLevel } from 'bunyan';
* - RabbitMQ worker
*/
export interface CommonProgramOptions {
dummyTxId?: boolean;
loggerMinSeverity?: LogLevel;
ogmiosUrl?: URL;
rabbitmqUrl?: URL;
Expand Down
7 changes: 7 additions & 0 deletions packages/cardano-services/src/ProgramsCommon/programs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* cardano-services programs
*/
export enum Programs {
HttpServer = 'HTTP server',
RabbitmqWorker = 'RabbitMQ worker'
}

This file was deleted.

1 change: 0 additions & 1 deletion packages/cardano-services/src/TxWorker/errors/index.ts

This file was deleted.

1 change: 0 additions & 1 deletion packages/cardano-services/src/TxWorker/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export * from './defaults';
export * from './errors';
export * from './loadTxWorker';
export * from './TxWorkerOptionDescriptions';
38 changes: 26 additions & 12 deletions packages/cardano-services/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import {
} from './Program';
import { CACHE_TTL_DEFAULT } from './InMemoryCache';
import { Command } from 'commander';
import { CommonOptionDescriptions } from './ProgramsCommon';
import {
CommonOptionDescriptions,
DUMMY_TX_ID_DEFAULT,
Programs,
USE_QUEUE_DEFAULT,
WrongOption
} from './ProgramsCommon';
import { DB_POLL_INTERVAL_DEFAULT } from './NetworkInfo';
import { InvalidLoggerLevel } from './errors';
import {
Expand All @@ -19,7 +25,6 @@ import {
POLLING_CYCLE_DEFAULT,
TxWorkerOptionDescriptions,
TxWorkerOptions,
WrongProgramOption,
loadTxWorker
} from './TxWorker';
import { URL } from 'url';
Expand All @@ -41,8 +46,21 @@ clear();
// eslint-disable-next-line no-console
console.log('Cardano Services CLI');

const commonOptions = (command: Command) =>
const stringToBoolean = (value: string, program: Programs, option: string) => {
// for compatibility: accepting same values as envalid in startWorker.ts
if (['0', 'f', 'false'].includes(value)) return false;
if (['1', 't', 'true'].includes(value)) return true;
throw new WrongOption(program, option, ['false', 'true']);
};

const commonOptions = (command: Command, program: Programs) =>
command
.option(
'--dummy-tx-id [dummyTxId]',
CommonOptionDescriptions.DummyTxId,
(dummyTxId) => stringToBoolean(dummyTxId, program, CommonOptionDescriptions.DummyTxId),
DUMMY_TX_ID_DEFAULT
)
.option(
'--logger-min-severity <level>',
CommonOptionDescriptions.LoggerMinSeverity,
Expand Down Expand Up @@ -75,7 +93,8 @@ commonOptions(
program
.command('start-server')
.description('Start the HTTP server')
.argument('<serviceNames...>', `List of services to attach: ${Object.values(ServiceNames).toString()}`)
.argument('<serviceNames...>', `List of services to attach: ${Object.values(ServiceNames).toString()}`),
Programs.HttpServer
)
.option('--api-url <apiUrl>', ProgramOptionDescriptions.ApiUrl, (url) => new URL(url), new URL(API_URL_DEFAULT))
.option('--enable-metrics <metricsEnabled>', ProgramOptionDescriptions.MetricsEnabled, false)
Expand All @@ -95,7 +114,7 @@ commonOptions(
(interval) => Number.parseInt(interval, 10),
DB_POLL_INTERVAL_DEFAULT
)
.option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, false)
.option('--use-queue', ProgramOptionDescriptions.UseQueue, () => true, USE_QUEUE_DEFAULT)
.action(async (serviceNames: ServiceNames[], options: { apiUrl: URL } & HttpServerOptions) => {
const { apiUrl, ...rest } = options;
const server = await loadHttpServer({ apiUrl: apiUrl || API_URL_DEFAULT, options: rest, serviceNames });
Expand All @@ -107,16 +126,11 @@ commonOptions(
});
});

commonOptions(program.command('start-worker').description('Start RabbitMQ worker'))
commonOptions(program.command('start-worker').description('Start RabbitMQ worker'), Programs.RabbitmqWorker)
.option(
'--parallel [parallel]',
TxWorkerOptionDescriptions.Parallel,
(parallel) => {
// for compatibility: accepting same values as envalid in startWorker.ts
if (['0', 'f', 'false'].includes(parallel)) return false;
if (['1', 't', 'true'].includes(parallel)) return true;
throw new WrongProgramOption(ServiceNames.TxSubmit, TxWorkerOptionDescriptions.Parallel, ['false', 'true']);
},
(parallel) => stringToBoolean(parallel, Programs.RabbitmqWorker, TxWorkerOptionDescriptions.Parallel),
PARALLEL_MODE_DEFAULT
)
.option(
Expand Down
6 changes: 5 additions & 1 deletion packages/cardano-services/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as envalid from 'envalid';
import { API_URL_DEFAULT, OGMIOS_URL_DEFAULT, RABBITMQ_URL_DEFAULT, ServiceNames, loadHttpServer } from './Program';
import { CACHE_TTL_DEFAULT } from './InMemoryCache';
import { DB_POLL_INTERVAL_DEFAULT } from './NetworkInfo';
import { DUMMY_TX_ID_DEFAULT, USE_QUEUE_DEFAULT } from './ProgramsCommon';
import { LogLevel } from 'bunyan';
import { URL } from 'url';
import { cacheTtlValidator } from './util/validators';
Expand All @@ -16,17 +17,19 @@ const envSpecs = {
DB_CONNECTION_STRING: envalid.str({ default: undefined }),
DB_POLL_INTERVAL: envalid.num({ default: DB_POLL_INTERVAL_DEFAULT }),
DB_QUERIES_CACHE_TTL: envalid.makeValidator(cacheTtlValidator)(envalid.num({ default: CACHE_TTL_DEFAULT })),
DUMMY_TX_ID: envalid.bool({ default: DUMMY_TX_ID_DEFAULT }),
LOGGER_MIN_SEVERITY: envalid.str({ choices: loggerMethodNames as string[], default: 'info' }),
OGMIOS_URL: envalid.url({ default: OGMIOS_URL_DEFAULT }),
RABBITMQ_URL: envalid.url({ default: RABBITMQ_URL_DEFAULT }),
SERVICE_NAMES: envalid.str({ example: Object.values(ServiceNames).toString() }),
USE_QUEUE: envalid.bool({ default: false })
USE_QUEUE: envalid.bool({ default: USE_QUEUE_DEFAULT })
};

void (async () => {
config();
const env = envalid.cleanEnv(process.env, envSpecs);
const apiUrl = new URL(env.API_URL);
const dummyTxId = env.DUMMY_TX_ID;
const ogmiosUrl = new URL(env.OGMIOS_URL);
const rabbitmqUrl = new URL(env.RABBITMQ_URL);
const cardanoNodeConfigPath = env.CARDANO_NODE_CONFIG_PATH;
Expand All @@ -43,6 +46,7 @@ void (async () => {
dbConnectionString,
dbPollInterval,
dbQueriesCacheTtl,
dummyTxId,
loggerMinSeverity: env.LOGGER_MIN_SEVERITY as LogLevel,
ogmiosUrl,
rabbitmqUrl,
Expand Down
3 changes: 3 additions & 0 deletions packages/cardano-services/src/startWorker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env node
import * as envalid from 'envalid';
import { DUMMY_TX_ID_DEFAULT } from './ProgramsCommon';
import { LogLevel } from 'bunyan';
import {
OGMIOS_URL_DEFAULT,
Expand All @@ -15,6 +16,7 @@ import { loggerMethodNames } from './util';
import onDeath from 'death';

const envSpecs = {
DUMMY_TX_ID: envalid.bool({ default: DUMMY_TX_ID_DEFAULT }),
LOGGER_MIN_SEVERITY: envalid.str({ choices: loggerMethodNames as string[], default: 'info' }),
OGMIOS_URL: envalid.url({ default: OGMIOS_URL_DEFAULT }),
PARALLEL: envalid.bool({ default: PARALLEL_MODE_DEFAULT }),
Expand All @@ -30,6 +32,7 @@ void (async () => {
try {
const worker = await loadTxWorker({
options: {
dummyTxId: env.DUMMY_TX_ID,
loggerMinSeverity: env.LOGGER_MIN_SEVERITY as LogLevel,
ogmiosUrl: new URL(env.OGMIOS_URL),
parallel: env.PARALLEL,
Expand Down
53 changes: 26 additions & 27 deletions packages/cardano-services/test/entrypoints.txWorker.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { BAD_CONNECTION_URL, enqueueFakeTx, removeAllMessagesFromQueue } from '../../rabbitmq/test/utils';
import { BAD_CONNECTION_URL, enqueueFakeTx, removeAllQueues } from '../../rabbitmq/test/utils';
import { ChildProcess, fork } from 'child_process';
import { createConnectionObject } from '@cardano-ogmios/client';
import { createHealthyMockOgmiosServer, ogmiosServerReady } from './util';
Expand Down Expand Up @@ -45,55 +45,48 @@ describe('tx-worker entrypoints', () => {
ogmiosServer = createHealthyMockOgmiosServer(() => hook());
await listenPromise(ogmiosServer, { port });
await ogmiosServerReady(ogmiosConnection);
commonArgs = ['start-worker', '--logger-min-severity', 'error', '--ogmios-url', ogmiosConnection.address.webSocket];
commonEnv = { LOGGER_MIN_SEVERITY: 'error', OGMIOS_URL: ogmiosConnection.address.webSocket };
commonArgs = [
'start-worker',
'--dummy-tx-id',
'--logger-min-severity',
'error',
'--ogmios-url',
ogmiosConnection.address.webSocket
];
commonEnv = { DUMMY_TX_ID: 'true', LOGGER_MIN_SEVERITY: 'debug', OGMIOS_URL: ogmiosConnection.address.webSocket };
});

afterAll(async () => await serverClosePromise(ogmiosServer));

beforeEach(async () => {
await removeAllMessagesFromQueue();
await enqueueFakeTx();
await enqueueFakeTx();
await removeAllQueues();
hookLogs = [];
loggerHookCounter = 0;
});

afterEach((done) => {
resetHook();
if (proc?.kill()) proc.on('close', done);
if (proc?.kill()) proc.on('close', () => done());
else done();
});

// Tests without any assertion fail if they get timeout
// eslint-disable-next-line sonarjs/cognitive-complexity
describe('with a working RabbitMQ server', () => {
describe('worker starts', () => {
it('cli:start-worker', (done) => {
describe('transaction are actually submitted', () => {
it('cli:start-worker submits transactions', async () => {
hookPromise = new Promise((resolve) => (hook = resolve));
proc = fork(exePath('cli'), commonArgs, { stdio: 'pipe' });
proc.stdout!.on('data', (data) => (data.toString().match('RabbitMQ transactions worker') ? done() : null));
await Promise.all([hookPromise, enqueueFakeTx()]);
});

it('startWorker', (done) => {
hook = done;
it('startWorker submits transactions', async () => {
hookPromise = new Promise((resolve) => (hook = resolve));
proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' });
await Promise.all([hookPromise, enqueueFakeTx()]);
});
});

describe('transaction are actually submitted', () => {
it('cli:start-worker submits transactions', () =>
new Promise<void>(async (resolve) => {
hook = resolve;
proc = fork(exePath('cli'), commonArgs, { stdio: 'pipe' });
}));

it('startWorker submits transactions', () =>
new Promise<void>(async (resolve) => {
hook = resolve;
proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' });
}));
});

describe('parallel option', () => {
describe('without parallel option', () => {
it('cli:start-worker starts in serial mode', (done) => {
Expand All @@ -104,7 +97,9 @@ describe('tx-worker entrypoints', () => {
it('startWorker starts in serial mode', async () => {
[hook, hookPromise] = loggerHook();
proc = fork(exePath('startWorker'), { env: commonEnv, stdio: 'pipe' });
const txPromises = [enqueueFakeTx([1, 2, 3]), enqueueFakeTx([4, 5, 6])];
await hookPromise;
await Promise.all(txPromises);
expect(hookLogs).toEqual(['Processing tx 1', 'Processed tx 1', 'Processing tx 2', 'Processed tx 2']);
});
});
Expand All @@ -114,7 +109,7 @@ describe('tx-worker entrypoints', () => {
expect.assertions(2);
proc = fork(exePath('cli'), [...commonArgs, '--parallel', 'test'], { stdio: 'pipe' });
proc.stderr!.on('data', (data) =>
expect(data.toString()).toMatch('tx-submit requires a valid Parallel mode')
expect(data.toString()).toMatch('RabbitMQ worker requires a valid Parallel mode')
);
proc.on('exit', (code) => {
expect(code).toBe(1);
Expand All @@ -141,7 +136,9 @@ describe('tx-worker entrypoints', () => {
it('startWorker starts in serial mode', async () => {
[hook, hookPromise] = loggerHook();
proc = fork(exePath('startWorker'), { env: { ...commonEnv, PARALLEL: 'false' }, stdio: 'pipe' });
const txPromises = [enqueueFakeTx([1, 2, 3]), enqueueFakeTx([4, 5, 6])];
await hookPromise;
await Promise.all(txPromises);
expect(hookLogs).toEqual(['Processing tx 1', 'Processed tx 1', 'Processing tx 2', 'Processed tx 2']);
});
});
Expand All @@ -155,7 +152,9 @@ describe('tx-worker entrypoints', () => {
it('startWorker starts in parallel mode', async () => {
[hook, hookPromise] = loggerHook();
proc = fork(exePath('startWorker'), { env: { ...commonEnv, PARALLEL: 'true' }, stdio: 'pipe' });
const txPromises = [enqueueFakeTx([1, 2, 3]), enqueueFakeTx([4, 5, 6])];
await hookPromise;
await Promise.all(txPromises);
expect(hookLogs).toEqual(['Processing tx 1', 'Processing tx 2', 'Processed tx 1', 'Processed tx 2']);
});
});
Expand Down
2 changes: 1 addition & 1 deletion packages/cardano-services/test/load/load.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ interface TestOptions {
interface TestReport extends TestOptions {
timeBeforeSubmitTxs: number;
timeAfterWorkerStarted: number;
timeAfterTxsInMempool: number; // TODO: will work after https://input-output.atlassian.net/browse/ADP-1823
timeAfterTxsInMempool: number;
timeAfterTxsInBlockchain: number;
}

Expand Down

0 comments on commit 4a9de5b

Please sign in to comment.