Skip to content

Commit 90d674d

Browse files
committed
refactor(rabbitmq): now the package respects all the repository standards
1 parent 6713b66 commit 90d674d

File tree

11 files changed

+629
-358
lines changed

11 files changed

+629
-358
lines changed

packages/rabbitmq/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@
3737
"@cardano-sdk/ogmios": "0.2.0",
3838
"@types/amqplib": "^0.8.2",
3939
"get-port-please": "^2.5.0",
40+
"axios": "^0.27.2",
4041
"shx": "^0.3.3",
4142
"ws": "^8.5.0"
4243
},
4344
"dependencies": {
45+
"@cardano-ogmios/schema": "5.1.0",
4446
"@cardano-sdk/core": "0.2.0",
45-
"amqplib": "^0.9.0",
47+
"amqplib": "^0.10.0",
4648
"ts-log": "^2.2.4"
4749
},
4850
"files": [
Lines changed: 153 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
/* eslint-disable @typescript-eslint/no-shadow */
22
import { Channel, Connection, Message, connect } from 'amqplib';
3+
import { ErrorDescription, serializeError } from './errorsSerialization';
34
import { Logger, dummyLogger } from 'ts-log';
45
import { ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core';
5-
import { TX_SUBMISSION_QUEUE } from './rabbitmqTxSubmitProvider';
6+
import { TX_SUBMISSION_QUEUE, txBodyToId, waitForPending } from './utils';
67

78
const moduleName = 'TxSubmitWorker';
89

910
/**
1011
* Configuration options parameters for the TxSubmitWorker
1112
*/
1213
export interface TxSubmitWorkerConfig {
14+
/**
15+
* Use the algorithm to get a dummy Tx Id from a Tx body. Used for tests
16+
*/
17+
dummyTxId?: boolean;
18+
1319
/**
1420
* Instructs the worker to process multiple transactions simultaneously.
1521
* Default: false (serial mode)
@@ -89,25 +95,34 @@ export class TxSubmitWorker {
8995
*/
9096
#dependencies: TxSubmitWorkerDependencies;
9197

92-
/**
93-
* The function to call to resolve the start method exit Promise
94-
*/
95-
#exitResolver?: () => void;
96-
9798
/**
9899
* The internal worker status
99100
*/
100101
#status: 'connected' | 'connecting' | 'error' | 'idle' = 'idle';
101102

102103
/**
103-
* @param {TxSubmitWorkerConfig} config The configuration options
104-
* @param {TxSubmitWorkerDependencies} dependencies The dependency objects
104+
* @param config The configuration options
105+
* @param dependencies The dependency objects
105106
*/
106107
constructor(config: TxSubmitWorkerConfig, dependencies: TxSubmitWorkerDependencies) {
107108
this.#config = { parallelTxs: 3, pollingCycle: 500, ...config };
108109
this.#dependencies = { logger: dummyLogger, ...dependencies };
109110
}
110111

112+
/**
113+
* The common handler for errors
114+
*
115+
* @param isAsync flag to identify asynchronous errors
116+
* @param err the error itself
117+
*/
118+
private async errorHandler(isAsync: boolean, err: unknown) {
119+
if (err) {
120+
this.logError(err, isAsync);
121+
this.#status = 'error';
122+
await this.stop();
123+
}
124+
}
125+
111126
/**
112127
* Get the status of the worker
113128
*
@@ -120,117 +135,107 @@ export class TxSubmitWorker {
120135
/**
121136
* Starts the worker
122137
*/
123-
start() {
124-
return new Promise<void>(async (resolve, reject) => {
125-
const closeHandler = async (isAsync: boolean, err: unknown) => {
126-
if (err) {
127-
this.logError(err, isAsync);
128-
this.#exitResolver = undefined;
129-
this.#status = 'error';
130-
await this.stop();
131-
reject(err);
132-
}
133-
};
134-
135-
try {
136-
this.#dependencies.logger!.info(`${moduleName} init: checking tx submission provider health status`);
138+
async start() {
139+
try {
140+
this.#dependencies.logger!.info(`${moduleName} init: checking tx submission provider health status`);
137141

138-
const { ok } = await this.#dependencies.txSubmitProvider.healthCheck();
142+
const { ok } = await this.#dependencies.txSubmitProvider.healthCheck();
139143

140-
if (!ok) throw new ProviderError(ProviderFailure.Unhealthy);
144+
if (!ok) throw new ProviderError(ProviderFailure.Unhealthy);
141145

142-
this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ connection`);
143-
this.#exitResolver = resolve;
144-
this.#status = 'connecting';
145-
this.#connection = await connect(this.#config.rabbitmqUrl.toString());
146-
this.#connection.on('close', (error) => closeHandler(true, error));
146+
this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ connection`);
147+
this.#status = 'connecting';
148+
this.#connection = await connect(this.#config.rabbitmqUrl.toString());
149+
this.#connection.on('close', (error) => this.errorHandler(true, error));
147150

148-
this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ channel`);
149-
this.#channel = await this.#connection.createChannel();
150-
this.#channel.on('close', (error) => closeHandler(true, error));
151+
this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ channel`);
152+
this.#channel = await this.#connection.createChannel();
153+
this.#channel.on('close', (error) => this.errorHandler(true, error));
151154

152-
this.#dependencies.logger!.info(`${moduleName} init: ensuring RabbitMQ queue`);
153-
await this.#channel.assertQueue(TX_SUBMISSION_QUEUE);
154-
this.#dependencies.logger!.info(`${moduleName}: init completed`);
155+
this.#dependencies.logger!.info(`${moduleName} init: ensuring RabbitMQ queue`);
156+
await this.#channel.assertQueue(TX_SUBMISSION_QUEUE);
157+
this.#dependencies.logger!.info(`${moduleName}: init completed`);
155158

156-
if (this.#config.parallel) {
157-
this.#dependencies.logger!.info(`${moduleName}: starting parallel mode`);
158-
await this.#channel.prefetch(this.#config.parallelTxs!, true);
159+
if (this.#config.parallel) {
160+
this.#dependencies.logger!.info(`${moduleName}: starting parallel mode`);
161+
await this.#channel.prefetch(this.#config.parallelTxs!, true);
159162

160-
const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null);
161-
const { consumerTag } = await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler);
163+
const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null);
162164

163-
this.#consumerTag = consumerTag;
164-
this.#status = 'connected';
165-
} else {
166-
this.#dependencies.logger!.info(`${moduleName}: starting serial mode`);
167-
await this.infiniteLoop();
168-
}
169-
} catch (error) {
170-
await closeHandler(false, error);
165+
this.#consumerTag = (await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler)).consumerTag;
166+
} else {
167+
this.#dependencies.logger!.info(`${moduleName}: starting serial mode`);
168+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
169+
this.infiniteLoop();
171170
}
172-
});
171+
172+
this.#status = 'connected';
173+
} catch (error) {
174+
await this.errorHandler(false, error);
175+
if (error instanceof ProviderError) throw error;
176+
throw new ProviderError(ProviderFailure.ConnectionFailure, error);
177+
}
173178
}
174179

175180
/**
176-
* Stops the worker. Once connection shutdown is completed,
177-
* the Promise returned by the start method is resolved as well
181+
* Stops the worker.
178182
*/
179183
async stop() {
180-
// This method needs to call this.#exitResolver at the end.
181-
// Since it may be called more than once simultaneously,
182-
// we need to ensure this.#exitResolver is called only once,
183-
// so we immediately store its value in a local variable and we reset it
184-
const exitResolver = this.#exitResolver;
185-
this.#exitResolver = undefined;
186-
187-
try {
188-
this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ channel`);
184+
this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ channel`);
189185

186+
// In case of parallel worker; first of all cancel the consumer
187+
if (this.#consumerTag)
190188
try {
191-
if (this.#consumerTag) {
192-
const consumerTag = this.#consumerTag;
193-
this.#consumerTag = undefined;
189+
// Let's immediately reset this.#consumerTag to be sure the cancel operation is called
190+
// only once even if the this.stop methond is called more than once
191+
const consumerTag = this.#consumerTag;
192+
this.#consumerTag = undefined;
194193

195-
await this.#channel?.cancel(consumerTag);
196-
}
194+
await this.#channel!.cancel(consumerTag);
197195
} catch (error) {
198196
this.logError(error);
199197
}
198+
// In case of serial worker; just instruct the infinite loop it can exit
199+
else this.#continueForever = false;
200200

201-
this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ connection`);
201+
// Wait for pending operations before closing
202+
await waitForPending(this.#channel);
202203

203-
try {
204-
await this.#connection?.close();
205-
} catch (error) {
206-
this.logError(error);
207-
}
204+
try {
205+
await this.#channel?.close();
206+
} catch (error) {
207+
this.logError(error);
208+
}
208209

209-
this.#dependencies.logger!.info(`${moduleName}: shutdown completed`);
210-
this.#channel = undefined;
211-
this.#connection = undefined;
212-
this.#consumerTag = undefined;
213-
this.#continueForever = false;
214-
this.#status = 'idle';
215-
} finally {
216-
// Only logging functions could throw an error here...
217-
// Although this is almost impossible, we want to be sure exitResolver is called
218-
exitResolver?.();
210+
this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ connection`);
211+
212+
try {
213+
await this.#connection?.close();
214+
} catch (error) {
215+
this.logError(error);
219216
}
217+
218+
this.#dependencies.logger!.info(`${moduleName}: shutdown completed`);
219+
this.#channel = undefined;
220+
this.#connection = undefined;
221+
this.#status = 'idle';
220222
}
221223

222224
/**
223225
* Wrapper to log errors from try/catch blocks
224226
*
225-
* @param {any} error the error to log
227+
* @param error the error to log
228+
* @param isAsync flag to set in case the error is asynchronous
229+
* @param asWarning flag to log the error with warning loglevel
226230
*/
227-
private logError(error: unknown, isAsync = false) {
231+
private logError(error: unknown, isAsync = false, asWarning = false) {
228232
const errorMessage =
229233
// eslint-disable-next-line prettier/prettier
230234
error instanceof Error ? error.message : (typeof error === 'string' ? error : JSON.stringify(error));
231235
const errorObject = { error: error instanceof Error ? error.name : 'Unknown error', isAsync, module: moduleName };
232236

233-
this.#dependencies.logger!.error(errorObject, errorMessage);
237+
if (asWarning) this.#dependencies.logger!.warn(errorObject, errorMessage);
238+
else this.#dependencies.logger!.error(errorObject, errorMessage);
234239
if (error instanceof Error) this.#dependencies.logger!.debug(`${moduleName}:`, error.stack);
235240
}
236241

@@ -239,7 +244,6 @@ export class TxSubmitWorker {
239244
*/
240245
private async infiniteLoop() {
241246
this.#continueForever = true;
242-
this.#status = 'connected';
243247

244248
while (this.#continueForever) {
245249
const message = await this.#channel?.get(TX_SUBMISSION_QUEUE);
@@ -254,29 +258,80 @@ export class TxSubmitWorker {
254258
/**
255259
* Submit a tx to the provider and ack (or nack) the message
256260
*
257-
* @param {Message} message the message containing the transaction
261+
* @param message the message containing the transaction
258262
*/
259263
private async submitTx(message: Message) {
264+
const counter = ++this.#counter;
265+
let errorDescription: ErrorDescription | null = null;
266+
let txId = '';
267+
260268
try {
261-
const counter = ++this.#counter;
262269
const { content } = message;
270+
const txBody = new Uint8Array(content);
271+
272+
// Register the handling of current transaction
273+
txId = txBodyToId(txBody, this.#config.dummyTxId);
263274

264-
this.#dependencies.logger!.info(`${moduleName}: submitting tx`);
265-
this.#dependencies.logger!.debug(`${moduleName}: tx ${counter} dump:`, content.toString('hex'));
266-
await this.#dependencies.txSubmitProvider.submitTx(new Uint8Array(content));
275+
this.#dependencies.logger!.info(`${moduleName}: submitting tx #${counter} id: ${txId}`);
276+
this.#dependencies.logger!.debug(`${moduleName}: tx #${counter} dump:`, content.toString('hex'));
277+
await this.#dependencies.txSubmitProvider.submitTx(txBody);
267278

268-
this.#dependencies.logger!.debug(`${moduleName}: ACKing RabbitMQ message ${counter}`);
279+
this.#dependencies.logger!.debug(`${moduleName}: ACKing RabbitMQ message #${counter}`);
269280
this.#channel?.ack(message);
270281
} catch (error) {
282+
errorDescription = await this.submitTxErrorHandler(error, counter, message);
283+
} finally {
284+
if (
285+
txId && // If there is no error or the error can't be retried
286+
(!errorDescription || !errorDescription.isRetriable)
287+
) {
288+
// Send the response to the original submitter
289+
try {
290+
// An empty response message means succesful submission
291+
const message = errorDescription || {};
292+
await this.#channel!.assertQueue(txId);
293+
this.logError(`${moduleName}: sending response for message #${counter}`);
294+
this.#channel!.sendToQueue(txId, Buffer.from(JSON.stringify(message)));
295+
} catch (error) {
296+
this.logError(`${moduleName}: while sending response for message #${counter}`);
297+
this.logError(error);
298+
}
299+
}
300+
}
301+
}
302+
303+
/**
304+
* The error handler of submitTx method
305+
*/
306+
private async submitTxErrorHandler(err: unknown, counter: number, message: Message) {
307+
let errorDescription: ErrorDescription;
308+
309+
try {
310+
errorDescription = serializeError(err);
311+
} catch (error) {
312+
this.#dependencies.logger!.error(`${moduleName}: serializing tx #${counter} error`);
271313
this.logError(error);
272314

273-
try {
274-
this.#dependencies.logger!.info(`${moduleName}: NACKing RabbitMQ message`);
275-
this.#channel?.nack(message);
276-
// eslint-disable-next-line no-catch-shadow
277-
} catch (error) {
278-
this.logError(error);
279-
}
315+
errorDescription = { message: (err as { message: string }).message || JSON.stringify(err), type: 'unknown' };
280316
}
317+
318+
if (errorDescription.isRetriable) this.#dependencies.logger!.warn(`${moduleName}: submitting tx #${counter}`);
319+
else this.#dependencies.logger!.error(`${moduleName}: submitting tx #${counter}`);
320+
this.logError(err, false, errorDescription.isRetriable);
321+
322+
const action = `${errorDescription.isRetriable ? 'N' : ''}ACKing RabbitMQ message #${counter}`;
323+
324+
try {
325+
this.#dependencies.logger!.info(`${moduleName}: ${action}`);
326+
// In RabbitMQ languange, NACKing a message means to ask to retry for it
327+
// We NACK only those messages which had an error which can be retried
328+
if (errorDescription.isRetriable) this.#channel?.nack(message);
329+
else this.#channel?.ack(message);
330+
} catch (error) {
331+
this.logError(`${moduleName}: while ${action}`);
332+
this.logError(error);
333+
}
334+
335+
return errorDescription;
281336
}
282337
}

0 commit comments

Comments
 (0)