Skip to content

Commit

Permalink
Fix issues with replying to RPC messages using Promises (#44)
Browse files Browse the repository at this point in the history
* Fix issues with replying to RPC messages using Promises

Issue 1: If a message is received with the replyTo field set (an RPC
message) and the onMessage function that's passed to activateConsumer()
or startConsumer() returns a Promise and the Promise isn't an ES6
Promise, a JSON version of the Promise itself may be returned in the
reply message instead of the Promise's resolved value.
https://stackoverflow.com/questions/27746304/how-do-i-tell-if-an-object-is-a-promise

Issue 2: If a message is received with the replyTo field set (an RPC
message) and the onMessage function that's passed to startConsumer()
returns a Promise, the Promise's resolved value isn't returned in the
reply message. Instead a JSON version of the entire Promise is returned.

* Rebuild transpiled library from Typescript source

Update version information in package file to match latest released version

* Allow the onMessage callback that's passed to activateConsumer() and
startConsumer() to return a promise regardless of whether or not the
replyTo field is set.

startConsumer() can automatically ack the message. If noAck == false and
manualAck == false and onMessage returns a promise, wait until the
promise is resolved before acknowledging the message.

* Rebuild transpiled library from Typescript source
  • Loading branch information
austin-beer authored and abreits committed Oct 10, 2019
1 parent fb19802 commit 0ca8250
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 96 deletions.
13 changes: 7 additions & 6 deletions lib/amqp-ts.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* methods and properties starting with '_' signify that the scope of the item should be limited to
* the inside of the enclosing namespace.
*/
/// <reference types="node" />
import * as AmqpLib from "amqplib/callback_api";
import * as Promise from "bluebird";
import * as winston from "winston";
Expand All @@ -31,8 +32,8 @@ export declare class Connection extends EventEmitter {
[id: string]: Binding;
};
constructor(url?: string, socketOptions?: any, reconnectStrategy?: Connection.ReconnectStrategy);
private rebuildConnection();
private tryToConnect(thisConnection, retry, callback);
private rebuildConnection;
private tryToConnect;
_rebuildAll(err: Error): Promise<void>;
close(): Promise<void>;
/**
Expand All @@ -48,7 +49,7 @@ export declare class Connection extends EventEmitter {
declareExchange(name: string, type?: string, options?: Exchange.DeclarationOptions): Exchange;
declareQueue(name: string, options?: Queue.DeclarationOptions): Queue;
declareTopology(topology: Connection.Topology): Promise<any>;
getConnection: AmqpLib.Connection;
readonly getConnection: AmqpLib.Connection;
}
export declare namespace Connection {
interface ReconnectStrategy {
Expand Down Expand Up @@ -99,8 +100,8 @@ export declare class Exchange {
_options: Exchange.DeclarationOptions;
_deleting: Promise<void>;
_closing: Promise<void>;
name: string;
type: string;
readonly name: string;
readonly type: string;
constructor(connection: Connection, name: string, type?: string, options?: Exchange.DeclarationOptions);
_initialize(): void;
/**
Expand Down Expand Up @@ -149,7 +150,7 @@ export declare class Queue {
_consumerStopping: boolean;
_deleting: Promise<Queue.DeleteResult>;
_closing: Promise<void>;
name: string;
readonly name: string;
constructor(connection: Connection, name: string, options?: Queue.DeclarationOptions);
_initialize(): void;
static _packMessageContent(content: any, options: any): Buffer;
Expand Down
132 changes: 64 additions & 68 deletions lib/amqp-ts.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 18 additions & 22 deletions src/amqp-ts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -941,23 +941,21 @@ export class Queue {
}
var payload = Queue._unpackMessageContent(msg);
var result = this._consumer(payload);
if (!(result instanceof Promise)) {
result = Promise.resolve(result);
}
result.then((resultValue) => {
// check if there is a reply-to
if (msg.properties.replyTo) {
var options: any = {};
resultValue = Queue._packMessageContent(resultValue, options);
this._channel.sendToQueue(msg.properties.replyTo, resultValue, options);
}

// convert the result to a promise if it isn't one already
Promise.resolve(result).then((resultValue) => {
// check if there is a reply-to
if (msg.properties.replyTo) {
var options: any = {};
resultValue = Queue._packMessageContent(resultValue, options);
this._channel.sendToQueue(msg.properties.replyTo, resultValue, options);
}
// 'hack' added to allow better manual ack control by client (less elegant, but should work)
if (this._consumerOptions.manualAck !== true && this._consumerOptions.noAck !== true) {
if (this._consumerOptions.manualAck !== true && this._consumerOptions.noAck !== true) {
this._channel.ack(msg);
}}).catch((err) => {
log.log("error", "Queue.onMessage RPC promise returned error: " + err.message, { module: "amqp-ts" });
});
}
}).catch((err) => {
log.log("error", "Queue.onMessage RPC promise returned error: " + err.message, { module: "amqp-ts" });
});
} catch (err) {
/* istanbul ignore next */
log.log("error", "Queue.onMessage consumer function returned error: " + err.message, { module: "amqp-ts" });
Expand All @@ -980,11 +978,9 @@ export class Queue {
message._message = msg;
message._channel = this._channel;
var result = this._consumer(message);
if (!(result instanceof Promise)) {
result = Promise.resolve(result);
}
result.then((resultValue) => {
// check if there is a reply-to
// convert the result to a promise if it isn't one already
Promise.resolve(result).then((resultValue) => {
// check if there is a reply-to
if (msg.properties.replyTo) {
if (!(resultValue instanceof Message)) {
resultValue = new Message(resultValue, {});
Expand All @@ -993,8 +989,8 @@ export class Queue {
this._channel.sendToQueue(msg.properties.replyTo, resultValue.content, resultValue.properties);
}
}).catch((err) => {
log.log("error", "Queue.onMessage RPC promise returned error: " + err.message, { module: "amqp-ts" });
});
log.log("error", "Queue.onMessage RPC promise returned error: " + err.message, { module: "amqp-ts" });
});
} catch (err) {
/* istanbul ignore next */
log.log("error", "Queue.onMessage consumer function returned error: " + err.message, { module: "amqp-ts" });
Expand Down

0 comments on commit 0ca8250

Please sign in to comment.