Skip to content

Commit

Permalink
fixup! refactor(rabbitmq): now the package respects all the repositor…
Browse files Browse the repository at this point in the history
…y standards
  • Loading branch information
iccicci committed Jun 17, 2022
1 parent d31d060 commit fa40887
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 83 deletions.
14 changes: 7 additions & 7 deletions packages/rabbitmq/src/TxSubmitWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { Channel, Connection, Message, connect } from 'amqplib';
import { Logger, dummyLogger } from 'ts-log';
import { ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core';
import { SerializedError, TX_SUBMISSION_QUEUE, serializeError, txBodyToId, waitForPending } from './utils';
import { TX_SUBMISSION_QUEUE, serializeError, txBodyToId, waitForPending } from './utils';

const moduleName = 'TxSubmitWorker';

Expand Down Expand Up @@ -262,7 +262,7 @@ export class TxSubmitWorker {
private async submitTx(message: Message) {
const counter = ++this.#counter;
let isRetriable = false;
let serializedError: SerializedError | null = null;
let serializableError: unknown;
let txId = '';

try {
Expand All @@ -279,14 +279,14 @@ export class TxSubmitWorker {
this.#dependencies.logger!.debug(`${moduleName}: ACKing RabbitMQ message #${counter}`);
this.#channel?.ack(message);
} catch (error) {
({ isRetriable, serializedError } = await this.submitTxErrorHandler(error, counter, message));
({ isRetriable, serializableError } = await this.submitTxErrorHandler(error, counter, message));
} finally {
// If there is no error or the error can't be retried
if (!serializedError || !isRetriable) {
if (!serializableError || !isRetriable) {
// Send the response to the original submitter
try {
// An empty response message means succesful submission
const message = serializedError || {};
const message = serializableError || {};
await this.#channel!.assertQueue(txId);
this.logError(`${moduleName}: sending response for message #${counter}`);
this.#channel!.sendToQueue(txId, Buffer.from(JSON.stringify(message)));
Expand All @@ -302,7 +302,7 @@ export class TxSubmitWorker {
* The error handler of submitTx method
*/
private async submitTxErrorHandler(err: unknown, counter: number, message: Message) {
const { isRetriable, serializedError } = serializeError(err);
const { isRetriable, serializableError } = serializeError(err);

if (isRetriable) this.#dependencies.logger!.warn(`${moduleName}: submitting tx #${counter}`);
else this.#dependencies.logger!.error(`${moduleName}: submitting tx #${counter}`);
Expand All @@ -321,6 +321,6 @@ export class TxSubmitWorker {
this.logError(error);
}

return { isRetriable, serializedError };
return { isRetriable, serializableError };
}
}
7 changes: 4 additions & 3 deletions packages/rabbitmq/src/rabbitmqTxSubmitProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Buffer } from 'buffer';
import { Cardano, HealthCheckResponse, ProviderError, ProviderFailure, TxSubmitProvider } from '@cardano-sdk/core';
import { Channel, Connection, connect } from 'amqplib';
import { Logger, dummyLogger } from 'ts-log';
import { SerializedError, TX_SUBMISSION_QUEUE, deserializeError, txBodyToId, waitForPending } from './utils';
import { TX_SUBMISSION_QUEUE, getErrorPrototype, txBodyToId, waitForPending } from './utils';
import { fromSerializableObject } from '@cardano-sdk/util';

const moduleName = 'RabbitMqTxSubmitProvider';

Expand Down Expand Up @@ -199,12 +200,12 @@ export class RabbitMqTxSubmitProvider implements TxSubmitProvider {
await this.#channel!.deleteQueue(txId);
this.#dependencies.logger!.debug(`${moduleName}: deleted queue: ${txId}`);

const result = JSON.parse(message.content.toString()) as SerializedError;
const result = JSON.parse(message.content.toString());

// An empty result message means submission ok
if (Object.keys(result).length === 0) return done();

done(deserializeError(result));
done(fromSerializableObject(result, { getErrorPrototype }));
} catch (error) {
this.#dependencies.logger!.error(`${moduleName}: while handling response message: ${txId}`);
this.#dependencies.logger!.error(error);
Expand Down
94 changes: 21 additions & 73 deletions packages/rabbitmq/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,29 @@
import { CSL, Cardano, cslToCore } from '@cardano-sdk/core';
import { OutsideOfValidityInterval } from '@cardano-ogmios/schema';
import { fromSerializableObject, toSerializableObject } from '@cardano-sdk/util';

export interface DeserializedError {
innerError?: {
name: string;
message: string;
};
name: string;
message: string;
}

export interface SerializedError {
value: {
innerError?: {
name: string;
message: string;
stack?: string;
};
name: string;
message: string;
stack?: string;
};
}
import { toSerializableObject } from '@cardano-sdk/util';

export const TX_SUBMISSION_QUEUE = 'cardano-tx-submit';

/**
* Enriches an error with the right prototype
* Analyzes a serializable error to get the right prototype object
*
* @param err the error to enrich
* @param error the error to analyze
* @returns the right prototype for the error
*/
const addPrototypeToError = (err: DeserializedError) => {
const { name, innerError } = err;
let prototype: object | null = Error.prototype;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const getErrorPrototype = (error: unknown) => {
if (typeof error === 'object') {
const rawError = error as Cardano.TxSubmissionError;

if (name in Cardano.TxSubmissionErrors)
prototype = Cardano.TxSubmissionErrors[name as keyof typeof Cardano.TxSubmissionErrors].prototype;

Object.setPrototypeOf(err, prototype);

if (innerError) addPrototypeToError(innerError);
};
if (typeof rawError.name === 'string' && typeof rawError.message === 'string') {
const txSubmissionErrorName = rawError.name as keyof typeof Cardano.TxSubmissionErrors;
const ErrorClass = Cardano.TxSubmissionErrors[txSubmissionErrorName];

/**
* Deserializes a serialized error
*
* @param serializedError the error to deserialize
*/
export const deserializeError = (serializedError: SerializedError): unknown => {
const ret = fromSerializableObject(serializedError) as DeserializedError;

addPrototypeToError(ret);
if (ErrorClass) return ErrorClass.prototype;
}
}

return ret;
return Error.prototype;
};

/**
Expand All @@ -63,38 +33,16 @@ export const deserializeError = (serializedError: SerializedError): unknown => {
*/
export const serializeError = (err: unknown) => {
let isRetriable = false;
let serializedError: SerializedError | null = null;

const lastChance = () => {
try {
serializedError = { value: { message: JSON.stringify(err), name: 'Error' } };
} catch {
serializedError = { value: { message: 'Unknown', name: 'Error' } };
}
};

try {
serializedError = toSerializableObject(err) as SerializedError;
} catch {
lastChance();
}

if (!serializedError!.value) lastChance();
const serializableError = toSerializableObject(err);

delete serializedError!.value.stack;
if (serializedError!.value.innerError) delete serializedError!.value.innerError.stack;
if (err instanceof Cardano.TxSubmissionErrors.OutsideOfValidityIntervalError) {
const details = JSON.parse(err.message) as OutsideOfValidityInterval['outsideOfValidityInterval'];

if (serializedError!.value.name === 'OutsideOfValidityIntervalError')
try {
const details = JSON.parse(
serializedError!.value.message
) as OutsideOfValidityInterval['outsideOfValidityInterval'];

if (details.interval.invalidBefore && details.currentSlot <= details.interval.invalidBefore) isRetriable = true;
// eslint-disable-next-line no-empty
} catch {}
if (details.interval.invalidBefore && details.currentSlot <= details.interval.invalidBefore) isRetriable = true;
}

return { isRetriable, serializedError };
return { isRetriable, serializableError };
};

export const txBodyToId = (txBody: Buffer | Uint8Array, dummy?: boolean) => {
Expand Down

0 comments on commit fa40887

Please sign in to comment.