Skip to content

Commit

Permalink
bind callback
Browse files Browse the repository at this point in the history
Merge https://github.com/khrome/node-amqp

Conflicts:
	amqp.js
  • Loading branch information
postwait committed May 17, 2012
2 parents 54a5582 + 98c54c8 commit 05e42ae
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 3 deletions.
22 changes: 19 additions & 3 deletions amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -1551,22 +1551,32 @@ Queue.prototype.shift = function () {
};


Queue.prototype.bind = function (/* [exchange,] routingKey */) {
Queue.prototype.bind = function (/* [exchange,] routingKey [, bindCallback] */) {
var self = this;

// The first argument, exchange is optional.
// If not supplied the connection will use the 'amq.topic'
// exchange.

var exchange, routingKey;

var exchange, routingKey, callback;
if(typeof(arguments[arguments.length-1]) == 'function'){
callback = arguments[arguments.length-1];
}
// Remove callback from args so rest of bind functionality works as before
// Also, defend against cases where a non function callback has been passed as 3rd param
if (callback || arguments.length == 3) {
delete arguments[arguments.length-1];
arguments.length--;
}

if (arguments.length == 2) {
exchange = arguments[0];
routingKey = arguments[1];
} else {
exchange = 'amq.topic';
routingKey = arguments[0];
}
if(callback) this._bindCallback = callback;


var exchangeName = exchange instanceof Exchange ? exchange.name : exchange;
Expand Down Expand Up @@ -1709,6 +1719,12 @@ Queue.prototype._onMethod = function (channel, method, args) {
break;

case methods.queueBindOk:
if (this._bindCallback) {
// setting this._bindCallback to null before calling the callback allows for a subsequent bind within the callback
var cb = this._bindCallback;
this._bindCallback = null;
cb(this);
}
break;

case methods.basicQosOk:
Expand Down
30 changes: 30 additions & 0 deletions test/test-queue-bind-callbacks-cascaded.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
require('./harness');
var testName = __filename.replace(__dirname+'/','').replace('.js','');

connection.addListener('ready', function () {
puts("connected to " + connection.serverProperties.product);
var callbacksCalled = 0;

connection.exchange('node.'+testName+'.exchange', {type: 'topic'}, function(exchange) {
connection.queue( 'node.'+testName+'.queue', { durable: false, autoDelete : true }, function (queue) {
puts("Queue ready");

// main test for callback
queue.bind(exchange, 'node.'+testName+'.topic.bindCallback.outer', function(q) {
puts("First queue bind callback called");
callbacksCalled++;
q.bind(exchange, 'node.'+testName+'.topic.bindCallback.inner', function() {
puts("Second queue bind callback called");
callbacksCalled++;
});
});

setTimeout(function() {
assert.ok(callbacksCalled == 2, "Callback was not called");
puts("Cascaded queue bind callback succeeded");
connection.destroy();},
2000);
});
});
});

30 changes: 30 additions & 0 deletions test/test-queue-bind-callbacks-sequential.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
require('./harness');
var testName = __filename.replace(__dirname+'/','').replace('.js','');

connection.addListener('ready', function () {
puts("connected to " + connection.serverProperties.product);

connection.exchange('node.'+testName+'.exchange', {type: 'topic'}, function(exchange) {
connection.queue( 'node.'+testName+'.queue', { durable: false, autoDelete : true }, function (queue) {
puts("Queue ready");

// main test for sequential callback issue
queue.bind( exchange, 'node.'+testName+'.topic.bindCallback1', function() {
puts("bind callback called");
assert.ok(false, "This callback should not be called unless the sequential bind callback issue has been fixed");}
);
queue.bind( exchange, 'node.'+testName+'.topic.bindCallback2', function() {
puts("bind callback called");
assert.ok(false, "This callback should not be called unless the sequential bind callback issue has been fixed");}
);
queue.bind( exchange, 'node.'+testName+'.topic.bindCallback2', function() {
puts("bind callback called");
assert.ok(true, "This callback should have be called, as the last of the sequential callbacks");}
);

});

setTimeout(function() { connection.destroy();}, 2000);
});
});

33 changes: 33 additions & 0 deletions test/test-queue-bind-callbacks-single.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require('./harness');
var testName = __filename.replace(__dirname+'/','').replace('.js','');
connection.addListener('ready', function () {
puts("connected to " + connection.serverProperties.product);
var callbackCalled = false;

connection.exchange('node.'+testName+'.exchange', {type: 'topic'}, function(exchange) {
connection.queue( 'node.'+testName+'.queue', { durable: false, autoDelete : true }, function (queue) {
puts("Queue ready");

// main test for callback
queue.bind(exchange, 'node.'+testName+'.topic.bindCallback', function() {
puts("Single queue bind callback called");
callbackCalled = true;
});

// nothing to be asserted / checked with these, other than they don't blow up.
queue.bind(exchange, 'node.'+testName+'.topic.nullCallback', null);
queue.bind(exchange, 'node.'+testName+'.topic.undefinedCallback', undefined);
queue.bind(exchange, 'node.'+testName+'.topic.nonFunctionCallback', "Not a callback");

// Regression test for no callback being supplied not blowing up
queue.bind(exchange, 'node.'+testName+'.topic.noCallback');

setTimeout(function() {
assert.ok(callbackCalled, "Callback was not called");
puts("Single queue bind callback succeeded");
connection.destroy();},
2000);
});
});
});

0 comments on commit 05e42ae

Please sign in to comment.