From 90d674d2126f864f75552aec431300a7bb3a0545 Mon Sep 17 00:00:00 2001 From: Daniele Ricci Date: Thu, 16 Jun 2022 08:56:44 +0200 Subject: [PATCH] refactor(rabbitmq): now the package respects all the repository standards --- packages/rabbitmq/package.json | 4 +- packages/rabbitmq/src/TxSubmitWorker.ts | 251 +++++++----- packages/rabbitmq/src/errorsSerialization.ts | 49 +++ packages/rabbitmq/src/index.ts | 1 + .../rabbitmq/src/rabbitmqTxSubmitProvider.ts | 151 ++++++-- packages/rabbitmq/src/utils.ts | 33 ++ packages/rabbitmq/test/TxSubmitWorker.test.ts | 359 +++++++++--------- packages/rabbitmq/test/jest-setup/docker.ts | 4 +- .../test/rabbitmqTxSubmitProvider.test.ts | 65 ++-- packages/rabbitmq/test/utils.ts | 56 ++- yarn.lock | 14 +- 11 files changed, 629 insertions(+), 358 deletions(-) create mode 100644 packages/rabbitmq/src/errorsSerialization.ts create mode 100644 packages/rabbitmq/src/utils.ts diff --git a/packages/rabbitmq/package.json b/packages/rabbitmq/package.json index e9a1610112a..bfe4f0be6db 100644 --- a/packages/rabbitmq/package.json +++ b/packages/rabbitmq/package.json @@ -37,12 +37,14 @@ "@cardano-sdk/ogmios": "0.2.0", "@types/amqplib": "^0.8.2", "get-port-please": "^2.5.0", + "axios": "^0.27.2", "shx": "^0.3.3", "ws": "^8.5.0" }, "dependencies": { + "@cardano-ogmios/schema": "5.1.0", "@cardano-sdk/core": "0.2.0", - "amqplib": "^0.9.0", + "amqplib": "^0.10.0", "ts-log": "^2.2.4" }, "files": [ diff --git a/packages/rabbitmq/src/TxSubmitWorker.ts b/packages/rabbitmq/src/TxSubmitWorker.ts index b6ae1669103..dce01371b14 100644 --- a/packages/rabbitmq/src/TxSubmitWorker.ts +++ b/packages/rabbitmq/src/TxSubmitWorker.ts @@ -1,8 +1,9 @@ /* eslint-disable @typescript-eslint/no-shadow */ import { Channel, Connection, Message, connect } from 'amqplib'; +import { ErrorDescription, serializeError } from './errorsSerialization'; import { Logger, dummyLogger } from 'ts-log'; import { ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core'; -import { TX_SUBMISSION_QUEUE } from './rabbitmqTxSubmitProvider'; +import { TX_SUBMISSION_QUEUE, txBodyToId, waitForPending } from './utils'; const moduleName = 'TxSubmitWorker'; @@ -10,6 +11,11 @@ const moduleName = 'TxSubmitWorker'; * Configuration options parameters for the TxSubmitWorker */ export interface TxSubmitWorkerConfig { + /** + * Use the algorithm to get a dummy Tx Id from a Tx body. Used for tests + */ + dummyTxId?: boolean; + /** * Instructs the worker to process multiple transactions simultaneously. * Default: false (serial mode) @@ -89,25 +95,34 @@ export class TxSubmitWorker { */ #dependencies: TxSubmitWorkerDependencies; - /** - * The function to call to resolve the start method exit Promise - */ - #exitResolver?: () => void; - /** * The internal worker status */ #status: 'connected' | 'connecting' | 'error' | 'idle' = 'idle'; /** - * @param {TxSubmitWorkerConfig} config The configuration options - * @param {TxSubmitWorkerDependencies} dependencies The dependency objects + * @param config The configuration options + * @param dependencies The dependency objects */ constructor(config: TxSubmitWorkerConfig, dependencies: TxSubmitWorkerDependencies) { this.#config = { parallelTxs: 3, pollingCycle: 500, ...config }; this.#dependencies = { logger: dummyLogger, ...dependencies }; } + /** + * The common handler for errors + * + * @param isAsync flag to identify asynchronous errors + * @param err the error itself + */ + private async errorHandler(isAsync: boolean, err: unknown) { + if (err) { + this.logError(err, isAsync); + this.#status = 'error'; + await this.stop(); + } + } + /** * Get the status of the worker * @@ -120,117 +135,107 @@ export class TxSubmitWorker { /** * Starts the worker */ - start() { - return new Promise(async (resolve, reject) => { - const closeHandler = async (isAsync: boolean, err: unknown) => { - if (err) { - this.logError(err, isAsync); - this.#exitResolver = undefined; - this.#status = 'error'; - await this.stop(); - reject(err); - } - }; - - try { - this.#dependencies.logger!.info(`${moduleName} init: checking tx submission provider health status`); + async start() { + try { + this.#dependencies.logger!.info(`${moduleName} init: checking tx submission provider health status`); - const { ok } = await this.#dependencies.txSubmitProvider.healthCheck(); + const { ok } = await this.#dependencies.txSubmitProvider.healthCheck(); - if (!ok) throw new ProviderError(ProviderFailure.Unhealthy); + if (!ok) throw new ProviderError(ProviderFailure.Unhealthy); - this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ connection`); - this.#exitResolver = resolve; - this.#status = 'connecting'; - this.#connection = await connect(this.#config.rabbitmqUrl.toString()); - this.#connection.on('close', (error) => closeHandler(true, error)); + this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ connection`); + this.#status = 'connecting'; + this.#connection = await connect(this.#config.rabbitmqUrl.toString()); + this.#connection.on('close', (error) => this.errorHandler(true, error)); - this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ channel`); - this.#channel = await this.#connection.createChannel(); - this.#channel.on('close', (error) => closeHandler(true, error)); + this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ channel`); + this.#channel = await this.#connection.createChannel(); + this.#channel.on('close', (error) => this.errorHandler(true, error)); - this.#dependencies.logger!.info(`${moduleName} init: ensuring RabbitMQ queue`); - await this.#channel.assertQueue(TX_SUBMISSION_QUEUE); - this.#dependencies.logger!.info(`${moduleName}: init completed`); + this.#dependencies.logger!.info(`${moduleName} init: ensuring RabbitMQ queue`); + await this.#channel.assertQueue(TX_SUBMISSION_QUEUE); + this.#dependencies.logger!.info(`${moduleName}: init completed`); - if (this.#config.parallel) { - this.#dependencies.logger!.info(`${moduleName}: starting parallel mode`); - await this.#channel.prefetch(this.#config.parallelTxs!, true); + if (this.#config.parallel) { + this.#dependencies.logger!.info(`${moduleName}: starting parallel mode`); + await this.#channel.prefetch(this.#config.parallelTxs!, true); - const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null); - const { consumerTag } = await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler); + const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null); - this.#consumerTag = consumerTag; - this.#status = 'connected'; - } else { - this.#dependencies.logger!.info(`${moduleName}: starting serial mode`); - await this.infiniteLoop(); - } - } catch (error) { - await closeHandler(false, error); + this.#consumerTag = (await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler)).consumerTag; + } else { + this.#dependencies.logger!.info(`${moduleName}: starting serial mode`); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.infiniteLoop(); } - }); + + this.#status = 'connected'; + } catch (error) { + await this.errorHandler(false, error); + if (error instanceof ProviderError) throw error; + throw new ProviderError(ProviderFailure.ConnectionFailure, error); + } } /** - * Stops the worker. Once connection shutdown is completed, - * the Promise returned by the start method is resolved as well + * Stops the worker. */ async stop() { - // This method needs to call this.#exitResolver at the end. - // Since it may be called more than once simultaneously, - // we need to ensure this.#exitResolver is called only once, - // so we immediately store its value in a local variable and we reset it - const exitResolver = this.#exitResolver; - this.#exitResolver = undefined; - - try { - this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ channel`); + this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ channel`); + // In case of parallel worker; first of all cancel the consumer + if (this.#consumerTag) try { - if (this.#consumerTag) { - const consumerTag = this.#consumerTag; - this.#consumerTag = undefined; + // Let's immediately reset this.#consumerTag to be sure the cancel operation is called + // only once even if the this.stop methond is called more than once + const consumerTag = this.#consumerTag; + this.#consumerTag = undefined; - await this.#channel?.cancel(consumerTag); - } + await this.#channel!.cancel(consumerTag); } catch (error) { this.logError(error); } + // In case of serial worker; just instruct the infinite loop it can exit + else this.#continueForever = false; - this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ connection`); + // Wait for pending operations before closing + await waitForPending(this.#channel); - try { - await this.#connection?.close(); - } catch (error) { - this.logError(error); - } + try { + await this.#channel?.close(); + } catch (error) { + this.logError(error); + } - this.#dependencies.logger!.info(`${moduleName}: shutdown completed`); - this.#channel = undefined; - this.#connection = undefined; - this.#consumerTag = undefined; - this.#continueForever = false; - this.#status = 'idle'; - } finally { - // Only logging functions could throw an error here... - // Although this is almost impossible, we want to be sure exitResolver is called - exitResolver?.(); + this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ connection`); + + try { + await this.#connection?.close(); + } catch (error) { + this.logError(error); } + + this.#dependencies.logger!.info(`${moduleName}: shutdown completed`); + this.#channel = undefined; + this.#connection = undefined; + this.#status = 'idle'; } /** * Wrapper to log errors from try/catch blocks * - * @param {any} error the error to log + * @param error the error to log + * @param isAsync flag to set in case the error is asynchronous + * @param asWarning flag to log the error with warning loglevel */ - private logError(error: unknown, isAsync = false) { + private logError(error: unknown, isAsync = false, asWarning = false) { const errorMessage = // eslint-disable-next-line prettier/prettier error instanceof Error ? error.message : (typeof error === 'string' ? error : JSON.stringify(error)); const errorObject = { error: error instanceof Error ? error.name : 'Unknown error', isAsync, module: moduleName }; - this.#dependencies.logger!.error(errorObject, errorMessage); + if (asWarning) this.#dependencies.logger!.warn(errorObject, errorMessage); + else this.#dependencies.logger!.error(errorObject, errorMessage); if (error instanceof Error) this.#dependencies.logger!.debug(`${moduleName}:`, error.stack); } @@ -239,7 +244,6 @@ export class TxSubmitWorker { */ private async infiniteLoop() { this.#continueForever = true; - this.#status = 'connected'; while (this.#continueForever) { const message = await this.#channel?.get(TX_SUBMISSION_QUEUE); @@ -254,29 +258,80 @@ export class TxSubmitWorker { /** * Submit a tx to the provider and ack (or nack) the message * - * @param {Message} message the message containing the transaction + * @param message the message containing the transaction */ private async submitTx(message: Message) { + const counter = ++this.#counter; + let errorDescription: ErrorDescription | null = null; + let txId = ''; + try { - const counter = ++this.#counter; const { content } = message; + const txBody = new Uint8Array(content); + + // Register the handling of current transaction + txId = txBodyToId(txBody, this.#config.dummyTxId); - this.#dependencies.logger!.info(`${moduleName}: submitting tx`); - this.#dependencies.logger!.debug(`${moduleName}: tx ${counter} dump:`, content.toString('hex')); - await this.#dependencies.txSubmitProvider.submitTx(new Uint8Array(content)); + this.#dependencies.logger!.info(`${moduleName}: submitting tx #${counter} id: ${txId}`); + this.#dependencies.logger!.debug(`${moduleName}: tx #${counter} dump:`, content.toString('hex')); + await this.#dependencies.txSubmitProvider.submitTx(txBody); - this.#dependencies.logger!.debug(`${moduleName}: ACKing RabbitMQ message ${counter}`); + this.#dependencies.logger!.debug(`${moduleName}: ACKing RabbitMQ message #${counter}`); this.#channel?.ack(message); } catch (error) { + errorDescription = await this.submitTxErrorHandler(error, counter, message); + } finally { + if ( + txId && // If there is no error or the error can't be retried + (!errorDescription || !errorDescription.isRetriable) + ) { + // Send the response to the original submitter + try { + // An empty response message means succesful submission + const message = errorDescription || {}; + await this.#channel!.assertQueue(txId); + this.logError(`${moduleName}: sending response for message #${counter}`); + this.#channel!.sendToQueue(txId, Buffer.from(JSON.stringify(message))); + } catch (error) { + this.logError(`${moduleName}: while sending response for message #${counter}`); + this.logError(error); + } + } + } + } + + /** + * The error handler of submitTx method + */ + private async submitTxErrorHandler(err: unknown, counter: number, message: Message) { + let errorDescription: ErrorDescription; + + try { + errorDescription = serializeError(err); + } catch (error) { + this.#dependencies.logger!.error(`${moduleName}: serializing tx #${counter} error`); this.logError(error); - try { - this.#dependencies.logger!.info(`${moduleName}: NACKing RabbitMQ message`); - this.#channel?.nack(message); - // eslint-disable-next-line no-catch-shadow - } catch (error) { - this.logError(error); - } + errorDescription = { message: (err as { message: string }).message || JSON.stringify(err), type: 'unknown' }; } + + if (errorDescription.isRetriable) this.#dependencies.logger!.warn(`${moduleName}: submitting tx #${counter}`); + else this.#dependencies.logger!.error(`${moduleName}: submitting tx #${counter}`); + this.logError(err, false, errorDescription.isRetriable); + + const action = `${errorDescription.isRetriable ? 'N' : ''}ACKing RabbitMQ message #${counter}`; + + try { + this.#dependencies.logger!.info(`${moduleName}: ${action}`); + // In RabbitMQ languange, NACKing a message means to ask to retry for it + // We NACK only those messages which had an error which can be retried + if (errorDescription.isRetriable) this.#channel?.nack(message); + else this.#channel?.ack(message); + } catch (error) { + this.logError(`${moduleName}: while ${action}`); + this.logError(error); + } + + return errorDescription; } } diff --git a/packages/rabbitmq/src/errorsSerialization.ts b/packages/rabbitmq/src/errorsSerialization.ts new file mode 100644 index 00000000000..d9983bb1933 --- /dev/null +++ b/packages/rabbitmq/src/errorsSerialization.ts @@ -0,0 +1,49 @@ +import { Cardano } from '@cardano-sdk/core'; +import { OutsideOfValidityInterval } from '@cardano-ogmios/schema'; + +export interface ErrorDetails { + message?: string; +} + +const TxSubmissionErrorsEntries = Object.entries(Cardano.TxSubmissionErrors) as [ + keyof typeof Cardano.TxSubmissionErrors, + unknown +][]; + +export const deserializeErrors: Record unknown> = { + unknown: (details) => new Error(details.message), + ...Object.fromEntries( + TxSubmissionErrorsEntries.map(([key]) => [ + key, + (details: ErrorDetails) => new Cardano.TxSubmissionErrors[key](JSON.parse(details.message!)) + ]) + ) +}; + +export interface ErrorDescription extends ErrorDetails { + isRetriable?: boolean; + isUnknown?: boolean; + type?: keyof typeof deserializeErrors; +} + +export const serializeError = (error: unknown): ErrorDescription => { + if (error instanceof Cardano.TxSubmissionErrors.OutsideOfValidityIntervalError) { + const details = JSON.parse(error.message) as OutsideOfValidityInterval['outsideOfValidityInterval']; + + if (details.interval.invalidBefore && details.currentSlot <= details.interval.invalidBefore) + return { isRetriable: true }; + } + + for (const key in Cardano.TxSubmissionErrors) { + const type = key as keyof typeof Cardano.TxSubmissionErrors; + + if (error instanceof Cardano.TxSubmissionErrors[type]) { + const { message } = error as { message: string }; + + return { message, type }; + } + } + + if (error instanceof Error) return { isUnknown: true, message: error.message, type: 'unknonw' }; + return { isUnknown: true, message: JSON.stringify(error), type: 'unknonw' }; +}; diff --git a/packages/rabbitmq/src/index.ts b/packages/rabbitmq/src/index.ts index 94ce53d21d6..25bede95d26 100644 --- a/packages/rabbitmq/src/index.ts +++ b/packages/rabbitmq/src/index.ts @@ -1,2 +1,3 @@ export * from './TxSubmitWorker'; export * from './rabbitmqTxSubmitProvider'; +export * from './utils'; diff --git a/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts b/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts index 71027472c87..08f71b8e8e5 100644 --- a/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts +++ b/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts @@ -1,9 +1,36 @@ import { Buffer } from 'buffer'; import { Cardano, HealthCheckResponse, ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core'; import { Channel, Connection, connect } from 'amqplib'; +import { ErrorDescription, deserializeErrors } from './errorsSerialization'; import { Logger, dummyLogger } from 'ts-log'; +import { TX_SUBMISSION_QUEUE, txBodyToId, waitForPending } from './utils'; -export const TX_SUBMISSION_QUEUE = 'cardano-tx-submit'; +const moduleName = 'RabbitMqTxSubmitProvider'; + +/** + * Configuration options parameters for the RabbitMqTxSubmitProvider + */ +export interface RabbitMqTxSubmitProviderConfig { + /** + * Use the algorithm to get a dummy Tx Id from a Tx body. Used for tests + */ + dummyTxId?: boolean; + + /** + * The RabbitMQ connection URL + */ + rabbitmqUrl: URL; +} + +/** + * Dependencies for the RabbitMqTxSubmitProvider + */ +export interface RabbitMqTxSubmitProviderDependencies { + /** + * The logger. Default: silent + */ + logger?: Logger; +} /** * Connect to a [RabbitMQ](https://www.rabbitmq.com/) instance @@ -11,17 +38,25 @@ export const TX_SUBMISSION_QUEUE = 'cardano-tx-submit'; export class RabbitMqTxSubmitProvider implements TxSubmitProvider { #channel?: Channel; #connection?: Connection; - #connectionURL: URL; - #logger: Logger; #queueWasCreated = false; /** - * @param {URL} connectionURL RabbitMQ connection URL - * @param {Logger} logger object implementing the Logger abstract class + * The configuration options + */ + #config: RabbitMqTxSubmitProviderConfig; + + /** + * The dependency objects + */ + #dependencies: RabbitMqTxSubmitProviderDependencies; + + /** + * @param config The configuration options + * @param dependencies The dependency objects */ - constructor(connectionURL: URL, logger: Logger = dummyLogger) { - this.#connectionURL = connectionURL; - this.#logger = logger; + constructor(config: RabbitMqTxSubmitProviderConfig, dependencies: RabbitMqTxSubmitProviderDependencies = {}) { + this.#config = config; + this.#dependencies = { logger: dummyLogger, ...dependencies }; } /** @@ -31,35 +66,43 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { if (this.#connection) return; try { - this.#connection = await connect(this.#connectionURL.toString()); + this.#connection = await connect(this.#config.rabbitmqUrl.toString()); } catch (error) { - await this.close(); + this.#dependencies.logger!.error(`${moduleName}: while connecting`, error); + void this.close(); throw new ProviderError(ProviderFailure.ConnectionFailure, error); } + this.#connection.on('error', (error: unknown) => + this.#dependencies.logger!.error(`${moduleName}: connection error`, error) + ); try { this.#channel = await this.#connection.createChannel(); } catch (error) { - await this.close(); + this.#dependencies.logger!.error(`${moduleName}: while creating channel`, error); + void this.close(); throw new ProviderError(ProviderFailure.ConnectionFailure, error); } + this.#channel.on('error', (error: unknown) => + this.#dependencies.logger!.error(`${moduleName}: channel error`, error) + ); } /** * Idempotently (channel.assertQueue does the job for us) creates the queue * - * @param {boolean} force Forces the creation of the queue just to have a response from the server + * @param force Forces the creation of the queue just to have a response from the server */ async #ensureQueue(force?: boolean) { if (this.#queueWasCreated && !force) return; await this.#connectAndCreateChannel(); - this.#queueWasCreated = true; try { await this.#channel!.assertQueue(TX_SUBMISSION_QUEUE); + this.#queueWasCreated = true; } catch (error) { - await this.close(); + void this.close(); throw new ProviderError(ProviderFailure.ConnectionFailure, error); } } @@ -68,18 +111,21 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { * Closes the connection to RabbitMQ and (for interl purposes) it resets the state as well */ async close() { + // Wait for pending operations before closing + await waitForPending(this.#channel); + try { await this.#channel?.close(); // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (error: any) { - this.#logger.error({ error: error.name, module: 'rabbitmqTxSubmitProvider' }, error.message); + this.#dependencies.logger!.error({ error: error.name, module: moduleName }, error.message); } try { await this.#connection?.close(); // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (error: any) { - this.#logger.error({ error: error.name, module: 'rabbitmqTxSubmitProvider' }, error.message); + this.#dependencies.logger!.error({ error: error.name, module: moduleName }, error.message); } this.#channel = undefined; @@ -99,7 +145,7 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { await this.#ensureQueue(true); ok = true; } catch { - this.#logger.error({ error: 'Connection error', module: 'rabbitmqTxSubmitProvider' }); + this.#dependencies.logger!.error({ error: 'Connection error', module: 'rabbitmqTxSubmitProvider' }); } return { ok }; @@ -108,14 +154,71 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { /** * Submit a transaction to RabbitMQ * - * @param {Uint8Array} signedTransaction The Uint8Array representation of a signedTransaction + * @param signedTransaction The Uint8Array representation of a signedTransaction */ async submitTx(signedTransaction: Uint8Array) { - try { - await this.#ensureQueue(); - this.#channel!.sendToQueue(TX_SUBMISSION_QUEUE, Buffer.from(signedTransaction)); - } catch (error) { - throw Cardano.util.asTxSubmissionError(error) || new Cardano.UnknownTxSubmissionError(error); - } + return new Promise(async (resolve, reject) => { + let txId = ''; + + const done = (error?: unknown) => { + this.#dependencies.logger!.debug(`${moduleName}: ${error ? 'rejecting' : 'resolving'} tx id: ${txId}`); + + if (error) reject(error); + else resolve(); + }; + + try { + txId = txBodyToId(signedTransaction, this.#config.dummyTxId); + this.#dependencies.logger!.info(`${moduleName}: queuing tx id: ${txId}`); + + // Actually send the message + await this.#ensureQueue(); + this.#channel!.sendToQueue(TX_SUBMISSION_QUEUE, Buffer.from(signedTransaction)); + this.#dependencies.logger!.debug(`${moduleName}: queued tx id: ${txId}`); + + // Set the queue for response message + this.#dependencies.logger!.debug(`${moduleName}: creating queue: ${txId}`); + await this.#channel!.assertQueue(txId); + this.#dependencies.logger!.debug(`${moduleName}: created queue: ${txId}`); + + // We noticed that may happens that the response message handler is called before the + // Promise is resolved, that's why we are awaiting for it inside the handler itself + const consumePromise = this.#channel!.consume(txId, async (message) => { + try { + this.#dependencies.logger!.debug(`${moduleName}: got result message from queue: ${txId}`); + + // This should never happen, just handle it for correct logging + if (!message) return done(new Error('null message from result queue')); + + this.#channel!.ack(message); + + const { consumerTag } = await consumePromise; + + this.#dependencies.logger!.debug(`${moduleName}: canceling consumer for queue: ${txId}`); + await this.#channel!.cancel(consumerTag); + this.#dependencies.logger!.debug(`${moduleName}: deleting queue: ${txId}`); + await this.#channel!.deleteQueue(txId); + this.#dependencies.logger!.debug(`${moduleName}: deleted queue: ${txId}`); + + const result = JSON.parse(message.content.toString()) as ErrorDescription; + + // An empty result message means submission ok + if (Object.keys(result).length === 0) return done(); + + const { type } = result; + + done(deserializeErrors[type!](result)); + } catch (error) { + this.#dependencies.logger!.error(`${moduleName}: while handling response message: ${txId}`); + this.#dependencies.logger!.error(error); + done(error); + } + }); + } catch (error) { + this.#dependencies.logger!.error(`${moduleName}: while queuing transaction: ${txId}`); + this.#dependencies.logger!.error(error); + done(Cardano.util.asTxSubmissionError(error) || new Cardano.UnknownTxSubmissionError(error)); + } + }); } } diff --git a/packages/rabbitmq/src/utils.ts b/packages/rabbitmq/src/utils.ts new file mode 100644 index 00000000000..c796518d7e5 --- /dev/null +++ b/packages/rabbitmq/src/utils.ts @@ -0,0 +1,33 @@ +import { CSL, cslToCore } from '@cardano-sdk/core'; + +export const TX_SUBMISSION_QUEUE = 'cardano-tx-submit'; + +export const txBodyToId = (txBody: Buffer | Uint8Array, dummy?: boolean) => { + if (dummy) { + const buffer = txBody instanceof Buffer ? txBody : Buffer.from(txBody); + + return buffer.toString('hex'); + } + + const buffer = txBody instanceof Buffer ? txBody : Buffer.from(txBody); + const txDecoded = CSL.Transaction.from_bytes(buffer); + const txData = cslToCore.newTx(txDecoded); + + return txData.id.toString(); +}; + +// Workaround inspired to https://github.com/amqp-node/amqplib/issues/250#issuecomment-888558719 +// to avoid the error reported on https://github.com/amqp-node/amqplib/issues/692 +export const waitForPending = async (channel: unknown) => { + const check = () => { + const { pending, reply } = channel as { pending: unknown[]; reply: unknown }; + + return pending.length > 0 || reply !== null; + }; + + try { + while (check()) await new Promise((resolve) => setTimeout(resolve, 50)); + } catch { + // If something is wrong in the workaround as well... let's simply go on and close the channel + } +}; diff --git a/packages/rabbitmq/test/TxSubmitWorker.test.ts b/packages/rabbitmq/test/TxSubmitWorker.test.ts index d20f8a865a8..cbc8a94aa70 100644 --- a/packages/rabbitmq/test/TxSubmitWorker.test.ts +++ b/packages/rabbitmq/test/TxSubmitWorker.test.ts @@ -1,30 +1,49 @@ -import { BAD_CONNECTION_URL, GOOD_CONNECTION_URL, enqueueFakeTx, removeAllMessagesFromQueue } from './utils'; -import { TxSubmitProvider } from '@cardano-sdk/core'; -import { TxSubmitWorker } from '../src'; +import { BAD_CONNECTION_URL, GOOD_CONNECTION_URL, enqueueFakeTx, removeAllQueues, testLogger } from './utils'; +import { Cardano, ProviderError, TxSubmitProvider } from '@cardano-sdk/core'; +import { RabbitMqTxSubmitProvider, TxSubmitWorker } from '../src'; import { createMockOgmiosServer, listenPromise, serverClosePromise } from '@cardano-sdk/ogmios/test/mocks/mockOgmiosServer'; -import { dummyLogger } from 'ts-log'; import { getRandomPort } from 'get-port-please'; import { ogmiosTxSubmitProvider, urlToConnectionConfig } from '@cardano-sdk/ogmios'; -import { removeRabbitMQContainer, setupRabbitMQContainer } from './jest-setup/docker'; import http from 'http'; -const logger = dummyLogger; - describe('TxSubmitWorker', () => { - let txSubmitProvider: TxSubmitProvider; + let logger: ReturnType; + let mock: http.Server | undefined; let port: number; + let txSubmitProvider: TxSubmitProvider; + let worker: TxSubmitWorker | undefined; beforeAll(async () => { port = await getRandomPort(); txSubmitProvider = ogmiosTxSubmitProvider(urlToConnectionConfig(new URL(`http://localhost:${port}/`))); }); + beforeEach(async () => { + await removeAllQueues(); + logger = testLogger(); + }); + + afterEach(async () => { + if (mock) { + await serverClosePromise(mock); + mock = undefined; + } + + if (worker) { + await worker.stop(); + worker = undefined; + } + + // Uncomment this to have evidence of all the log messages + // console.log(logger.messages); + }); + it('is safe to call stop method on an idle worker', async () => { - const worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); + worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); expect(worker).toBeInstanceOf(TxSubmitWorker); expect(worker.getStatus()).toEqual('idle'); @@ -33,191 +52,149 @@ describe('TxSubmitWorker', () => { }); it('rejects if the TxSubmitProvider is unhealthy on start', async () => { - const worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); - - const unhealthyMock = createMockOgmiosServer({ + mock = createMockOgmiosServer({ healthCheck: { response: { networkSynchronization: 0.8, success: true } }, submitTx: { response: { success: true } } }); - await listenPromise(unhealthyMock, port); + await listenPromise(mock, port); - try { - const res = await worker.start(); - expect(res).toBeDefined(); - } catch (error) { - expect(error).toBeDefined(); - } + worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); - await serverClosePromise(unhealthyMock); + await expect(worker.start()).rejects.toBeInstanceOf(ProviderError); }); it('rejects if unable to connect to the RabbitMQ broker', async () => { - const worker = new TxSubmitWorker({ rabbitmqUrl: BAD_CONNECTION_URL }, { logger, txSubmitProvider }); - const healthyMock = createMockOgmiosServer({ + mock = createMockOgmiosServer({ healthCheck: { response: { networkSynchronization: 1, success: true } }, submitTx: { response: { success: true } } }); - await listenPromise(healthyMock, port); + await listenPromise(mock, port); - try { - const res = await worker.start(); - expect(res).toBeDefined(); - } catch (error) { - expect(error).toBeDefined(); - } + worker = new TxSubmitWorker({ rabbitmqUrl: BAD_CONNECTION_URL }, { logger, txSubmitProvider }); - await serverClosePromise(healthyMock); + await expect(worker.start()).rejects.toBeInstanceOf(ProviderError); }); - describe('RabbitMQ connection failure while running', () => { - const CONTAINER_NAME = 'cardano-rabbitmq-local-test'; - - const performTest = async (options: { parallel: boolean }) => { - const healthyMock = createMockOgmiosServer({ - healthCheck: { response: { networkSynchronization: 1, success: true } }, - submitTx: { response: { success: true } } - }); - - await listenPromise(healthyMock, port); - - // Set up a new RabbitMQ container to test TxSubmitWorker on RabbitMQ server shut down - const rabbitmqPort = await getRandomPort(); - await setupRabbitMQContainer(CONTAINER_NAME, rabbitmqPort); - - // Actually create the TxSubmitWorker - const worker = new TxSubmitWorker( - { rabbitmqUrl: new URL(`amqp://localhost:${rabbitmqPort}`), ...options }, - { logger, txSubmitProvider } - ); - const startPromise = worker.start(); - - // Wait until the worker is connected to the RabbitMQ server - await new Promise(async (resolve) => { - // eslint-disable-next-line @typescript-eslint/no-shadow - while (worker.getStatus() === 'connecting') await new Promise((resolve) => setTimeout(resolve, 50)); - resolve(); - }); - - // Stop the RabbitMQ container while the TxSubmitWorker is connected - const removeContainerPromise = removeRabbitMQContainer(CONTAINER_NAME); - const innerMessage = "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"; - const fullMessage = `Connection closed: 320 (CONNECTION-FORCED) with message "${innerMessage}"`; - - // Test the TxSubmitWorker actually exits with error - await expect(startPromise).rejects.toThrow(new Error(fullMessage)); - - // Test teardown - await serverClosePromise(healthyMock); - await removeContainerPromise; - }; - - it('rejects when configured to process jobs serially', async () => await performTest({ parallel: false }), 20_000); - it( - 'rejects when configured to process jobs in parallel', - async () => await performTest({ parallel: true }), - 20_000 - ); - }); + describe('error while tx submission', () => { + describe('tx submission is retried if the error is retiable', () => { + // eslint-disable-next-line unicorn/consistent-function-scoping + const performTest = async (options: { parallel: boolean }) => { + const spy = jest.fn(); + let hookAlreadyCalled = false; + let successMockListenPromise = Promise.resolve(new http.Server()); + let successMock = new http.Server(); + + const failMock = createMockOgmiosServer({ + healthCheck: { response: { networkSynchronization: 1, success: true } }, + submitTx: { response: { failWith: { type: 'beforeValidityInterval' }, success: false } }, + submitTxHook: () => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + (async () => { + if (hookAlreadyCalled) return; + + // This hook may be called multple times... ensure the core is executed only once + hookAlreadyCalled = true; + + // Stop the failing mock and start the succes one + await serverClosePromise(failMock); + successMockListenPromise = listenPromise(successMock, port); + })(); + } + }); + + // Start a failing ogmios server + await listenPromise(failMock, port); + + // Enqueue a tx + const providerClosePromise = enqueueFakeTx(); + + // Actually create the TxSubmitWorker + worker = new TxSubmitWorker( + { dummyTxId: true, rabbitmqUrl: GOOD_CONNECTION_URL, ...options }, + { logger, txSubmitProvider } + ); + await worker.start(); + + await new Promise((resolve) => { + successMock = createMockOgmiosServer({ + healthCheck: { response: { networkSynchronization: 1, success: true } }, + submitTx: { response: { success: true } }, + submitTxHook: () => { + spy(); + // Once the transaction is submitted with success we can stop the worker + // We wait half a second to be sure the tx is submitted only once + setTimeout(() => { + resolve(); + }, 500); + } + }); + }); + + // All these Promises are the return value of async functions called in a not async context, + // we need to await for them to perform a correct test teardown + await Promise.all([successMockListenPromise, providerClosePromise, serverClosePromise(successMock)]); + expect(spy).toBeCalledTimes(1); + }; + + it('when configured to process jobs serially', async () => performTest({ parallel: false })); + it('when configured to process jobs in parallel', async () => performTest({ parallel: true })); + }); - describe('tx submission is retried until success', () => { - // First of all we need to remove from the queue every message sent by previous tests/suites - beforeAll(removeAllMessagesFromQueue); - - // eslint-disable-next-line unicorn/consistent-function-scoping - const performTest = async (options: { parallel: boolean }) => { - const spy = jest.fn(); - let hookAlreadyCalled = false; - let successMockListenPromise = Promise.resolve(new http.Server()); - let stopPromise = Promise.resolve(); - // eslint-disable-next-line prefer-const - let worker: TxSubmitWorker; - - const successMock = createMockOgmiosServer({ - healthCheck: { response: { networkSynchronization: 1, success: true } }, - submitTx: { response: { success: true } }, - submitTxHook: () => { - spy(); - // Once the transaction is submitted with success we can stop the worker - // We wait half a second to be ensure the tx is submitted only once - setTimeout(() => (stopPromise = worker.stop()), 500); - } - }); - - const failMock = createMockOgmiosServer({ - healthCheck: { response: { networkSynchronization: 1, success: true } }, - submitTx: { response: { failWith: { type: 'eraMismatch' }, success: false } }, - submitTxHook: () => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - (async () => { - if (hookAlreadyCalled) return; - - // This hook may be called multple times... ensure the core is executed only once - hookAlreadyCalled = true; - - // Stop the failing mock and start the succes one - await serverClosePromise(failMock); - successMockListenPromise = listenPromise(successMock, port); - })(); - } - }); - - // Start a failing ogmios server - await listenPromise(failMock, port); - - // Enqueue a tx - const providerClosePromise = enqueueFakeTx(); - - // Actually create the TxSubmitWorker - worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL, ...options }, { logger, txSubmitProvider }); - const startPromise = worker.start(); - - await expect(startPromise).resolves.toEqual(undefined); - await serverClosePromise(successMock); - // All these Promises are the return value of async functions called in a not async context, - // we need to await for them to perform a correct test teardown - await Promise.all([stopPromise, successMockListenPromise, providerClosePromise]); - expect(spy).toBeCalledTimes(1); - }; - - it('when configured to process jobs serially', async () => performTest({ parallel: false })); - it('when configured to process jobs in parallel', async () => performTest({ parallel: true })); + describe('the error is propagated to RabbitMqTxSubmitProvider', () => { + // eslint-disable-next-line unicorn/consistent-function-scoping + const performTest = async (options: { parallel: boolean }) => { + mock = createMockOgmiosServer({ + healthCheck: { response: { networkSynchronization: 1, success: true } }, + submitTx: { response: { failWith: { type: 'eraMismatch' }, success: false } } + }); + + // Start the mock + await listenPromise(mock, port); + + // Actually create the TxSubmitWorker + worker = new TxSubmitWorker( + { dummyTxId: true, pollingCycle: 50, rabbitmqUrl: GOOD_CONNECTION_URL, ...options }, + { logger, txSubmitProvider } + ); + await worker.start(); + + // Tx submission by RabbitMqTxSubmitProvider must reject with the same error got by TxSubmitWorker + await expect(enqueueFakeTx([1, 2, 3], logger)).rejects.toBeInstanceOf( + Cardano.TxSubmissionErrors.EraMismatchError + ); + }; + + it('when configured to process jobs serially', async () => performTest({ parallel: false })); + it('when configured to process jobs in parallel', async () => performTest({ parallel: true })); + }); }); it('submission is parallelized up to parallelTx Tx simultaneously', async () => { - // First of all we need to remove from the queue every message sent by previous tests/suites - await removeAllMessagesFromQueue(); - - let stopPromise = Promise.resolve(); - - const loggedMessages: unknown[][] = []; - const testLogger = { - debug: (...args: unknown[]) => loggedMessages.push(args), - error: jest.fn(), - info: jest.fn(), - trace: jest.fn(), - warn: jest.fn() - }; - - const worker = new TxSubmitWorker( - { parallel: true, parallelTxs: 4, rabbitmqUrl: GOOD_CONNECTION_URL }, - { logger: testLogger, txSubmitProvider } - ); - - const mock = createMockOgmiosServer({ + mock = createMockOgmiosServer({ healthCheck: { response: { networkSynchronization: 1, success: true } }, submitTx: { response: { success: true } }, submitTxHook: async (data) => { // Wait 100ms * the first byte of the Tx before sending the result + // eslint-disable-next-line @typescript-eslint/no-shadow await new Promise((resolve) => setTimeout(resolve, 100 * data![0])); - // Exit condition: a Tx with length === 2 - if (data?.length === 2) stopPromise = worker.stop(); } }); await listenPromise(mock, port); + worker = new TxSubmitWorker( + { dummyTxId: true, parallel: true, parallelTxs: 4, rabbitmqUrl: GOOD_CONNECTION_URL }, + { logger, txSubmitProvider } + ); + await worker.start(); + + const rabbitMqTxSubmitProvider = new RabbitMqTxSubmitProvider({ + dummyTxId: true, + rabbitmqUrl: GOOD_CONNECTION_URL + }); + /* * Tx submission plan, time sample: 100ms * 11111 @@ -225,33 +202,41 @@ describe('TxSubmitWorker', () => { * 3555 * 4447777 */ - await enqueueFakeTx([5]); - await enqueueFakeTx([2]); - await enqueueFakeTx([1]); - await enqueueFakeTx([3]); - await enqueueFakeTx([3]); - await enqueueFakeTx([4]); - await enqueueFakeTx([4, 0]); - await expect(worker.start()).resolves.toEqual(undefined); - await Promise.all([stopPromise, serverClosePromise(mock)]); + const promises: Promise[] = []; + const result = [undefined, undefined, undefined, undefined, undefined, undefined, undefined]; + + for (const tx of [[5], [2], [1], [3, 0, 0], [3, 0, 1], [4], [4, 0]]) { + promises.push(rabbitMqTxSubmitProvider.submitTx(new Uint8Array(tx))); + // Wait 10ms to be sure the transactions are enqueued in the right order + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + await expect(Promise.all(promises)).resolves.toEqual(result); + + await rabbitMqTxSubmitProvider.close(); // We check only the relevant messages - expect(loggedMessages.filter(([_]) => typeof _ === 'string' && _.match(/(tx \d dump)|ACKing RabbitMQ/))).toEqual([ - ['TxSubmitWorker: tx 1 dump:', '05'], - ['TxSubmitWorker: tx 2 dump:', '02'], - ['TxSubmitWorker: tx 3 dump:', '01'], - ['TxSubmitWorker: tx 4 dump:', '03'], - ['TxSubmitWorker: ACKing RabbitMQ message 3'], - ['TxSubmitWorker: tx 5 dump:', '03'], - ['TxSubmitWorker: ACKing RabbitMQ message 2'], - ['TxSubmitWorker: tx 6 dump:', '04'], - ['TxSubmitWorker: ACKing RabbitMQ message 4'], - ['TxSubmitWorker: tx 7 dump:', '0400'], - ['TxSubmitWorker: ACKing RabbitMQ message 5'], - ['TxSubmitWorker: ACKing RabbitMQ message 1'], - ['TxSubmitWorker: ACKing RabbitMQ message 6'], - ['TxSubmitWorker: ACKing RabbitMQ message 7'] + expect( + logger.messages + .filter(({ level }) => level === 'debug') + .filter(({ message }) => typeof message[0] === 'string' && message[0].match(/(tx #\d dump)|ACKing RabbitMQ/)) + .map(({ message }) => message) + ).toEqual([ + ['TxSubmitWorker: tx #1 dump:', '05'], + ['TxSubmitWorker: tx #2 dump:', '02'], + ['TxSubmitWorker: tx #3 dump:', '01'], + ['TxSubmitWorker: tx #4 dump:', '030000'], + ['TxSubmitWorker: ACKing RabbitMQ message #3'], + ['TxSubmitWorker: tx #5 dump:', '030001'], + ['TxSubmitWorker: ACKing RabbitMQ message #2'], + ['TxSubmitWorker: tx #6 dump:', '04'], + ['TxSubmitWorker: ACKing RabbitMQ message #4'], + ['TxSubmitWorker: tx #7 dump:', '0400'], + ['TxSubmitWorker: ACKing RabbitMQ message #5'], + ['TxSubmitWorker: ACKing RabbitMQ message #1'], + ['TxSubmitWorker: ACKing RabbitMQ message #6'], + ['TxSubmitWorker: ACKing RabbitMQ message #7'] ]); }); }); diff --git a/packages/rabbitmq/test/jest-setup/docker.ts b/packages/rabbitmq/test/jest-setup/docker.ts index f05c23f441b..ca436c0953e 100644 --- a/packages/rabbitmq/test/jest-setup/docker.ts +++ b/packages/rabbitmq/test/jest-setup/docker.ts @@ -2,7 +2,7 @@ import { connect } from 'amqplib'; import { imageExists, pullImageAsync } from 'dockerode-utils'; import Docker from 'dockerode'; -const CONTAINER_IMAGE = 'rabbitmq:3.8-alpine'; +const CONTAINER_IMAGE = 'rabbitmq:3.10-management'; const CONTAINER_NAME = 'cardano-rabbitmq-test'; export const removeRabbitMQContainer = async (containerName = CONTAINER_NAME) => { @@ -34,7 +34,7 @@ export const setupRabbitMQContainer = async (containerName = CONTAINER_NAME, por await removeRabbitMQContainer(containerName); const container = await docker.createContainer({ - HostConfig: { PortBindings: { '5672/tcp': [{ HostPort: `${port}` }] } }, + HostConfig: { PortBindings: { '5672/tcp': [{ HostPort: `${port}` }], '15672/tcp': [{ HostPort: '15672' }] } }, Image: CONTAINER_IMAGE, name: containerName }); diff --git a/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts b/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts index 40fd97e232d..3ee1bd019d2 100644 --- a/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts +++ b/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts @@ -1,21 +1,35 @@ -import { BAD_CONNECTION_URL, GOOD_CONNECTION_URL } from './utils'; +import { BAD_CONNECTION_URL, GOOD_CONNECTION_URL, removeAllQueues, testLogger } from './utils'; import { ProviderError, TxSubmitProvider } from '@cardano-sdk/core'; -import { RabbitMqTxSubmitProvider } from '../src'; +import { RabbitMqTxSubmitProvider, TxSubmitWorker } from '../src'; describe('RabbitMqTxSubmitProvider', () => { - let provider: TxSubmitProvider; + let logger: ReturnType; + let provider: TxSubmitProvider | undefined; - afterEach(() => provider?.close!()); + beforeEach(async () => { + await removeAllQueues(); + logger = testLogger(); + }); + + afterEach(async () => { + if (provider) { + await provider.close!(); + provider = undefined; + } + + // Uncomment this to have evidence of all the log messages + // console.log(logger.messages); + }); describe('healthCheck', () => { it('is not ok if cannot connect', async () => { - provider = new RabbitMqTxSubmitProvider(BAD_CONNECTION_URL); + provider = new RabbitMqTxSubmitProvider({ dummyTxId: true, rabbitmqUrl: BAD_CONNECTION_URL }); const res = await provider.healthCheck(); expect(res).toEqual({ ok: false }); }); it('is ok if can connect', async () => { - provider = new RabbitMqTxSubmitProvider(GOOD_CONNECTION_URL); + provider = new RabbitMqTxSubmitProvider({ dummyTxId: true, rabbitmqUrl: GOOD_CONNECTION_URL }); const resA = await provider.healthCheck(); // Call again to cover the idemopotent RabbitMqTxSubmitProvider.#connectAndCreateChannel() operation const resB = await provider.healthCheck(); @@ -24,29 +38,36 @@ describe('RabbitMqTxSubmitProvider', () => { }); }); - const performSubmitTxTest = async (connectionURL: URL) => { - try { - provider = new RabbitMqTxSubmitProvider(connectionURL); - const resA = await provider.submitTx(new Uint8Array()); - // Called again to cover the idemopotent RabbitMqTxSubmitProvider.#ensureQueue() operation - const resB = await provider.submitTx(new Uint8Array()); - expect(resA).toBeUndefined(); - expect(resB).toBeUndefined(); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (error: any) { - expect(error.innerError).toBeInstanceOf(ProviderError); - } - }; - describe('submitTx', () => { + // eslint-disable-next-line unicorn/consistent-function-scoping + const performTest = async (rabbitmqUrl: URL) => { + try { + provider = new RabbitMqTxSubmitProvider({ dummyTxId: true, rabbitmqUrl }, { logger }); + const resA = await provider.submitTx(new Uint8Array([1])); + // Called again to cover the idemopotent RabbitMqTxSubmitProvider.#ensureQueue() operation + const resB = await provider.submitTx(new Uint8Array([2])); + expect(resA).toBeUndefined(); + expect(resB).toBeUndefined(); + } catch (error) { + expect((error as { innerError: unknown }).innerError).toBeInstanceOf(ProviderError); + } + }; + it('resolves if successful', async () => { + const worker = new TxSubmitWorker( + { dummyTxId: true, parallel: true, rabbitmqUrl: GOOD_CONNECTION_URL }, + { logger, txSubmitProvider: { healthCheck: async () => ({ ok: true }), submitTx: () => Promise.resolve() } } + ); + expect.assertions(2); - await performSubmitTxTest(GOOD_CONNECTION_URL); + await worker.start(); + await performTest(GOOD_CONNECTION_URL); + await worker.stop(); }); it('rejects with errors thrown by the service', async () => { expect.assertions(1); - await performSubmitTxTest(BAD_CONNECTION_URL); + await performTest(BAD_CONNECTION_URL); }); }); }); diff --git a/packages/rabbitmq/test/utils.ts b/packages/rabbitmq/test/utils.ts index dbcf6c21397..50b3e387935 100644 --- a/packages/rabbitmq/test/utils.ts +++ b/packages/rabbitmq/test/utils.ts @@ -1,26 +1,54 @@ -import { Message, connect } from 'amqplib'; -import { RabbitMqTxSubmitProvider, TX_SUBMISSION_QUEUE } from '../src'; +import { RabbitMqTxSubmitProvider } from '../src'; +import { connect } from 'amqplib'; +import { dummyLogger } from 'ts-log'; +import axios from 'axios'; export const BAD_CONNECTION_URL = new URL('amqp://localhost:1234'); export const GOOD_CONNECTION_URL = new URL('amqp://localhost'); -export const enqueueFakeTx = async (data = [0, 1, 2, 3, 23]) => { - const rabbitMqTxSubmitProvider = new RabbitMqTxSubmitProvider(GOOD_CONNECTION_URL); - await rabbitMqTxSubmitProvider.submitTx(new Uint8Array(data)); - return rabbitMqTxSubmitProvider.close(); -}; +export const enqueueFakeTx = async (data = [0, 1, 2, 3, 23], logger = dummyLogger) => + new Promise(async (resolve, reject) => { + let err: unknown; + let rabbitMqTxSubmitProvider: RabbitMqTxSubmitProvider | null = null; + + try { + rabbitMqTxSubmitProvider = new RabbitMqTxSubmitProvider( + { dummyTxId: true, rabbitmqUrl: GOOD_CONNECTION_URL }, + { logger } + ); + await rabbitMqTxSubmitProvider.submitTx(new Uint8Array(data)); + } catch (error) { + err = error; + } finally { + if (rabbitMqTxSubmitProvider) await rabbitMqTxSubmitProvider.close(); + if (err) reject(err); + else resolve(); + } + }); + +export const removeAllQueues = async () => { + const queues = await axios.get('http://guest:guest@localhost:15672/api/queues/'); + + if (queues.data.length === 0) return; -export const removeAllMessagesFromQueue = async () => { const connection = await connect(GOOD_CONNECTION_URL.toString()); const channel = await connection.createChannel(); - await channel.assertQueue(TX_SUBMISSION_QUEUE); - let message: Message | false; - do { - message = await channel.get(TX_SUBMISSION_QUEUE); - if (message) channel.ack(message); - } while (message); + for (const queue of queues.data) await channel.deleteQueue(queue.name as string); await channel.close(); await connection.close(); }; + +export const testLogger = () => { + const messages: { message: unknown[]; level: 'debug' | 'error' | 'info' | 'trace' | 'warn' }[] = []; + + return { + debug: (...message: unknown[]) => messages.push({ level: 'debug', message }), + error: (...message: unknown[]) => messages.push({ level: 'error', message }), + info: (...message: unknown[]) => messages.push({ level: 'info', message }), + messages, + trace: (...message: unknown[]) => messages.push({ level: 'trace', message }), + warn: (...message: unknown[]) => messages.push({ level: 'warn', message }) + }; +}; diff --git a/yarn.lock b/yarn.lock index 17ae43669f6..ddc1872006a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3391,13 +3391,12 @@ ajv@^8.0.0, ajv@^8.0.1, ajv@^8.8.0: require-from-string "^2.0.2" uri-js "^4.2.2" -amqplib@^0.9.0: - version "0.9.1" - resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.9.1.tgz#f2ad6e518a148a761e102bf791314fbe4cb94346" - integrity sha512-a1DP0H1LcLSMKPAnhUN2AKbVyEPqEUrUf7O+odhKGxaO+Tf0nWtuD7Zq5P9uZwZteu56OfW9EQozSCTKsAEk5w== +amqplib@^0.10.0: + version "0.10.0" + resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.10.0.tgz#766d696f8ceae097ee9eb73e6796999e5d40a1db" + integrity sha512-UueEnRGY6upiSvGsSYM22Woa1SeSukqYtqgYW4Gj8gHvbf5BRhhYRqf3kQ8aSUYYffTOZi6SeOVW2eOXt0hpPA== dependencies: bitsyntax "~0.1.0" - bluebird "^3.7.2" buffer-more-ints "~1.0.0" readable-stream "1.x >=1.1.9" url-parse "~1.5.10" @@ -3981,11 +3980,6 @@ blake2b@2.1.3: blake2b-wasm "^1.1.0" nanoassert "^1.0.0" -bluebird@^3.7.2: - version "3.7.2" - resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.2.tgz#9f229c15be272454ffa973ace0dbee79a1b0c36f" - integrity sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg== - bn.js@^4.0.0, bn.js@^4.11.8, bn.js@^4.11.9: version "4.12.0" resolved "https://registry.yarnpkg.com/bn.js/-/bn.js-4.12.0.tgz#775b3f278efbb9718eec7361f483fb36fbbfea88"