@@ -22,12 +22,21 @@ function bufferify(chunk, encoding) {
2222 : chunk ;
2323}
2424
25+ // errorLater(obj) returns a handler that takes an error and emits it as an
26+ // 'error' event on obj. The emit is wrapped in a callback passed to delay(),
27+ // with the aim of raising it outside the promise so it is not swallowed there.
28+ function errorLater ( obj ) {
29+ return function ( err ) {
30+ delay ( function ( ) { // delay is not defined in this hunk; presumably it defers the callback -- confirm
31+ obj . emit ( 'error' , err ) ;
32+ } ) ;
33+ }
34+ }
35+
2536function Context ( url ) {
2637 EventEmitter . call ( this ) ;
2738 var self = this ;
28- var onError = function ( e ) {
29- setImmediate ( self . emit . bind ( self , 'error' , e ) ) ;
30- }
39+ var onError = errorLater ( this ) ;
3140 var onClose = this . emit . bind ( this , 'close' ) ;
3241 var c = this . _connection = amqp . connect ( url ) ;
3342 c . then ( function ( conn ) {
@@ -101,28 +110,32 @@ function Socket(setup, options) {
101110 patch ( self , ready ,
102111 [ 'close' , 'write' , 'end' , 'connect' ,
103112 'setsockopt' , 'ack' , 'publish' ] ) ;
104- // Apply any options we've been given, in case they have immediate
105- // effects rather than just being consulted (e.g., prefetch).
106- ready . then ( function ( ) {
107- for ( var opt in options ) {
108- self . setsockopt ( opt , options [ opt ] ) ;
109- }
110- } ) ;
111113
112114 function closeAndInvalidate ( event , err ) {
113115 this . readable = this . writable = false ;
114- this . emit ( event , err ) ;
116+ setImmediate ( this . emit . bind ( this , event , err ) ) ;
115117 }
116118
117119 var close = closeAndInvalidate . bind ( this , 'close' ) ;
118120 var error = closeAndInvalidate . bind ( this , 'error' ) ;
119121
122+ // Relay these events from the channel
120123 setup . then ( function ( ch ) {
121124 ch . on ( 'close' , close ) ;
122125 ch . on ( 'error' , error ) ;
123126 ch . on ( 'drain' , self . emit . bind ( self , 'drain' ) ) ;
124127 ch . on ( 'readable' , self . emit . bind ( self , 'readable' ) ) ;
125128 } ) ;
129+
130+ // Apply any options we've been given, in case they have immediate
131+ // effects rather than just being consulted (e.g., prefetch).
132+ ready . then ( function ( ) {
133+ for ( var opt in options ) {
134+ self . setsockopt ( opt , options [ opt ] ) ;
135+ }
136+ } ) . then ( null , errorLater ( this ) ) ;
137+ // ^ pick up any setup or setsockopt error and reraise it as an
138+ // error event
126139}
127140
128141function close ( ) {
@@ -141,7 +154,7 @@ function setsockopt(opt, value) {
141154 case 'expiration' :
142155 case 'persistent' :
143156 case 'topic' :
144- this . options [ opt ] = value ;
157+ this . options [ opt ] = value ; break ;
145158 }
146159}
147160
0 commit comments