Skip to content

Commit 27aa81d

Browse files
icciccirhyslbw
authored andcommitted
refactor(rabbitmq): now the package respects all the repository standards
1 parent ab1530b commit 27aa81d

File tree

12 files changed

+642
-368
lines changed

12 files changed

+642
-368
lines changed

packages/rabbitmq/package.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,15 @@
3737
"@cardano-sdk/ogmios": "0.3.0",
3838
"@types/amqplib": "^0.8.2",
3939
"get-port-please": "^2.5.0",
40+
"axios": "^0.27.2",
4041
"shx": "^0.3.3",
4142
"ws": "^8.5.0"
4243
},
4344
"dependencies": {
45+
"@cardano-ogmios/schema": "5.1.0",
4446
"@cardano-sdk/core": "0.3.0",
45-
"amqplib": "^0.9.0",
47+
"@cardano-sdk/util": "0.3.0",
48+
"amqplib": "^0.10.0",
4649
"ts-log": "^2.2.4"
4750
},
4851
"files": [
Lines changed: 144 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
/* eslint-disable @typescript-eslint/no-shadow */
2+
import { Cardano, ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core';
23
import { Channel, Connection, Message, connect } from 'amqplib';
34
import { Logger, dummyLogger } from 'ts-log';
4-
import { ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core';
5-
import { TX_SUBMISSION_QUEUE } from './rabbitmqTxSubmitProvider';
5+
import { TX_SUBMISSION_QUEUE, serializeError, waitForPending } from './utils';
66

77
const moduleName = 'TxSubmitWorker';
88

9+
type Optional<T, K extends keyof T> = Pick<Partial<T>, K> & Omit<T, K>;
10+
911
/**
1012
* Configuration options parameters for the TxSubmitWorker
1113
*/
@@ -41,7 +43,7 @@ export interface TxSubmitWorkerDependencies {
4143
/**
4244
* The logger. Default: silent
4345
*/
44-
logger?: Logger;
46+
logger: Logger;
4547

4648
/**
4749
* The provider to use to submit tx
@@ -89,25 +91,34 @@ export class TxSubmitWorker {
8991
*/
9092
#dependencies: TxSubmitWorkerDependencies;
9193

92-
/**
93-
* The function to call to resolve the start method exit Promise
94-
*/
95-
#exitResolver?: () => void;
96-
9794
/**
9895
* The internal worker status
9996
*/
10097
#status: 'connected' | 'connecting' | 'error' | 'idle' = 'idle';
10198

10299
/**
103-
* @param {TxSubmitWorkerConfig} config The configuration options
104-
* @param {TxSubmitWorkerDependencies} dependencies The dependency objects
100+
* @param config The configuration options
101+
* @param dependencies The dependency objects
105102
*/
106-
constructor(config: TxSubmitWorkerConfig, dependencies: TxSubmitWorkerDependencies) {
103+
constructor(config: TxSubmitWorkerConfig, dependencies: Optional<TxSubmitWorkerDependencies, 'logger'>) {
107104
this.#config = { parallelTxs: 3, pollingCycle: 500, ...config };
108105
this.#dependencies = { logger: dummyLogger, ...dependencies };
109106
}
110107

108+
/**
109+
* The common handler for errors
110+
*
111+
* @param isAsync flag to identify asynchronous errors
112+
* @param err the error itself
113+
*/
114+
private async errorHandler(isAsync: boolean, err: unknown) {
115+
if (err) {
116+
this.logError(err, isAsync);
117+
this.#status = 'error';
118+
await this.stop();
119+
}
120+
}
121+
111122
/**
112123
* Get the status of the worker
113124
*
@@ -120,126 +131,115 @@ export class TxSubmitWorker {
120131
/**
121132
* Starts the worker
122133
*/
123-
start() {
124-
return new Promise<void>(async (resolve, reject) => {
125-
const closeHandler = async (isAsync: boolean, err: unknown) => {
126-
if (err) {
127-
this.logError(err, isAsync);
128-
this.#exitResolver = undefined;
129-
this.#status = 'error';
130-
await this.stop();
131-
reject(err);
132-
}
133-
};
134-
135-
try {
136-
this.#dependencies.logger!.info(`${moduleName} init: checking tx submission provider health status`);
134+
async start() {
135+
try {
136+
this.#dependencies.logger.info(`${moduleName} init: checking tx submission provider health status`);
137137

138-
const { ok } = await this.#dependencies.txSubmitProvider.healthCheck();
138+
const { ok } = await this.#dependencies.txSubmitProvider.healthCheck();
139139

140-
if (!ok) throw new ProviderError(ProviderFailure.Unhealthy);
140+
if (!ok) throw new ProviderError(ProviderFailure.Unhealthy);
141141

142-
this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ connection`);
143-
this.#exitResolver = resolve;
144-
this.#status = 'connecting';
145-
this.#connection = await connect(this.#config.rabbitmqUrl.toString());
146-
this.#connection.on('close', (error) => closeHandler(true, error));
142+
this.#dependencies.logger.info(`${moduleName} init: opening RabbitMQ connection`);
143+
this.#status = 'connecting';
144+
this.#connection = await connect(this.#config.rabbitmqUrl.toString());
145+
this.#connection.on('close', (error) => this.errorHandler(true, error));
147146

148-
this.#dependencies.logger!.info(`${moduleName} init: opening RabbitMQ channel`);
149-
this.#channel = await this.#connection.createChannel();
150-
this.#channel.on('close', (error) => closeHandler(true, error));
147+
this.#dependencies.logger.info(`${moduleName} init: opening RabbitMQ channel`);
148+
this.#channel = await this.#connection.createChannel();
149+
this.#channel.on('close', (error) => this.errorHandler(true, error));
151150

152-
this.#dependencies.logger!.info(`${moduleName} init: ensuring RabbitMQ queue`);
153-
await this.#channel.assertQueue(TX_SUBMISSION_QUEUE);
154-
this.#dependencies.logger!.info(`${moduleName}: init completed`);
151+
this.#dependencies.logger.info(`${moduleName} init: ensuring RabbitMQ queue`);
152+
await this.#channel.assertQueue(TX_SUBMISSION_QUEUE);
153+
this.#dependencies.logger.info(`${moduleName}: init completed`);
155154

156-
if (this.#config.parallel) {
157-
this.#dependencies.logger!.info(`${moduleName}: starting parallel mode`);
158-
await this.#channel.prefetch(this.#config.parallelTxs!, true);
155+
if (this.#config.parallel) {
156+
this.#dependencies.logger.info(`${moduleName}: starting parallel mode`);
157+
await this.#channel.prefetch(this.#config.parallelTxs!, true);
159158

160-
const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null);
161-
const { consumerTag } = await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler);
159+
const parallelHandler = (message: Message | null) => (message ? this.submitTx(message) : null);
162160

163-
this.#consumerTag = consumerTag;
164-
this.#status = 'connected';
165-
} else {
166-
this.#dependencies.logger!.info(`${moduleName}: starting serial mode`);
167-
await this.infiniteLoop();
168-
}
169-
} catch (error) {
170-
await closeHandler(false, error);
161+
this.#consumerTag = (await this.#channel.consume(TX_SUBMISSION_QUEUE, parallelHandler)).consumerTag;
162+
} else {
163+
this.#dependencies.logger.info(`${moduleName}: starting serial mode`);
164+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
165+
this.infiniteLoop();
171166
}
172-
});
167+
168+
this.#status = 'connected';
169+
} catch (error) {
170+
await this.errorHandler(false, error);
171+
if (error instanceof ProviderError) throw error;
172+
throw new ProviderError(ProviderFailure.ConnectionFailure, error);
173+
}
173174
}
174175

175176
/**
176-
* Stops the worker. Once connection shutdown is completed,
177-
* the Promise returned by the start method is resolved as well
177+
* Stops the worker.
178178
*/
179179
async stop() {
180-
// This method needs to call this.#exitResolver at the end.
181-
// Since it may be called more than once simultaneously,
182-
// we need to ensure this.#exitResolver is called only once,
183-
// so we immediately store its value in a local variable and we reset it
184-
const exitResolver = this.#exitResolver;
185-
this.#exitResolver = undefined;
186-
187-
try {
188-
this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ channel`);
180+
this.#dependencies.logger.info(`${moduleName} shutdown: closing RabbitMQ channel`);
189181

182+
// In case of parallel worker; first of all cancel the consumer
183+
if (this.#consumerTag)
190184
try {
191-
if (this.#consumerTag) {
192-
const consumerTag = this.#consumerTag;
193-
this.#consumerTag = undefined;
185+
// Let's immediately reset this.#consumerTag to be sure the cancel operation is called
186+
// only once even if the this.stop methond is called more than once
187+
const consumerTag = this.#consumerTag;
188+
this.#consumerTag = undefined;
194189

195-
await this.#channel?.cancel(consumerTag);
196-
}
190+
await this.#channel!.cancel(consumerTag);
197191
} catch (error) {
198192
this.logError(error);
199193
}
194+
// In case of serial worker; just instruct the infinite loop it can exit
195+
else this.#continueForever = false;
200196

201-
this.#dependencies.logger!.info(`${moduleName} shutdown: closing RabbitMQ connection`);
197+
// Wait for pending operations before closing
198+
await waitForPending(this.#channel);
202199

203-
try {
204-
await this.#connection?.close();
205-
} catch (error) {
206-
this.logError(error);
207-
}
200+
try {
201+
await this.#channel?.close();
202+
} catch (error) {
203+
this.logError(error);
204+
}
208205

209-
this.#dependencies.logger!.info(`${moduleName}: shutdown completed`);
210-
this.#channel = undefined;
211-
this.#connection = undefined;
212-
this.#consumerTag = undefined;
213-
this.#continueForever = false;
214-
this.#status = 'idle';
215-
} finally {
216-
// Only logging functions could throw an error here...
217-
// Although this is almost impossible, we want to be sure exitResolver is called
218-
exitResolver?.();
206+
this.#dependencies.logger.info(`${moduleName} shutdown: closing RabbitMQ connection`);
207+
208+
try {
209+
await this.#connection?.close();
210+
} catch (error) {
211+
this.logError(error);
219212
}
213+
214+
this.#dependencies.logger.info(`${moduleName}: shutdown completed`);
215+
this.#channel = undefined;
216+
this.#connection = undefined;
217+
this.#status = 'idle';
220218
}
221219

222220
/**
223221
* Wrapper to log errors from try/catch blocks
224222
*
225-
* @param {any} error the error to log
223+
* @param error the error to log
224+
* @param isAsync flag to set in case the error is asynchronous
225+
* @param asWarning flag to log the error with warning loglevel
226226
*/
227-
private logError(error: unknown, isAsync = false) {
227+
private logError(error: unknown, isAsync = false, asWarning = false) {
228228
const errorMessage =
229229
// eslint-disable-next-line prettier/prettier
230230
error instanceof Error ? error.message : (typeof error === 'string' ? error : JSON.stringify(error));
231231
const errorObject = { error: error instanceof Error ? error.name : 'Unknown error', isAsync, module: moduleName };
232232

233-
this.#dependencies.logger!.error(errorObject, errorMessage);
234-
if (error instanceof Error) this.#dependencies.logger!.debug(`${moduleName}:`, error.stack);
233+
if (asWarning) this.#dependencies.logger.warn(errorObject, errorMessage);
234+
else this.#dependencies.logger.error(errorObject, errorMessage);
235+
if (error instanceof Error) this.#dependencies.logger.debug(`${moduleName}:`, error.stack);
235236
}
236237

237238
/**
238239
* The infinite loop to perform serial tx submission
239240
*/
240241
private async infiniteLoop() {
241242
this.#continueForever = true;
242-
this.#status = 'connected';
243243

244244
while (this.#continueForever) {
245245
const message = await this.#channel?.get(TX_SUBMISSION_QUEUE);
@@ -254,29 +254,70 @@ export class TxSubmitWorker {
254254
/**
255255
* Submit a tx to the provider and ack (or nack) the message
256256
*
257-
* @param {Message} message the message containing the transaction
257+
* @param message the message containing the transaction
258258
*/
259259
private async submitTx(message: Message) {
260+
const counter = ++this.#counter;
261+
let isRetriable = false;
262+
let serializableError: unknown;
263+
let txId = '';
264+
260265
try {
261-
const counter = ++this.#counter;
262266
const { content } = message;
267+
const txBody = new Uint8Array(content);
268+
269+
// Register the handling of current transaction
270+
txId = Cardano.util.deserializeTx(txBody).id.toString();
263271

264-
this.#dependencies.logger!.info(`${moduleName}: submitting tx`);
265-
this.#dependencies.logger!.debug(`${moduleName}: tx ${counter} dump:`, content.toString('hex'));
266-
await this.#dependencies.txSubmitProvider.submitTx(new Uint8Array(content));
272+
this.#dependencies.logger.info(`${moduleName}: submitting tx #${counter} id: ${txId}`);
273+
this.#dependencies.logger.debug(`${moduleName}: tx #${counter} dump:`, content.toString('hex'));
274+
await this.#dependencies.txSubmitProvider.submitTx(txBody);
267275

268-
this.#dependencies.logger!.debug(`${moduleName}: ACKing RabbitMQ message ${counter}`);
276+
this.#dependencies.logger.debug(`${moduleName}: ACKing RabbitMQ message #${counter}`);
269277
this.#channel?.ack(message);
270278
} catch (error) {
271-
this.logError(error);
272-
273-
try {
274-
this.#dependencies.logger!.info(`${moduleName}: NACKing RabbitMQ message`);
275-
this.#channel?.nack(message);
276-
// eslint-disable-next-line no-catch-shadow
277-
} catch (error) {
278-
this.logError(error);
279+
({ isRetriable, serializableError } = await this.submitTxErrorHandler(error, counter, message));
280+
} finally {
281+
// If there is no error or the error can't be retried
282+
if (!serializableError || !isRetriable) {
283+
// Send the response to the original submitter
284+
try {
285+
// An empty response message means succesful submission
286+
const message = serializableError || {};
287+
await this.#channel!.assertQueue(txId);
288+
this.logError(`${moduleName}: sending response for message #${counter}`);
289+
this.#channel!.sendToQueue(txId, Buffer.from(JSON.stringify(message)));
290+
} catch (error) {
291+
this.logError(`${moduleName}: while sending response for message #${counter}`);
292+
this.logError(error);
293+
}
279294
}
280295
}
281296
}
297+
298+
/**
299+
* The error handler of submitTx method
300+
*/
301+
private async submitTxErrorHandler(err: unknown, counter: number, message: Message) {
302+
const { isRetriable, serializableError } = serializeError(err);
303+
304+
if (isRetriable) this.#dependencies.logger.warn(`${moduleName}: submitting tx #${counter}`);
305+
else this.#dependencies.logger.error(`${moduleName}: submitting tx #${counter}`);
306+
this.logError(err, false, isRetriable);
307+
308+
const action = `${isRetriable ? 'N' : ''}ACKing RabbitMQ message #${counter}`;
309+
310+
try {
311+
this.#dependencies.logger.info(`${moduleName}: ${action}`);
312+
// In RabbitMQ languange, NACKing a message means to ask to retry for it
313+
// We NACK only those messages which had an error which can be retried
314+
if (isRetriable) this.#channel?.nack(message);
315+
else this.#channel?.ack(message);
316+
} catch (error) {
317+
this.logError(`${moduleName}: while ${action}`);
318+
this.logError(error);
319+
}
320+
321+
return { isRetriable, serializableError };
322+
}
282323
}

packages/rabbitmq/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from './TxSubmitWorker';
22
export * from './rabbitmqTxSubmitProvider';
3+
export * from './utils';

0 commit comments

Comments
 (0)