Skip to content

Commit ebbba24

Browse files
committed
Add the ability to connect without declaring shared objects
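Adds a `noCreate` socket option. When it is set, `connect` skips the `assertExchange`/`assertQueue` declaration and uses the named exchange or queue as it already exists. This sidesteps the problem in which you need to connect to an object (queue or exchange) that has been declared elsewhere, but with different properties than rabbit.js would use: re-declaring an existing object with mismatched properties is rejected by the broker, so the connect would otherwise fail.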
1 parent 5dc440f commit ebbba24

1 file changed

Lines changed: 109 additions & 54 deletions

File tree

lib/sockets.js

Lines changed: 109 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,19 @@ PubSocket.prototype.end = end;
179179

180180
PubSocket.prototype.connect = function(destination, callback) {
181181
var self = this, ch = this.ch;
182-
ch.assertExchange(destination,
183-
this.options.routing || 'fanout',
184-
{durable: false, autoDelete: false})
185-
.then(function(ok) {
186-
self.pubs.push(destination);
187-
}).then(callback);
188-
};
182+
if (this.options.noCreate) {
183+
self.pubs.push(destination);
184+
delay(callback);
185+
}
186+
else {
187+
ch.assertExchange(destination,
188+
this.options.routing || 'fanout',
189+
{durable: false, autoDelete: false})
190+
.then(function() {
191+
self.pubs.push(destination);
192+
}).then(callback);
193+
};
194+
}
189195

190196
PubSocket.prototype.publish = function(topic, chunk, encoding) {
191197
var ch = this.ch;
@@ -238,13 +244,19 @@ SubSocket.prototype.connect = function(source, topic, callback) {
238244
} else topic = topic || '';
239245

240246
var ch = this.ch, queue = this.queue, self = this;
241-
ch.assertExchange(source,
242-
this.options.routing || 'fanout',
243-
{durable: false, autoDelete: false})
244-
.then(function(ok) {
245-
return ch.bindQueue(queue, source, topic);
246-
})
247-
.then(callback);
247+
if (this.options.noCreate) {
248+
ch.bindQueue(queue, source, topic)
249+
.then(callback);
250+
}
251+
else {
252+
ch.assertExchange(source,
253+
this.options.routing || 'fanout',
254+
{durable: false, autoDelete: false})
255+
.then(function(ok) {
256+
return ch.bindQueue(queue, source, topic);
257+
})
258+
.then(callback);
259+
}
248260
};
249261

250262
// AMQP and the stream API don't really work well together here. I'm
@@ -267,10 +279,16 @@ PushSocket.prototype.end = end;
267279

268280
PushSocket.prototype.connect = function(destination, callback) {
269281
var self = this, ch = this.ch;
270-
ch.assertQueue(destination, {durable: this.options.persistent})
271-
.then(function(ok) {
272-
self.queues.push(destination);
273-
}).then(callback);
282+
if (this.options.noCreate) {
283+
self.queues.push(destination);
284+
delay(callback);
285+
}
286+
else {
287+
ch.assertQueue(destination, {durable: this.options.persistent})
288+
.then(function(ok) {
289+
self.queues.push(destination);
290+
}).then(callback);
291+
}
274292
};
275293

276294
PushSocket.prototype.write = function(chunk, encoding) {
@@ -301,15 +319,24 @@ PullSocket.prototype.connect = function(source, callback) {
301319
if (callback) delay(callback); return;
302320
}
303321

304-
ch.assertQueue(source, {durable: this.options.persistent})
305-
.then(function(ok) {
306-
return ch.consume(source, function(msg) {
307-
self.push(msg && msg.content);
308-
ch.ack(msg);
309-
}, {noAck:false}).then(function(ok) {
322+
function consume() {
323+
ch.consume(source, function(msg) {
324+
self.push(msg && msg.content);
325+
ch.ack(msg);
326+
}, {noAck:false})
327+
.then(function(ok) {
310328
self.consumers[source] = ok.consumerTag;
311-
});
312-
}).then(callback);
329+
})
330+
.then(callback);
331+
}
332+
333+
if (this.options.noCreate) {
334+
consume();
335+
}
336+
else {
337+
ch.assertQueue(source, {durable: this.options.persistent})
338+
.then(consume);
339+
}
313340
};
314341

315342
PullSocket.prototype._read = ignore;
@@ -330,15 +357,22 @@ WorkerSocket.prototype.connect = function(source, callback) {
330357
if (callback) delay(callback); return;
331358
}
332359

333-
ch.assertQueue(source, {durable: this.options.persistent})
334-
.then(function(ok) {
335-
return ch.consume(source, function(msg) {
336-
if (msg) self.unacked.push(msg);
337-
self.push(msg && msg.content);
338-
}, {noAck:false}).then(function(ok) {
339-
self.consumers[source] = ok.consumerTag;
340-
});
360+
function consume() {
361+
return ch.consume(source, function(msg) {
362+
if (msg) self.unacked.push(msg);
363+
self.push(msg && msg.content);
364+
}, {noAck:false}).then(function(ok) {
365+
self.consumers[source] = ok.consumerTag;
341366
}).then(callback);
367+
}
368+
369+
if (this.options.noCreate) {
370+
consume();
371+
}
372+
else {
373+
ch.assertQueue(source, {durable: this.options.persistent})
374+
.then(consume);
375+
}
342376
};
343377

344378
WorkerSocket.prototype.ack = function() {
@@ -417,10 +451,17 @@ ReqSocket.prototype.handleReply = function(msg) {
417451

418452
ReqSocket.prototype.connect = function(destination, callback) {
419453
var self = this, ch = this.ch;
420-
ch.assertQueue(destination, {durable: this.options.persistent})
421-
.then(function(ok) {
454+
455+
if (this.options.noCreate) {
422456
// noCreate path: assertQueue is not called, so there is no `ok` reply here; record the queue name the caller passed in.
self.queues.push(destination);
423-
}).then(callback);
457+
delay(callback);
458+
}
459+
else {
460+
ch.assertQueue(destination, {durable: this.options.persistent})
461+
.then(function(ok) {
462+
self.queues.push(ok.queue);
463+
}).then(callback);
464+
}
424465
};
425466

426467
ReqSocket.prototype.write = function(chunk, encoding) {
@@ -460,26 +501,33 @@ RepSocket.prototype.connect = function(source, callback) {
460501
delay(callback); return;
461502
}
462503

463-
ch.assertQueue(source, {durable: this.options.persistent})
464-
.then(function(ok) {
465-
return ch.consume(source, function(msg) {
466-
if (msg !== null) {
467-
self.requests.push(msg);
468-
self.push(msg.content);
469-
}
470-
else self.push(null);
471-
}, {noAck:false}).then(function(ok) {
472-
self.consumers[source] = ok.consumerTag;
473-
});
504+
function consume() {
505+
return ch.consume(source, function(msg) {
506+
if (msg !== null) {
507+
self.requests.push(msg);
508+
self.push(msg.content);
509+
}
510+
else self.push(null);
511+
}, {noAck:false}).then(function(ok) {
512+
self.consumers[source] = ok.consumerTag;
474513
}).then(callback);
514+
}
515+
516+
if (this.options.noCreate) {
517+
consume();
518+
}
519+
else {
520+
ch.assertQueue(source, {durable: this.options.persistent})
521+
.then(consume);
522+
}
475523
};
476524

477525
RepSocket.prototype.write = function(chunk, encoding) {
478526
var ch = this.ch, current = this.requests.shift();
479527

480528
if (!current)
481529
throw new Error('Write with no pending request');
482-
530+
483531
var replyTo = current.properties.replyTo;
484532
var cid = current.properties.correlationId;
485533
// Replies are never persistent, because the queue disappears with
@@ -533,11 +581,18 @@ TaskSocket.prototype.end = end;
533581

534582
TaskSocket.prototype.connect = function(destination, callback) {
535583
var queues = this.queues;
536-
this.ch.assertQueue(destination,
537-
{durable: this.options.persistent})
538-
.then(function(ok) {
539-
queues.push(destination);
540-
}).then(callback);
584+
585+
if (this.options.noCreate) {
586+
queues.push(destination);
587+
delay(callback);
588+
}
589+
else {
590+
this.ch.assertQueue(destination,
591+
{durable: this.options.persistent})
592+
.then(function(ok) {
593+
queues.push(destination);
594+
}).then(callback);
595+
}
541596
};
542597

543598
TaskSocket.prototype.write = function(chunk, encoding) {

0 commit comments

Comments
 (0)