diff --git a/packages/rabbitmq/package.json b/packages/rabbitmq/package.json index 5a67af3187f..6d9fa511e98 100644 --- a/packages/rabbitmq/package.json +++ b/packages/rabbitmq/package.json @@ -37,12 +37,15 @@ "@cardano-sdk/ogmios": "0.3.0", "@types/amqplib": "^0.8.2", "get-port-please": "^2.5.0", + "axios": "^0.27.2", "shx": "^0.3.3", "ws": "^8.5.0" }, "dependencies": { + "@cardano-ogmios/schema": "5.1.0", "@cardano-sdk/core": "0.3.0", - "amqplib": "^0.9.0", + "@cardano-sdk/util": "0.3.0", + "amqplib": "^0.10.0", "ts-log": "^2.2.4" }, "files": [ diff --git a/packages/rabbitmq/src/TxSubmitWorker.ts b/packages/rabbitmq/src/TxSubmitWorker.ts index b6ae1669103..99d325db101 100644 --- a/packages/rabbitmq/src/TxSubmitWorker.ts +++ b/packages/rabbitmq/src/TxSubmitWorker.ts @@ -1,11 +1,13 @@ /* eslint-disable @typescript-eslint/no-shadow */ +import { Cardano, ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core'; import { Channel, Connection, Message, connect } from 'amqplib'; import { Logger, dummyLogger } from 'ts-log'; -import { ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core'; -import { TX_SUBMISSION_QUEUE } from './rabbitmqTxSubmitProvider'; +import { TX_SUBMISSION_QUEUE, serializeError, waitForPending } from './utils'; const moduleName = 'TxSubmitWorker'; +type Optional = Pick, K> & Omit; + /** * Configuration options parameters for the TxSubmitWorker */ @@ -41,7 +43,7 @@ export interface TxSubmitWorkerDependencies { /** * The logger. Default: silent */ - logger?: Logger; + logger: Logger; /** * The provider to use to submit tx @@ -89,25 +91,34 @@ export class TxSubmitWorker { */ #dependencies: TxSubmitWorkerDependencies; - /** - * The function to call to resolve the start method exit Promise - */ - #exitResolver?: () => void; - /** * The internal worker status */ #status: 'connected' | 'connecting' | 'error' | 'idle' = 'idle'; /** - * @param {TxSubmitWorkerConfig} config The configuration options - * @param {TxSubmitWorkerDependencies} dependencies The dependency objects + * @param config The configuration options + * @param dependencies The dependency objects */ - constructor(config: TxSubmitWorkerConfig, dependencies: TxSubmitWorkerDependencies) { + constructor(config: TxSubmitWorkerConfig, dependencies: Optional) { this.#config = { parallelTxs: 3, pollingCycle: 500, ...config }; this.#dependencies = { logger: dummyLogger, ...dependencies }; } + /** + * The common handler for errors + * + * @param isAsync flag to identify asynchronous errors + * @param err the error itself + */ + private async errorHandler(isAsync: boolean, err: unknown) { + if (err) { + this.logError(err, isAsync); + this.#status = 'error'; + await this.stop(); + } + } + /** * Get the status of the worker * @@ -120,118 +131,108 @@ export class TxSubmitWorker { /** * Starts the worker */ - start() { - return new Promise(async (resolve, reject) => { - const closeHandler = async (isAsync: boolean, err: unknown) => { - if (err) { - this.logError(err, isAsync); - this.#exitResolver = undefined; - this.#status = 'error'; - await this.stop(); - reject(err); - } - }; - - try { - this.#dependencies.logger!.info(`${moduleName} init: checking tx submission provider health status`); + async start() { + try { + this.#dependencies.logger.info(`${moduleName} init: checking tx submission provider health status`); - const { ok } = await this.#dependencies.txSubmitProvider.healthCheck(); + const { ok } = await this.#dependencies.txSubmitProvider.healthCheck(); - if (!ok) throw new ProviderError(ProviderFailure.Unhealthy); + if (!ok) throw new ProviderError(ProviderFailure.Unhealthy); - this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ connection`); - this.#exitResolver = resolve; - this.#status = 'connecting'; - this.#connection = await connect(this.#config.rabbitmqUrl.toString()); - this.#connection.on('close', (error) => closeHandler(true, error)); + this.#dependencies.logger.info(`${moduleName} init: opening RabbitMQ connection`); + this.#status = 'connecting'; + this.#connection = await connect(this.#config.rabbitmqUrl.toString()); + this.#connection.on('close', (error) => this.errorHandler(true, error)); - this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ channel`); - this.#channel = await this.#connection.createChannel(); - this.#channel.on('close', (error) => closeHandler(true, error)); + this.#dependencies.logger.info(`${moduleName} init: opening RabbitMQ channel`); + this.#channel = await this.#connection.createChannel(); + this.#channel.on('close', (error) => this.errorHandler(true, error)); - this.#dependencies.logger!.info(`${moduleName} init: ensuring RabbitMQ queue`); - await this.#channel.assertQueue(TX_SUBMISSION_QUEUE); - this.#dependencies.logger!.info(`${moduleName}: init completed`); + this.#dependencies.logger.info(`${moduleName} init: ensuring RabbitMQ queue`); + await this.#channel.assertQueue(TX_SUBMISSION_QUEUE); + this.#dependencies.logger.info(`${moduleName}: init completed`); - if (this.#config.parallel) { - this.#dependencies.logger!.info(`${moduleName}: starting parallel mode`); - await this.#channel.prefetch(this.#config.parallelTxs!, true); + if (this.#config.parallel) { + this.#dependencies.logger.info(`${moduleName}: starting parallel mode`); + await this.#channel.prefetch(this.#config.parallelTxs!, true); - const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null); - const { consumerTag } = await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler); + const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null); - this.#consumerTag = consumerTag; - this.#status = 'connected'; - } else { - this.#dependencies.logger!.info(`${moduleName}: starting serial mode`); - await this.infiniteLoop(); - } - } catch (error) { - await closeHandler(false, error); + this.#consumerTag = (await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler)).consumerTag; + } else { + this.#dependencies.logger.info(`${moduleName}: starting serial mode`); + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.infiniteLoop(); } - }); + + this.#status = 'connected'; + } catch (error) { + await this.errorHandler(false, error); + if (error instanceof ProviderError) throw error; + throw new ProviderError(ProviderFailure.ConnectionFailure, error); + } } /** - * Stops the worker. Once connection shutdown is completed, - * the Promise returned by the start method is resolved as well + * Stops the worker. */ async stop() { - // This method needs to call this.#exitResolver at the end. - // Since it may be called more than once simultaneously, - // we need to ensure this.#exitResolver is called only once, - // so we immediately store its value in a local variable and we reset it - const exitResolver = this.#exitResolver; - this.#exitResolver = undefined; - - try { - this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ channel`); + this.#dependencies.logger.info(`${moduleName} shutdown: closing RabbitMQ channel`); + // In case of parallel worker; first of all cancel the consumer + if (this.#consumerTag) try { - if (this.#consumerTag) { - const consumerTag = this.#consumerTag; - this.#consumerTag = undefined; + // Let's immediately reset this.#consumerTag to be sure the cancel operation is called + // only once even if the this.stop methond is called more than once + const consumerTag = this.#consumerTag; + this.#consumerTag = undefined; - await this.#channel?.cancel(consumerTag); - } + await this.#channel!.cancel(consumerTag); } catch (error) { this.logError(error); } + // In case of serial worker; just instruct the infinite loop it can exit + else this.#continueForever = false; - this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ connection`); + // Wait for pending operations before closing + await waitForPending(this.#channel); - try { - await this.#connection?.close(); - } catch (error) { - this.logError(error); - } + try { + await this.#channel?.close(); + } catch (error) { + this.logError(error); + } - this.#dependencies.logger!.info(`${moduleName}: shutdown completed`); - this.#channel = undefined; - this.#connection = undefined; - this.#consumerTag = undefined; - this.#continueForever = false; - this.#status = 'idle'; - } finally { - // Only logging functions could throw an error here... - // Although this is almost impossible, we want to be sure exitResolver is called - exitResolver?.(); + this.#dependencies.logger.info(`${moduleName} shutdown: closing RabbitMQ connection`); + + try { + await this.#connection?.close(); + } catch (error) { + this.logError(error); } + + this.#dependencies.logger.info(`${moduleName}: shutdown completed`); + this.#channel = undefined; + this.#connection = undefined; + this.#status = 'idle'; } /** * Wrapper to log errors from try/catch blocks * - * @param {any} error the error to log + * @param error the error to log + * @param isAsync flag to set in case the error is asynchronous + * @param asWarning flag to log the error with warning loglevel */ - private logError(error: unknown, isAsync = false) { + private logError(error: unknown, isAsync = false, asWarning = false) { const errorMessage = // eslint-disable-next-line prettier/prettier error instanceof Error ? error.message : (typeof error === 'string' ? error : JSON.stringify(error)); const errorObject = { error: error instanceof Error ? error.name : 'Unknown error', isAsync, module: moduleName }; - this.#dependencies.logger!.error(errorObject, errorMessage); - if (error instanceof Error) this.#dependencies.logger!.debug(`${moduleName}:`, error.stack); + if (asWarning) this.#dependencies.logger.warn(errorObject, errorMessage); + else this.#dependencies.logger.error(errorObject, errorMessage); + if (error instanceof Error) this.#dependencies.logger.debug(`${moduleName}:`, error.stack); } /** @@ -239,7 +240,6 @@ export class TxSubmitWorker { */ private async infiniteLoop() { this.#continueForever = true; - this.#status = 'connected'; while (this.#continueForever) { const message = await this.#channel?.get(TX_SUBMISSION_QUEUE); @@ -254,29 +254,70 @@ export class TxSubmitWorker { /** * Submit a tx to the provider and ack (or nack) the message * - * @param {Message} message the message containing the transaction + * @param message the message containing the transaction */ private async submitTx(message: Message) { + const counter = ++this.#counter; + let isRetriable = false; + let serializableError: unknown; + let txId = ''; + try { - const counter = ++this.#counter; const { content } = message; + const txBody = new Uint8Array(content); + + // Register the handling of current transaction + txId = Cardano.util.deserializeTx(txBody).id.toString(); - this.#dependencies.logger!.info(`${moduleName}: submitting tx`); - this.#dependencies.logger!.debug(`${moduleName}: tx ${counter} dump:`, content.toString('hex')); - await this.#dependencies.txSubmitProvider.submitTx(new Uint8Array(content)); + this.#dependencies.logger.info(`${moduleName}: submitting tx #${counter} id: ${txId}`); + this.#dependencies.logger.debug(`${moduleName}: tx #${counter} dump:`, content.toString('hex')); + await this.#dependencies.txSubmitProvider.submitTx(txBody); - this.#dependencies.logger!.debug(`${moduleName}: ACKing RabbitMQ message ${counter}`); + this.#dependencies.logger.debug(`${moduleName}: ACKing RabbitMQ message #${counter}`); this.#channel?.ack(message); } catch (error) { - this.logError(error); - - try { - this.#dependencies.logger!.info(`${moduleName}: NACKing RabbitMQ message`); - this.#channel?.nack(message); - // eslint-disable-next-line no-catch-shadow - } catch (error) { - this.logError(error); + ({ isRetriable, serializableError } = await this.submitTxErrorHandler(error, counter, message)); + } finally { + // If there is no error or the error can't be retried + if (!serializableError || !isRetriable) { + // Send the response to the original submitter + try { + // An empty response message means succesful submission + const message = serializableError || {}; + await this.#channel!.assertQueue(txId); + this.logError(`${moduleName}: sending response for message #${counter}`); + this.#channel!.sendToQueue(txId, Buffer.from(JSON.stringify(message))); + } catch (error) { + this.logError(`${moduleName}: while sending response for message #${counter}`); + this.logError(error); + } } } } + + /** + * The error handler of submitTx method + */ + private async submitTxErrorHandler(err: unknown, counter: number, message: Message) { + const { isRetriable, serializableError } = serializeError(err); + + if (isRetriable) this.#dependencies.logger.warn(`${moduleName}: submitting tx #${counter}`); + else this.#dependencies.logger.error(`${moduleName}: submitting tx #${counter}`); + this.logError(err, false, isRetriable); + + const action = `${isRetriable ? 'N' : ''}ACKing RabbitMQ message #${counter}`; + + try { + this.#dependencies.logger.info(`${moduleName}: ${action}`); + // In RabbitMQ languange, NACKing a message means to ask to retry for it + // We NACK only those messages which had an error which can be retried + if (isRetriable) this.#channel?.nack(message); + else this.#channel?.ack(message); + } catch (error) { + this.logError(`${moduleName}: while ${action}`); + this.logError(error); + } + + return { isRetriable, serializableError }; + } } diff --git a/packages/rabbitmq/src/index.ts b/packages/rabbitmq/src/index.ts index 94ce53d21d6..25bede95d26 100644 --- a/packages/rabbitmq/src/index.ts +++ b/packages/rabbitmq/src/index.ts @@ -1,2 +1,3 @@ export * from './TxSubmitWorker'; export * from './rabbitmqTxSubmitProvider'; +export * from './utils'; diff --git a/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts b/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts index 71027472c87..6f3d55c149a 100644 --- a/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts +++ b/packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts @@ -2,8 +2,30 @@ import { Buffer } from 'buffer'; import { Cardano, HealthCheckResponse, ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core'; import { Channel, Connection, connect } from 'amqplib'; import { Logger, dummyLogger } from 'ts-log'; +import { TX_SUBMISSION_QUEUE, getErrorPrototype, waitForPending } from './utils'; +import { fromSerializableObject } from '@cardano-sdk/util'; -export const TX_SUBMISSION_QUEUE = 'cardano-tx-submit'; +const moduleName = 'RabbitMqTxSubmitProvider'; + +/** + * Configuration options parameters for the RabbitMqTxSubmitProvider + */ +export interface RabbitMqTxSubmitProviderConfig { + /** + * The RabbitMQ connection URL + */ + rabbitmqUrl: URL; +} + +/** + * Dependencies for the RabbitMqTxSubmitProvider + */ +export interface RabbitMqTxSubmitProviderDependencies { + /** + * The logger. Default: silent + */ + logger: Logger; +} /** * Connect to a [RabbitMQ](https://www.rabbitmq.com/) instance @@ -11,17 +33,28 @@ export const TX_SUBMISSION_QUEUE = 'cardano-tx-submit'; export class RabbitMqTxSubmitProvider implements TxSubmitProvider { #channel?: Channel; #connection?: Connection; - #connectionURL: URL; - #logger: Logger; #queueWasCreated = false; /** - * @param {URL} connectionURL RabbitMQ connection URL - * @param {Logger} logger object implementing the Logger abstract class + * The configuration options + */ + #config: RabbitMqTxSubmitProviderConfig; + + /** + * The dependency objects */ - constructor(connectionURL: URL, logger: Logger = dummyLogger) { - this.#connectionURL = connectionURL; - this.#logger = logger; + #dependencies: RabbitMqTxSubmitProviderDependencies; + + /** + * @param config The configuration options + * @param dependencies The dependency objects + */ + constructor( + config: RabbitMqTxSubmitProviderConfig, + dependencies: Partial = {} + ) { + this.#config = config; + this.#dependencies = { logger: dummyLogger, ...dependencies }; } /** @@ -31,35 +64,43 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { if (this.#connection) return; try { - this.#connection = await connect(this.#connectionURL.toString()); + this.#connection = await connect(this.#config.rabbitmqUrl.toString()); } catch (error) { - await this.close(); + this.#dependencies.logger.error(`${moduleName}: while connecting`, error); + void this.close(); throw new ProviderError(ProviderFailure.ConnectionFailure, error); } + this.#connection.on('error', (error: unknown) => + this.#dependencies.logger.error(`${moduleName}: connection error`, error) + ); try { this.#channel = await this.#connection.createChannel(); } catch (error) { - await this.close(); + this.#dependencies.logger.error(`${moduleName}: while creating channel`, error); + void this.close(); throw new ProviderError(ProviderFailure.ConnectionFailure, error); } + this.#channel.on('error', (error: unknown) => + this.#dependencies.logger.error(`${moduleName}: channel error`, error) + ); } /** * Idempotently (channel.assertQueue does the job for us) creates the queue * - * @param {boolean} force Forces the creation of the queue just to have a response from the server + * @param force Forces the creation of the queue just to have a response from the server */ async #ensureQueue(force?: boolean) { if (this.#queueWasCreated && !force) return; await this.#connectAndCreateChannel(); - this.#queueWasCreated = true; try { await this.#channel!.assertQueue(TX_SUBMISSION_QUEUE); + this.#queueWasCreated = true; } catch (error) { - await this.close(); + void this.close(); throw new ProviderError(ProviderFailure.ConnectionFailure, error); } } @@ -68,18 +109,21 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { * Closes the connection to RabbitMQ and (for interl purposes) it resets the state as well */ async close() { + // Wait for pending operations before closing + await waitForPending(this.#channel); + try { await this.#channel?.close(); // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (error: any) { - this.#logger.error({ error: error.name, module: 'rabbitmqTxSubmitProvider' }, error.message); + this.#dependencies.logger.error({ error: error.name, module: moduleName }, error.message); } try { await this.#connection?.close(); // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (error: any) { - this.#logger.error({ error: error.name, module: 'rabbitmqTxSubmitProvider' }, error.message); + this.#dependencies.logger.error({ error: error.name, module: moduleName }, error.message); } this.#channel = undefined; @@ -99,7 +143,7 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { await this.#ensureQueue(true); ok = true; } catch { - this.#logger.error({ error: 'Connection error', module: 'rabbitmqTxSubmitProvider' }); + this.#dependencies.logger.error({ error: 'Connection error', module: 'rabbitmqTxSubmitProvider' }); } return { ok }; @@ -108,14 +152,70 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider { /** * Submit a transaction to RabbitMQ * - * @param {Uint8Array} signedTransaction The Uint8Array representation of a signedTransaction + * @param signedTransaction The Uint8Array representation of a signedTransaction */ async submitTx(signedTransaction: Uint8Array) { - try { - await this.#ensureQueue(); - this.#channel!.sendToQueue(TX_SUBMISSION_QUEUE, Buffer.from(signedTransaction)); - } catch (error) { - throw Cardano.util.asTxSubmissionError(error) || new Cardano.UnknownTxSubmissionError(error); - } + return new Promise(async (resolve, reject) => { + let txId = ''; + + const done = (error?: unknown) => { + this.#dependencies.logger.debug(`${moduleName}: ${error ? 'rejecting' : 'resolving'} tx id: ${txId}`); + + if (error) reject(error); + else resolve(); + }; + + try { + txId = Cardano.util.deserializeTx(signedTransaction).id.toString(); + + this.#dependencies.logger.info(`${moduleName}: queuing tx id: ${txId}`); + + // Actually send the message + await this.#ensureQueue(); + this.#channel!.sendToQueue(TX_SUBMISSION_QUEUE, Buffer.from(signedTransaction)); + this.#dependencies.logger.debug(`${moduleName}: queued tx id: ${txId}`); + + // Set the queue for response message + this.#dependencies.logger.debug(`${moduleName}: creating queue: ${txId}`); + await this.#channel!.assertQueue(txId); + this.#dependencies.logger.debug(`${moduleName}: created queue: ${txId}`); + + // We noticed that may happens that the response message handler is called before the + // Promise is resolved, that's why we are awaiting for it inside the handler itself + const consumePromise = this.#channel!.consume(txId, async (message) => { + try { + this.#dependencies.logger.debug(`${moduleName}: got result message from queue: ${txId}`); + + // This should never happen, just handle it for correct logging + if (!message) return done(new Error('null message from result queue')); + + this.#channel!.ack(message); + + const { consumerTag } = await consumePromise; + + this.#dependencies.logger.debug(`${moduleName}: canceling consumer for queue: ${txId}`); + await this.#channel!.cancel(consumerTag); + this.#dependencies.logger.debug(`${moduleName}: deleting queue: ${txId}`); + await this.#channel!.deleteQueue(txId); + this.#dependencies.logger.debug(`${moduleName}: deleted queue: ${txId}`); + + const result = JSON.parse(message.content.toString()); + + // An empty result message means submission ok + if (Object.keys(result).length === 0) return done(); + + done(fromSerializableObject(result, { getErrorPrototype })); + } catch (error) { + this.#dependencies.logger.error(`${moduleName}: while handling response message: ${txId}`); + this.#dependencies.logger.error(error); + done(error); + } + }); + } catch (error) { + this.#dependencies.logger.error(`${moduleName}: while queuing transaction: ${txId}`); + this.#dependencies.logger.error(error); + done(Cardano.util.asTxSubmissionError(error) || new Cardano.UnknownTxSubmissionError(error)); + } + }); } } diff --git a/packages/rabbitmq/src/tsconfig.json b/packages/rabbitmq/src/tsconfig.json index 768efea96fd..3b3f560dd6c 100644 --- a/packages/rabbitmq/src/tsconfig.json +++ b/packages/rabbitmq/src/tsconfig.json @@ -5,9 +5,5 @@ "outDir": "../dist/cjs", "rootDir": "." }, - "references": [ - { - "path": "../../core/src" - } - ] + "references": [{ "path": "../../core/src" }, { "path": "../../util/src" }] } diff --git a/packages/rabbitmq/src/utils.ts b/packages/rabbitmq/src/utils.ts new file mode 100644 index 00000000000..b4410af03e4 --- /dev/null +++ b/packages/rabbitmq/src/utils.ts @@ -0,0 +1,62 @@ +import { Cardano } from '@cardano-sdk/core'; +import { OutsideOfValidityInterval } from '@cardano-ogmios/schema'; +import { toSerializableObject } from '@cardano-sdk/util'; + +export const TX_SUBMISSION_QUEUE = 'cardano-tx-submit'; + +/** + * Analyzes a serializable error to get the right prototype object + * + * @param error the error to analyze + * @returns the right prototype for the error + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const getErrorPrototype = (error: unknown) => { + if (typeof error === 'object') { + const rawError = error as Cardano.TxSubmissionError; + + if (typeof rawError.name === 'string' && typeof rawError.message === 'string') { + const txSubmissionErrorName = rawError.name as keyof typeof Cardano.TxSubmissionErrors; + const ErrorClass = Cardano.TxSubmissionErrors[txSubmissionErrorName]; + + if (ErrorClass) return ErrorClass.prototype; + } + } + + return Error.prototype; +}; + +/** + * Serializes an error and checks if it is retriable + * + * @param err the error to serialize + */ +export const serializeError = (err: unknown) => { + let isRetriable = false; + + const serializableError = toSerializableObject(err); + + if (err instanceof Cardano.TxSubmissionErrors.OutsideOfValidityIntervalError) { + const details = JSON.parse(err.message) as OutsideOfValidityInterval['outsideOfValidityInterval']; + + if (details.interval.invalidBefore && details.currentSlot <= details.interval.invalidBefore) isRetriable = true; + } + + return { isRetriable, serializableError }; +}; + +// Workaround inspired to https://github.com/amqp-node/amqplib/issues/250#issuecomment-888558719 +// to avoid the error reported on https://github.com/amqp-node/amqplib/issues/692 +export const waitForPending = async (channel: unknown) => { + const check = () => { + const { pending, reply } = channel as { pending: unknown[]; reply: unknown }; + + return pending.length > 0 || reply !== null; + }; + + try { + while (check()) await new Promise((resolve) => setTimeout(resolve, 50)); + } catch { + // If something is wrong in the workaround as well... let's simply go on and close the channel + } +}; diff --git a/packages/rabbitmq/test/TxSubmitWorker.test.ts b/packages/rabbitmq/test/TxSubmitWorker.test.ts index d20f8a865a8..1b2caf37dc6 100644 --- a/packages/rabbitmq/test/TxSubmitWorker.test.ts +++ b/packages/rabbitmq/test/TxSubmitWorker.test.ts @@ -1,30 +1,56 @@ -import { BAD_CONNECTION_URL, GOOD_CONNECTION_URL, enqueueFakeTx, removeAllMessagesFromQueue } from './utils'; -import { TxSubmitProvider } from '@cardano-sdk/core'; -import { TxSubmitWorker } from '../src'; +import { + BAD_CONNECTION_URL, + GOOD_CONNECTION_URL, + enqueueFakeTx, + removeAllQueues, + testLogger, + txsPromise +} from './utils'; +import { Cardano, ProviderError, TxSubmitProvider } from '@cardano-sdk/core'; +import { RabbitMqTxSubmitProvider, TxSubmitWorker } from '../src'; import { createMockOgmiosServer, listenPromise, serverClosePromise } from '@cardano-sdk/ogmios/test/mocks/mockOgmiosServer'; -import { dummyLogger } from 'ts-log'; import { getRandomPort } from 'get-port-please'; import { ogmiosTxSubmitProvider, urlToConnectionConfig } from '@cardano-sdk/ogmios'; -import { removeRabbitMQContainer, setupRabbitMQContainer } from './jest-setup/docker'; import http from 'http'; -const logger = dummyLogger; - describe('TxSubmitWorker', () => { - let txSubmitProvider: TxSubmitProvider; + let logger: ReturnType; + let mock: http.Server | undefined; let port: number; + let txSubmitProvider: TxSubmitProvider; + let worker: TxSubmitWorker | undefined; beforeAll(async () => { port = await getRandomPort(); txSubmitProvider = ogmiosTxSubmitProvider(urlToConnectionConfig(new URL(`http://localhost:${port}/`))); }); + beforeEach(async () => { + await removeAllQueues(); + logger = testLogger(); + }); + + afterEach(async () => { + if (mock) { + await serverClosePromise(mock); + mock = undefined; + } + + if (worker) { + await worker.stop(); + worker = undefined; + } + + // Uncomment this to have evidence of all the log messages + // console.log(logger.messages); + }); + it('is safe to call stop method on an idle worker', async () => { - const worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); + worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); expect(worker).toBeInstanceOf(TxSubmitWorker); expect(worker.getStatus()).toEqual('idle'); @@ -33,191 +59,149 @@ describe('TxSubmitWorker', () => { }); it('rejects if the TxSubmitProvider is unhealthy on start', async () => { - const worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); - - const unhealthyMock = createMockOgmiosServer({ + mock = createMockOgmiosServer({ healthCheck: { response: { networkSynchronization: 0.8, success: true } }, submitTx: { response: { success: true } } }); - await listenPromise(unhealthyMock, port); + await listenPromise(mock, port); - try { - const res = await worker.start(); - expect(res).toBeDefined(); - } catch (error) { - expect(error).toBeDefined(); - } + worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger, txSubmitProvider }); - await serverClosePromise(unhealthyMock); + await expect(worker.start()).rejects.toBeInstanceOf(ProviderError); }); it('rejects if unable to connect to the RabbitMQ broker', async () => { - const worker = new TxSubmitWorker({ rabbitmqUrl: BAD_CONNECTION_URL }, { logger, txSubmitProvider }); - const healthyMock = createMockOgmiosServer({ + mock = createMockOgmiosServer({ healthCheck: { response: { networkSynchronization: 1, success: true } }, submitTx: { response: { success: true } } }); - await listenPromise(healthyMock, port); + await listenPromise(mock, port); - try { - const res = await worker.start(); - expect(res).toBeDefined(); - } catch (error) { - expect(error).toBeDefined(); - } + worker = new TxSubmitWorker({ rabbitmqUrl: BAD_CONNECTION_URL }, { logger, txSubmitProvider }); - await serverClosePromise(healthyMock); + await expect(worker.start()).rejects.toBeInstanceOf(ProviderError); }); - describe('RabbitMQ connection failure while running', () => { - const CONTAINER_NAME = 'cardano-rabbitmq-local-test'; - - const performTest = async (options: { parallel: boolean }) => { - const healthyMock = createMockOgmiosServer({ - healthCheck: { response: { networkSynchronization: 1, success: true } }, - submitTx: { response: { success: true } } - }); - - await listenPromise(healthyMock, port); - - // Set up a new RabbitMQ container to test TxSubmitWorker on RabbitMQ server shut down - const rabbitmqPort = await getRandomPort(); - await setupRabbitMQContainer(CONTAINER_NAME, rabbitmqPort); - - // Actually create the TxSubmitWorker - const worker = new TxSubmitWorker( - { rabbitmqUrl: new URL(`amqp://localhost:${rabbitmqPort}`), ...options }, - { logger, txSubmitProvider } - ); - const startPromise = worker.start(); - - // Wait until the worker is connected to the RabbitMQ server - await new Promise(async (resolve) => { - // eslint-disable-next-line @typescript-eslint/no-shadow - while (worker.getStatus() === 'connecting') await new Promise((resolve) => setTimeout(resolve, 50)); - resolve(); - }); - - // Stop the RabbitMQ container while the TxSubmitWorker is connected - const removeContainerPromise = removeRabbitMQContainer(CONTAINER_NAME); - const innerMessage = "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"; - const fullMessage = `Connection closed: 320 (CONNECTION-FORCED) with message "${innerMessage}"`; - - // Test the TxSubmitWorker actually exits with error - await expect(startPromise).rejects.toThrow(new Error(fullMessage)); - - // Test teardown - await serverClosePromise(healthyMock); - await removeContainerPromise; - }; - - it('rejects when configured to process jobs serially', async () => await performTest({ parallel: false }), 20_000); - it( - 'rejects when configured to process jobs in parallel', - async () => await performTest({ parallel: true }), - 20_000 - ); - }); + describe('error while tx submission', () => { + describe('tx submission is retried if the error is retiable', () => { + // eslint-disable-next-line unicorn/consistent-function-scoping + const performTest = async (options: { parallel: boolean }) => { + const spy = jest.fn(); + let hookAlreadyCalled = false; + let successMockListenPromise = Promise.resolve(new http.Server()); + let successMock = new http.Server(); + + const failMock = createMockOgmiosServer({ + healthCheck: { response: { networkSynchronization: 1, success: true } }, + submitTx: { response: { failWith: { type: 'beforeValidityInterval' }, success: false } }, + submitTxHook: () => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + (async () => { + if (hookAlreadyCalled) return; + + // This hook may be called multple times... ensure the core is executed only once + hookAlreadyCalled = true; + + // Stop the failing mock and start the succes one + await serverClosePromise(failMock); + successMockListenPromise = listenPromise(successMock, port); + })(); + } + }); + + // Start a failing ogmios server + await listenPromise(failMock, port); + + // Enqueue a tx + const providerClosePromise = enqueueFakeTx(); + + // Actually create the TxSubmitWorker + worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL, ...options }, { logger, txSubmitProvider }); + await worker.start(); + + await new Promise((resolve) => { + successMock = createMockOgmiosServer({ + healthCheck: { response: { networkSynchronization: 1, success: true } }, + submitTx: { response: { success: true } }, + submitTxHook: () => { + spy(); + // Once the transaction is submitted with success we can stop the worker + // We wait half a second to be sure the tx is submitted only once + setTimeout(() => { + resolve(); + }, 500); + } + }); + }); + + // All these Promises are the return value of async functions called in a not async context, + // we need to await for them to perform a correct test teardown + await Promise.all([successMockListenPromise, providerClosePromise, serverClosePromise(successMock)]); + expect(spy).toBeCalledTimes(1); + }; + + it('when configured to process jobs serially', async () => performTest({ parallel: false })); + it('when configured to process jobs in parallel', async () => performTest({ parallel: true })); + }); - describe('tx submission is retried until success', () => { - // First of all we need to remove from the queue every message sent by previous tests/suites - beforeAll(removeAllMessagesFromQueue); - - // eslint-disable-next-line unicorn/consistent-function-scoping - const performTest = async (options: { parallel: boolean }) => { - const spy = jest.fn(); - let hookAlreadyCalled = false; - let successMockListenPromise = Promise.resolve(new http.Server()); - let stopPromise = Promise.resolve(); - // eslint-disable-next-line prefer-const - let worker: TxSubmitWorker; - - const successMock = createMockOgmiosServer({ - healthCheck: { response: { networkSynchronization: 1, success: true } }, - submitTx: { response: { success: true } }, - submitTxHook: () => { - spy(); - // Once the transaction is submitted with success we can stop the worker - // We wait half a second to be ensure the tx is submitted only once - setTimeout(() => (stopPromise = worker.stop()), 500); - } - }); - - const failMock = createMockOgmiosServer({ - healthCheck: { response: { networkSynchronization: 1, success: true } }, - submitTx: { response: { failWith: { type: 'eraMismatch' }, success: false } }, - submitTxHook: () => { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - (async () => { - if (hookAlreadyCalled) return; - - // This hook may be called multple times... ensure the core is executed only once - hookAlreadyCalled = true; - - // Stop the failing mock and start the succes one - await serverClosePromise(failMock); - successMockListenPromise = listenPromise(successMock, port); - })(); - } - }); - - // Start a failing ogmios server - await listenPromise(failMock, port); - - // Enqueue a tx - const providerClosePromise = enqueueFakeTx(); - - // Actually create the TxSubmitWorker - worker = new TxSubmitWorker({ rabbitmqUrl: GOOD_CONNECTION_URL, ...options }, { logger, txSubmitProvider }); - const startPromise = worker.start(); - - await expect(startPromise).resolves.toEqual(undefined); - await serverClosePromise(successMock); - // All these Promises are the return value of async functions called in a not async context, - // we need to await for them to perform a correct test teardown - await Promise.all([stopPromise, successMockListenPromise, providerClosePromise]); - expect(spy).toBeCalledTimes(1); - }; - - it('when configured to process jobs serially', async () => performTest({ parallel: false })); - it('when configured to process jobs in parallel', async () => performTest({ parallel: true })); + describe('the error is propagated to RabbitMqTxSubmitProvider', () => { + // eslint-disable-next-line unicorn/consistent-function-scoping + const performTest = async (options: { parallel: boolean }) => { + mock = createMockOgmiosServer({ + healthCheck: { response: { networkSynchronization: 1, success: true } }, + submitTx: { response: { failWith: { type: 'eraMismatch' }, success: false } } + }); + + // Start the mock + await listenPromise(mock, port); + + // Actually create the TxSubmitWorker + worker = new TxSubmitWorker( + { pollingCycle: 50, rabbitmqUrl: GOOD_CONNECTION_URL, ...options }, + { logger, txSubmitProvider } + ); + await worker.start(); + + // Tx submission by RabbitMqTxSubmitProvider must reject with the same error got by TxSubmitWorker + await expect(enqueueFakeTx(0, logger)).rejects.toBeInstanceOf(Cardano.TxSubmissionErrors.EraMismatchError); + }; + + it('when configured to process jobs serially', async () => performTest({ parallel: false })); + it('when configured to process jobs in parallel', async () => performTest({ parallel: true })); + }); }); it('submission is parallelized up to parallelTx Tx simultaneously', async () => { - // First of all we need to remove from the queue every message sent by previous tests/suites - await removeAllMessagesFromQueue(); - - let stopPromise = Promise.resolve(); - - const loggedMessages: unknown[][] = []; - const testLogger = { - debug: (...args: unknown[]) => loggedMessages.push(args), - error: jest.fn(), - info: jest.fn(), - trace: jest.fn(), - warn: jest.fn() - }; - - const worker = new TxSubmitWorker( - { parallel: true, parallelTxs: 4, rabbitmqUrl: GOOD_CONNECTION_URL }, - { logger: testLogger, txSubmitProvider } - ); + const txs = await txsPromise; + const delays = [5, 2, 1, 3, 3, 4, 4]; - const mock = createMockOgmiosServer({ + mock = createMockOgmiosServer({ healthCheck: { response: { networkSynchronization: 1, success: true } }, submitTx: { response: { success: true } }, submitTxHook: async (data) => { + const txBody = Buffer.from(data!).toString('hex'); + const txIdx = (() => { + for (let i = 0; i < txs.length; ++i) if (txBody === txs[i].txBodyHex) return i; + })(); + // Wait 100ms * the first byte of the Tx before sending the result - await new Promise((resolve) => setTimeout(resolve, 100 * data![0])); - // Exit condition: a Tx with length === 2 - if (data?.length === 2) stopPromise = worker.stop(); + // eslint-disable-next-line @typescript-eslint/no-shadow + await new Promise((resolve) => setTimeout(resolve, 100 * delays[txIdx!])); } }); await listenPromise(mock, port); + worker = new TxSubmitWorker( + { parallel: true, parallelTxs: 4, rabbitmqUrl: GOOD_CONNECTION_URL }, + { logger, txSubmitProvider } + ); + await worker.start(); + + const rabbitMqTxSubmitProvider = new RabbitMqTxSubmitProvider({ rabbitmqUrl: GOOD_CONNECTION_URL }); + /* * Tx submission plan, time sample: 100ms * 11111 @@ -225,33 +209,42 @@ describe('TxSubmitWorker', () => { * 3555 * 4447777 */ - await enqueueFakeTx([5]); - await enqueueFakeTx([2]); - await enqueueFakeTx([1]); - await enqueueFakeTx([3]); - await enqueueFakeTx([3]); - await enqueueFakeTx([4]); - await enqueueFakeTx([4, 0]); - await expect(worker.start()).resolves.toEqual(undefined); - await Promise.all([stopPromise, serverClosePromise(mock)]); + const promises: Promise[] = []; + const result = [undefined, undefined, undefined, undefined, undefined, undefined, undefined]; + + for (let i = 0; i < 7; ++i) { + promises.push(rabbitMqTxSubmitProvider.submitTx(txs[i].txBodyUint8Array)); + // Wait 10ms to be sure the transactions are enqueued in the right order + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + await expect(Promise.all(promises)).resolves.toEqual(result); + + await rabbitMqTxSubmitProvider.close(); // We check only the relevant messages - expect(loggedMessages.filter(([_]) => typeof _ === 'string' && _.match(/(tx \d dump)|ACKing RabbitMQ/))).toEqual([ - ['TxSubmitWorker: tx 1 dump:', '05'], - ['TxSubmitWorker: tx 2 dump:', '02'], - ['TxSubmitWorker: tx 3 dump:', '01'], - ['TxSubmitWorker: tx 4 dump:', '03'], - ['TxSubmitWorker: ACKing RabbitMQ message 3'], - ['TxSubmitWorker: tx 5 dump:', '03'], - ['TxSubmitWorker: ACKing RabbitMQ message 2'], - ['TxSubmitWorker: tx 6 dump:', '04'], - ['TxSubmitWorker: ACKing RabbitMQ message 4'], - ['TxSubmitWorker: tx 7 dump:', '0400'], - ['TxSubmitWorker: ACKing RabbitMQ message 5'], - ['TxSubmitWorker: ACKing RabbitMQ message 1'], - ['TxSubmitWorker: ACKing RabbitMQ message 6'], - ['TxSubmitWorker: ACKing RabbitMQ message 7'] + expect( + logger.messages + .filter(({ level }) => level === 'debug') + .filter(({ message }) => typeof message[0] === 'string' && message[0].match(/(tx #\d dump)|ACKing RabbitMQ/)) + .map(({ message }) => message) + .map((message) => (message.length === 2 ? [message[0]] : message)) + ).toEqual([ + ['TxSubmitWorker: tx #1 dump:'], + ['TxSubmitWorker: tx #2 dump:'], + ['TxSubmitWorker: tx #3 dump:'], + ['TxSubmitWorker: tx #4 dump:'], + ['TxSubmitWorker: ACKing RabbitMQ message #3'], + ['TxSubmitWorker: tx #5 dump:'], + ['TxSubmitWorker: ACKing RabbitMQ message #2'], + ['TxSubmitWorker: tx #6 dump:'], + ['TxSubmitWorker: ACKing RabbitMQ message #4'], + ['TxSubmitWorker: tx #7 dump:'], + ['TxSubmitWorker: ACKing RabbitMQ message #5'], + ['TxSubmitWorker: ACKing RabbitMQ message #1'], + ['TxSubmitWorker: ACKing RabbitMQ message #6'], + ['TxSubmitWorker: ACKing RabbitMQ message #7'] ]); }); }); diff --git a/packages/rabbitmq/test/jest-setup/docker.ts b/packages/rabbitmq/test/jest-setup/docker.ts index f05c23f441b..ca436c0953e 100644 --- a/packages/rabbitmq/test/jest-setup/docker.ts +++ b/packages/rabbitmq/test/jest-setup/docker.ts @@ -2,7 +2,7 @@ import { connect } from 'amqplib'; import { imageExists, pullImageAsync } from 'dockerode-utils'; import Docker from 'dockerode'; -const CONTAINER_IMAGE = 'rabbitmq:3.8-alpine'; +const CONTAINER_IMAGE = 'rabbitmq:3.10-management'; const CONTAINER_NAME = 'cardano-rabbitmq-test'; export const removeRabbitMQContainer = async (containerName = CONTAINER_NAME) => { @@ -34,7 +34,7 @@ export const setupRabbitMQContainer = async (containerName = CONTAINER_NAME, por await removeRabbitMQContainer(containerName); const container = await docker.createContainer({ - HostConfig: { PortBindings: { '5672/tcp': [{ HostPort: `${port}` }] } }, + HostConfig: { PortBindings: { '5672/tcp': [{ HostPort: `${port}` }], '15672/tcp': [{ HostPort: '15672' }] } }, Image: CONTAINER_IMAGE, name: containerName }); diff --git a/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts b/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts index 40fd97e232d..fe1acc1019f 100644 --- a/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts +++ b/packages/rabbitmq/test/rabbitmqTxSubmitProvider.test.ts @@ -1,21 +1,35 @@ -import { BAD_CONNECTION_URL, GOOD_CONNECTION_URL } from './utils'; +import { BAD_CONNECTION_URL, GOOD_CONNECTION_URL, removeAllQueues, testLogger, txsPromise } from './utils'; import { ProviderError, TxSubmitProvider } from '@cardano-sdk/core'; -import { RabbitMqTxSubmitProvider } from '../src'; +import { RabbitMqTxSubmitProvider, TxSubmitWorker } from '../src'; describe('RabbitMqTxSubmitProvider', () => { - let provider: TxSubmitProvider; + let logger: ReturnType; + let provider: TxSubmitProvider | undefined; - afterEach(() => provider?.close!()); + beforeEach(async () => { + await removeAllQueues(); + logger = testLogger(); + }); + + afterEach(async () => { + if (provider) { + await provider.close!(); + provider = undefined; + } + + // Uncomment this to have evidence of all the log messages + // console.log(logger.messages); + }); describe('healthCheck', () => { it('is not ok if cannot connect', async () => { - provider = new RabbitMqTxSubmitProvider(BAD_CONNECTION_URL); + provider = new RabbitMqTxSubmitProvider({ rabbitmqUrl: BAD_CONNECTION_URL }); const res = await provider.healthCheck(); expect(res).toEqual({ ok: false }); }); it('is ok if can connect', async () => { - provider = new RabbitMqTxSubmitProvider(GOOD_CONNECTION_URL); + provider = new RabbitMqTxSubmitProvider({ rabbitmqUrl: GOOD_CONNECTION_URL }); const resA = await provider.healthCheck(); // Call again to cover the idemopotent RabbitMqTxSubmitProvider.#connectAndCreateChannel() operation const resB = await provider.healthCheck(); @@ -24,29 +38,37 @@ describe('RabbitMqTxSubmitProvider', () => { }); }); - const performSubmitTxTest = async (connectionURL: URL) => { - try { - provider = new RabbitMqTxSubmitProvider(connectionURL); - const resA = await provider.submitTx(new Uint8Array()); - // Called again to cover the idemopotent RabbitMqTxSubmitProvider.#ensureQueue() operation - const resB = await provider.submitTx(new Uint8Array()); - expect(resA).toBeUndefined(); - expect(resB).toBeUndefined(); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (error: any) { - expect(error.innerError).toBeInstanceOf(ProviderError); - } - }; - describe('submitTx', () => { + // eslint-disable-next-line unicorn/consistent-function-scoping + const performTest = async (rabbitmqUrl: URL) => { + try { + const txs = await txsPromise; + provider = new RabbitMqTxSubmitProvider({ rabbitmqUrl }, { logger }); + const resA = await provider.submitTx(txs[0].txBodyUint8Array); + // Called again to cover the idemopotent RabbitMqTxSubmitProvider.#ensureQueue() operation + const resB = await provider.submitTx(txs[1].txBodyUint8Array); + expect(resA).toBeUndefined(); + expect(resB).toBeUndefined(); + } catch (error) { + expect((error as ProviderError).innerError).toBeInstanceOf(ProviderError); + } + }; + it('resolves if successful', async () => { + const worker = new TxSubmitWorker( + { parallel: true, rabbitmqUrl: GOOD_CONNECTION_URL }, + { logger, txSubmitProvider: { healthCheck: async () => ({ ok: true }), submitTx: () => Promise.resolve() } } + ); + expect.assertions(2); - await performSubmitTxTest(GOOD_CONNECTION_URL); + await worker.start(); + await performTest(GOOD_CONNECTION_URL); + await worker.stop(); }); it('rejects with errors thrown by the service', async () => { expect.assertions(1); - await performSubmitTxTest(BAD_CONNECTION_URL); + await performTest(BAD_CONNECTION_URL); }); }); }); diff --git a/packages/rabbitmq/test/transactions.txt b/packages/rabbitmq/test/transactions.txt new file mode 100644 index 00000000000..d3f6049fee3 --- /dev/null +++ b/packages/rabbitmq/test/transactions.txt @@ -0,0 +1,10 @@ +36cf5735e290a34ed8c6aeccb4bf357c9c063c05cfb629950f150f1daa093b37,83a400838258203b82f973f7e1892d56e2afd8283b64b281af2dbc51e2308f0a26d9a41e810844008258202dfc520a7f58da6fa6518912ee61974e7d8215c7ba66125d1967bccbf225d83100825820b75bb1ab212a2419396870e75df7ba755b6f8392df88a3d4c17f2fd637ffbef10001828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f42408258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a001c0227021a00029db1031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff35840379c4308ecb304e3ce5e641c53b801853cf0c0c171562644d277c4a8d90d5419e5cfff7eabff66bb765d332ab572bbbb2c35808ea8e12c5984aeb284b2f8ac04f6 +57790e48c16556891ad6242958e25e2d7f399779a8c84e40c07ace2515960db0,83a400828258204c7f1f38917469d9cc9c471ccd4f8003bb31c16cb8debd1e45d43871bd7642cb0182582036950848d38b427a3024fcb10a8682bf9ae1c231efa866bbb3787261656532d90101828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f46288258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a001b70ed021a00029781031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff358409486e3d6cf4635af973b357b3d1fc0e3395cb6cc1c1c8c4450d8ca83029be269964ef04ba50a539071e2a7bffc7de789f605eeea8c95ef0fe24f048937623a01f6 +e2acc56de1dc3870388034a0be69850d0c6faff4989813b59de66d01f03112a7,83a400838258207743898cd34346338d74a5fd263e67fdba988fa09c97a3fcf4c731280932896300825820a9ea85e2a588709f5ddaf4c7d0e5b80774cd93482879b1a83f20ede2130fb0e800825820fbb203291f62f17549e1ceaf3b18aff6610b03dc18875236a7dcfc4f548903330101828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f4a108258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a0029bc31021a00029db1031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff35840cb82b99a8ce8751504ef6db6eb6500d58e361f496b2d4f713f13809a93d89506e39e5919c8dbf60434e053881eb77ee504a126c4b2c2fd97b811cb5135731601f6 +73b17d551ce22ca6c896d9aef88d22a172ed7c007339380935c695e3e8bc2b5e,83a4008382582036950848d38b427a3024fcb10a8682bf9ae1c231efa866bbb3787261656532d90082582049157bead84c93f78e8fb7859a89845f84a57b4381bba265da6b90fe50bcfe090082582071fb85607a1772f7795aeec4a2a888aa150f216999ef1fca7c169be39afec2840101828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f4df88258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a002622bd021a00029db1031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff358408158b2568ecc995abb9a99baa00e1e7af3411b1868e56dc6b7f9081067fa5019eef36f4bd5df65b601b539f79edf218e706ad347515ba29dec618599a93f1804f6 +abeefccaae4dbcf3df1697f70eeaf05eabf05a6f140751bd3cec78adc108d242,83a40082825820e9aa21bead45e12871bf8870275f7316b36ae5472eaa0a672590436ff14e3dee01825820f13d90bb22e53fa971054b9266eb1da025fc4ce9d502558a6c19366365729b1b0101828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f51e08258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a04496c92021a00029781031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff358407f6fb5fad10200a5cb219560d136a9ffc916f49cf7eefb7b1a66675f3f47cf28dee423dcc1dce6bf67895e76bd42337c5818f9ab69dbb5bf0fd26709a0d77407f6 +b40f7294d84d3933953e8afb3fcad5b962848118ff3dc10fcee4f15278dbcafd,83a40082825820fbb203291f62f17549e1ceaf3b18aff6610b03dc18875236a7dcfc4f5489033300825820af04538057f84d99d2e17100bccccfa1f609bf2eb37291f8ad13e39b4393c4e60101828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f55c88258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a041965a9021a00029781031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff3584019e15f91d1c34958e962a1440c13c9c0949d691ef123044851605fc89927cb22dceadc539978795c8a6c1f8efbd1c80fdf5445036d21c495bdc1e36c6aa5520df6 +86d496bdb4afd1a70f6039db53a3114a4e59b6ea64b58aeb8da5e28a770b5aa7,83a4008382582020d89027c9fcac9eeb52b78e6147cd3984f374d3090cab8437118e72544f7c6100825820625e62f9c4d35dee021fe1313dc5229e8926e0ff4477cee0ee1a04063927f03400825820c8c7152564c233d614486c63c711ade88fdb40e6aa3eb311da667c3aaee27db30101828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f59b08258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a05287c8d021a00029db1031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff35840575ebd629ac4a05cd92eb8e73968635003683a51f8d7b315e7b6651ae110815c304e819a204a4b3b23c11348537ac198f3dddbd78a061ba8d8ddaff0e081e306f6 +ab1eef06b6586a2e99127536a221b13db7e2c563df6e3b9dcf4187a600519b9c,83a40083825820c7ab9954c3ab220246ea1b26988a6b5e1230abded8d5993a1f78ce302140626800825820ed05aea9f2946df5efae196c92e6cc0b6ad577c6a87f9949370863d88a006ed000825820a1d32319ef80343501829e4b9e1104f1a879f8a0bc10b0f1bae4896da92aa5450101828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f5d988258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a0023870c021a00029db1031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff3584000c1b1f83b7869ee70010362093132e3a98915b2fcaff70fdd9be1e99353f1cdfdc71aafc0f96e039878dd02983e28075342ef2a0d8ca42d9272befc1b488b07f6 +a926c72d538ed673af49a1716ecd7f0cd624fd068bdb1d2a67bedab176121844,83a40083825820a1d32319ef80343501829e4b9e1104f1a879f8a0bc10b0f1bae4896da92aa545008258207b951ddd736917fd026bf5f9dcedd422b5d8cb68031d0cf30e45ccb9c34907da008258201cb5e247d65ab72279afab04c71ee70f5b4f41ade8f58cf8c22087b454d00cef0001828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f61808258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a001c2167021a00029db1031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff358400a75ac011c26e59af1e7697381eb08d6d60160308a02b6628559f7b4730de343df08dd91198d04a28ac82e472fc0ebc04aec5aeba1501a2640ad052ee678330cf6 +9580dbb57df0e160902a942aa4a03ec0090a44bf7c485b2a8fdb8be67127fbf7,83a400828258209997ab8b1ccbde633aaf99bea1e01354af0ab0a9e79b4c94ec27504aaa0476ae018258204b20cd1b0052f933bb0e2c0b85327b0b26754c15ca62b1da4c5e67ce2c20c1370001828258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a000f65688258390070e5d9e766c3b56015acb46dcf06304ce7c2346b5dff6f2724e394e662cd0789a22c3185e7a6070c051320feb59c7acd9444432ae66ef86f1a0016d70d021a00029781031a03a45925a100818258207cace81954e301836404d6ee2075be3c6809949b24f1cb2f823a93da3ce46ff35840bbd61c0164c84e9ad72bc3ce111446f409697e6cff539b834032382acbd9b6086feb6a0726d74eba1d14fa376ea91fd8d8b5f6d632223f3e20da192a8323260df6 diff --git a/packages/rabbitmq/test/utils.ts b/packages/rabbitmq/test/utils.ts index dbcf6c21397..80bdc6f215f 100644 --- a/packages/rabbitmq/test/utils.ts +++ b/packages/rabbitmq/test/utils.ts @@ -1,26 +1,78 @@ -import { Message, connect } from 'amqplib'; -import { RabbitMqTxSubmitProvider, TX_SUBMISSION_QUEUE } from '../src'; +import { RabbitMqTxSubmitProvider } from '../src'; +import { connect } from 'amqplib'; +import { dummyLogger } from 'ts-log'; +import { readFile } from 'fs/promises'; +import axios from 'axios'; +import path from 'path'; export const BAD_CONNECTION_URL = new URL('amqp://localhost:1234'); export const GOOD_CONNECTION_URL = new URL('amqp://localhost'); -export const enqueueFakeTx = async (data = [0, 1, 2, 3, 23]) => { - const rabbitMqTxSubmitProvider = new RabbitMqTxSubmitProvider(GOOD_CONNECTION_URL); - await rabbitMqTxSubmitProvider.submitTx(new Uint8Array(data)); - return rabbitMqTxSubmitProvider.close(); -}; +interface TestTx { + txBodyHex: string; + txBodyUint8Array: Uint8Array; + txId: string; +} + +export const txsPromise = (async () => { + const ret: TestTx[] = []; + const body = await readFile(path.join(__dirname, 'transactions.txt')); + + for (const line of body.toString().split('\n')) + if (line) { + const tokens = line.split(','); + + ret.push({ + txBodyHex: tokens[1], + txBodyUint8Array: Uint8Array.from(Buffer.from(tokens[1], 'hex')), + txId: tokens[0] + }); + } + + return ret; +})(); + +export const enqueueFakeTx = async (idx = 0, logger = dummyLogger) => + new Promise(async (resolve, reject) => { + const txs = await txsPromise; + let err: unknown; + let rabbitMqTxSubmitProvider: RabbitMqTxSubmitProvider | null = null; + + try { + rabbitMqTxSubmitProvider = new RabbitMqTxSubmitProvider({ rabbitmqUrl: GOOD_CONNECTION_URL }, { logger }); + await rabbitMqTxSubmitProvider.submitTx(txs[idx].txBodyUint8Array); + } catch (error) { + err = error; + } finally { + if (rabbitMqTxSubmitProvider) await rabbitMqTxSubmitProvider.close(); + if (err) reject(err); + else resolve(); + } + }); + +export const removeAllQueues = async () => { + const queues = await axios.get('http://guest:guest@localhost:15672/api/queues/'); + + if (queues.data.length === 0) return; -export const removeAllMessagesFromQueue = async () => { const connection = await connect(GOOD_CONNECTION_URL.toString()); const channel = await connection.createChannel(); - await channel.assertQueue(TX_SUBMISSION_QUEUE); - let message: Message | false; - do { - message = await channel.get(TX_SUBMISSION_QUEUE); - if (message) channel.ack(message); - } while (message); + for (const queue of queues.data) await channel.deleteQueue(queue.name as string); await channel.close(); await connection.close(); }; + +export const testLogger = () => { + const messages: { message: unknown[]; level: 'debug' | 'error' | 'info' | 'trace' | 'warn' }[] = []; + + return { + debug: (...message: unknown[]) => messages.push({ level: 'debug', message }), + error: (...message: unknown[]) => messages.push({ level: 'error', message }), + info: (...message: unknown[]) => messages.push({ level: 'info', message }), + messages, + trace: (...message: unknown[]) => messages.push({ level: 'trace', message }), + warn: (...message: unknown[]) => messages.push({ level: 'warn', message }) + }; +}; diff --git a/yarn.lock b/yarn.lock index 03448860765..b7a7e44c886 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3364,13 +3364,12 @@ ajv@^8.0.0, ajv@^8.0.1, ajv@^8.8.0: require-from-string "^2.0.2" uri-js "^4.2.2" -amqplib@^0.9.0: - version "0.9.1" - resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.9.1.tgz#f2ad6e518a148a761e102bf791314fbe4cb94346" - integrity sha512-a1DP0H1LcLSMKPAnhUN2AKbVyEPqEUrUf7O+odhKGxaO+Tf0nWtuD7Zq5P9uZwZteu56OfW9EQozSCTKsAEk5w== +amqplib@^0.10.0: + version "0.10.0" + resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.10.0.tgz#766d696f8ceae097ee9eb73e6796999e5d40a1db" + integrity sha512-UueEnRGY6upiSvGsSYM22Woa1SeSukqYtqgYW4Gj8gHvbf5BRhhYRqf3kQ8aSUYYffTOZi6SeOVW2eOXt0hpPA== dependencies: bitsyntax "~0.1.0" - bluebird "^3.7.2" buffer-more-ints "~1.0.0" readable-stream "1.x >=1.1.9" url-parse "~1.5.10" @@ -3964,11 +3963,6 @@ blake2b@2.1.3: blake2b-wasm "^1.1.0" nanoassert "^1.0.0" -bluebird@^3.7.2: - version "3.7.2" - resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.2.tgz#9f229c15be272454ffa973ace0dbee79a1b0c36f" - integrity sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg== - bn.js@^4.0.0, bn.js@^4.11.8, bn.js@^4.11.9: version "4.12.0" resolved "https://registry.yarnpkg.com/bn.js/-/bn.js-4.12.0.tgz#775b3f278efbb9718eec7361f483fb36fbbfea88"