@@ -179,13 +179,19 @@ PubSocket.prototype.end = end;
179179
/**
 * Record `destination` as an exchange this socket publishes to.
 *
 * Unless `options.noCreate` is set, the exchange is first declared on the
 * channel (type `options.routing`, falling back to 'fanout'; non-durable,
 * not auto-deleted) and the name is recorded once that succeeds. With
 * `noCreate` nothing is declared and the name is recorded immediately.
 *
 * @param destination  name of the exchange to publish to
 * @param [callback]   optional function notified once the name is recorded
 */
PubSocket.prototype.connect = function(destination, callback) {
  var self = this, ch = this.ch;

  if (this.options.noCreate) {
    self.pubs.push(destination);
    // Hand the callback to delay() rather than invoking it inline; skip
    // scheduling when no callback was supplied, as the other connect
    // implementations in this file do.
    if (callback) delay(callback);
    return;
  }

  ch.assertExchange(destination,
                    this.options.routing || 'fanout',
                    {durable: false, autoDelete: false})
    .then(function() {
      self.pubs.push(destination);
    })
    .then(callback);
};
189195
190196PubSocket . prototype . publish = function ( topic , chunk , encoding ) {
191197 var ch = this . ch ;
@@ -238,13 +244,19 @@ SubSocket.prototype.connect = function(source, topic, callback) {
238244 } else topic = topic || '' ;
239245
240246 var ch = this . ch , queue = this . queue , self = this ;
241- ch . assertExchange ( source ,
242- this . options . routing || 'fanout' ,
243- { durable : false , autoDelete : false } )
244- . then ( function ( ok ) {
245- return ch . bindQueue ( queue , source , topic ) ;
246- } )
247- . then ( callback ) ;
247+ if ( this . options . noCreate ) {
248+ ch . bindQueue ( queue , source , topic )
249+ . then ( callback ) ;
250+ }
251+ else {
252+ ch . assertExchange ( source ,
253+ this . options . routing || 'fanout' ,
254+ { durable : false , autoDelete : false } )
255+ . then ( function ( ok ) {
256+ return ch . bindQueue ( queue , source , topic ) ;
257+ } )
258+ . then ( callback ) ;
259+ }
248260} ;
249261
250262// AMQP and the stream API don't really work well together here. I'm
@@ -267,10 +279,16 @@ PushSocket.prototype.end = end;
267279
/**
 * Record `destination` as a queue this socket pushes to.
 *
 * Unless `options.noCreate` is set, the queue is first declared on the
 * channel with `durable` taken from `options.persistent`, and the name is
 * recorded once that succeeds. With `noCreate` nothing is declared and the
 * name is recorded immediately.
 *
 * @param destination  name of the queue to push to
 * @param [callback]   optional function notified once the name is recorded
 */
PushSocket.prototype.connect = function(destination, callback) {
  var self = this, ch = this.ch;

  if (this.options.noCreate) {
    self.queues.push(destination);
    // Hand the callback to delay() rather than invoking it inline; skip
    // scheduling when no callback was supplied.
    if (callback) delay(callback);
    return;
  }

  ch.assertQueue(destination, {durable: this.options.persistent})
    .then(function() {
      self.queues.push(destination);
    })
    .then(callback);
};
275293
276294PushSocket . prototype . write = function ( chunk , encoding ) {
@@ -301,15 +319,24 @@ PullSocket.prototype.connect = function(source, callback) {
301319 if ( callback ) delay ( callback ) ; return ;
302320 }
303321
304- ch . assertQueue ( source , { durable : this . options . persistent } )
305- . then ( function ( ok ) {
306- return ch . consume ( source , function ( msg ) {
307- self . push ( msg && msg . content ) ;
308- ch . ack ( msg ) ;
309- } , { noAck : false } ) . then ( function ( ok ) {
322+ function consume ( ) {
323+ ch . consume ( source , function ( msg ) {
324+ self . push ( msg && msg . content ) ;
325+ ch . ack ( msg ) ;
326+ } , { noAck : false } )
327+ . then ( function ( ok ) {
310328 self . consumers [ source ] = ok . consumerTag ;
311- } ) ;
312- } ) . then ( callback ) ;
329+ } )
330+ . then ( callback ) ;
331+ }
332+
333+ if ( this . options . noCreate ) {
334+ consume ( ) ;
335+ }
336+ else {
337+ ch . assertQueue ( source , { durable : this . options . persistent } )
338+ . then ( consume ) ;
339+ }
313340} ;
314341
315342PullSocket . prototype . _read = ignore ;
@@ -330,15 +357,22 @@ WorkerSocket.prototype.connect = function(source, callback) {
330357 if ( callback ) delay ( callback ) ; return ;
331358 }
332359
333- ch . assertQueue ( source , { durable : this . options . persistent } )
334- . then ( function ( ok ) {
335- return ch . consume ( source , function ( msg ) {
336- if ( msg ) self . unacked . push ( msg ) ;
337- self . push ( msg && msg . content ) ;
338- } , { noAck :false } ) . then ( function ( ok ) {
339- self . consumers [ source ] = ok . consumerTag ;
340- } ) ;
360+ function consume ( ) {
361+ return ch . consume ( source , function ( msg ) {
362+ if ( msg ) self . unacked . push ( msg ) ;
363+ self . push ( msg && msg . content ) ;
364+ } , { noAck :false } ) . then ( function ( ok ) {
365+ self . consumers [ source ] = ok . consumerTag ;
341366 } ) . then ( callback ) ;
367+ }
368+
369+ if ( this . options . noCreate ) {
370+ consume ( ) ;
371+ }
372+ else {
373+ ch . assertQueue ( source , { durable : this . options . persistent } )
374+ . then ( consume ) ;
375+ }
342376} ;
343377
344378WorkerSocket . prototype . ack = function ( ) {
@@ -417,10 +451,17 @@ ReqSocket.prototype.handleReply = function(msg) {
417451
/**
 * Record a request queue this socket sends to.
 *
 * Unless `options.noCreate` is set, the queue is declared on the channel
 * with `durable` taken from `options.persistent`, and the queue name from
 * the declaration reply (`ok.queue`) is recorded. With `noCreate` nothing
 * is declared, so there is no reply to read a name from; `destination`
 * itself is recorded instead.
 *
 * @param destination  name of the queue to send requests to
 * @param [callback]   optional function notified once the name is recorded
 */
ReqSocket.prototype.connect = function(destination, callback) {
  var self = this, ch = this.ch;

  if (this.options.noCreate) {
    // No declaration reply exists on this path; referencing `ok` here
    // would throw a ReferenceError.
    self.queues.push(destination);
    // Hand the callback to delay() rather than invoking it inline; skip
    // scheduling when no callback was supplied.
    if (callback) delay(callback);
    return;
  }

  ch.assertQueue(destination, {durable: this.options.persistent})
    .then(function(ok) {
      self.queues.push(ok.queue);
    })
    .then(callback);
};
425466
426467ReqSocket . prototype . write = function ( chunk , encoding ) {
@@ -460,26 +501,33 @@ RepSocket.prototype.connect = function(source, callback) {
460501 delay ( callback ) ; return ;
461502 }
462503
463- ch . assertQueue ( source , { durable : this . options . persistent } )
464- . then ( function ( ok ) {
465- return ch . consume ( source , function ( msg ) {
466- if ( msg !== null ) {
467- self . requests . push ( msg ) ;
468- self . push ( msg . content ) ;
469- }
470- else self . push ( null ) ;
471- } , { noAck :false } ) . then ( function ( ok ) {
472- self . consumers [ source ] = ok . consumerTag ;
473- } ) ;
504+ function consume ( ) {
505+ return ch . consume ( source , function ( msg ) {
506+ if ( msg !== null ) {
507+ self . requests . push ( msg ) ;
508+ self . push ( msg . content ) ;
509+ }
510+ else self . push ( null ) ;
511+ } , { noAck :false } ) . then ( function ( ok ) {
512+ self . consumers [ source ] = ok . consumerTag ;
474513 } ) . then ( callback ) ;
514+ }
515+
516+ if ( this . options . noCreate ) {
517+ consume ( ) ;
518+ }
519+ else {
520+ ch . assertQueue ( source , { durable : this . options . persistent } )
521+ . then ( consume ) ;
522+ }
475523} ;
476524
477525RepSocket . prototype . write = function ( chunk , encoding ) {
478526 var ch = this . ch , current = this . requests . shift ( ) ;
479527
480528 if ( ! current )
481529 throw new Error ( 'Write with no pending request' ) ;
482-
530+
483531 var replyTo = current . properties . replyTo ;
484532 var cid = current . properties . correlationId ;
485533 // Replies are never persistent, because the queue disappears with
@@ -533,11 +581,18 @@ TaskSocket.prototype.end = end;
533581
/**
 * Record `destination` as a queue this socket posts tasks to.
 *
 * Unless `options.noCreate` is set, the queue is first declared on the
 * channel with `durable` taken from `options.persistent`, and the name is
 * recorded once that succeeds. With `noCreate` nothing is declared and the
 * name is recorded immediately.
 *
 * @param destination  name of the task queue
 * @param [callback]   optional function notified once the name is recorded
 */
TaskSocket.prototype.connect = function(destination, callback) {
  var queues = this.queues;

  if (this.options.noCreate) {
    queues.push(destination);
    // Hand the callback to delay() rather than invoking it inline; skip
    // scheduling when no callback was supplied.
    if (callback) delay(callback);
    return;
  }

  this.ch.assertQueue(destination, {durable: this.options.persistent})
    .then(function() {
      queues.push(destination);
    })
    .then(callback);
};
542597
543598TaskSocket . prototype . write = function ( chunk , encoding ) {
0 commit comments