Skip to content

Commit 3b4eeb4

Browse files
committed
Minor tidy up
1 parent 347c73b commit 3b4eeb4

2 files changed

Lines changed: 37 additions & 20 deletions

File tree

lib/sockets.js

Lines changed: 25 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -22,12 +22,21 @@ function bufferify(chunk, encoding) {
2222
: chunk;
2323
}
2424

25+
// To avoid errors being swallowed by the promise machinery, I provide
26+
// an error continuation that will emit the error outside the dynamic
27+
// extent of the promise.
28+
function errorLater(obj) {
29+
return function(err) {
30+
delay(function() {
31+
obj.emit('error', err);
32+
});
33+
}
34+
}
35+
2536
function Context(url) {
2637
EventEmitter.call(this);
2738
var self = this;
28-
var onError = function(e) {
29-
setImmediate(self.emit.bind(self, 'error', e));
30-
}
39+
var onError = errorLater(this);
3140
var onClose = this.emit.bind(this, 'close');
3241
var c = this._connection = amqp.connect(url);
3342
c.then(function(conn) {
@@ -101,28 +110,32 @@ function Socket(setup, options) {
101110
patch(self, ready,
102111
['close', 'write', 'end', 'connect',
103112
'setsockopt', 'ack', 'publish']);
104-
// Apply any options we've been given, in case they have immediate
105-
// effects rather than just being consulted (e.g., prefetch).
106-
ready.then(function() {
107-
for (var opt in options) {
108-
self.setsockopt(opt, options[opt]);
109-
}
110-
});
111113

112114
function closeAndInvalidate(event, err) {
113115
this.readable = this.writable = false;
114-
this.emit(event, err);
116+
setImmediate(this.emit.bind(this, event, err));
115117
}
116118

117119
var close = closeAndInvalidate.bind(this, 'close');
118120
var error = closeAndInvalidate.bind(this, 'error');
119121

122+
// Relay these events from the channel
120123
setup.then(function(ch) {
121124
ch.on('close', close);
122125
ch.on('error', error);
123126
ch.on('drain', self.emit.bind(self, 'drain'));
124127
ch.on('readable', self.emit.bind(self, 'readable'));
125128
});
129+
130+
// Apply any options we've been given, in case they have immediate
131+
// effects rather than just being consulted (e.g., prefetch).
132+
ready.then(function() {
133+
for (var opt in options) {
134+
self.setsockopt(opt, options[opt]);
135+
}
136+
}).then(null, errorLater(this));
137+
// ^ pick up any setup or setsockopt error and reraise it as an
138+
// error event
126139
}
127140

128141
function close() {
@@ -141,7 +154,7 @@ function setsockopt(opt, value) {
141154
case 'expiration':
142155
case 'persistent':
143156
case 'topic':
144-
this.options[opt] = value;
157+
this.options[opt] = value; break;
145158
}
146159
}
147160

test/tests.js

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -97,9 +97,11 @@ suite.endAwaitsConnects = testWithContext(function(done, CTX) {
9797
var sock = CTX.socket('REP');
9898
sock.on('error', done);
9999
sock.on('close', done);
100-
sock.connect('testRepEnd');
101-
sock.connect('testRepEnd2');
102-
sock.close();
100+
sock.connect('testRepEnd', function() {
101+
sock.connect('testRepEnd2', function() {
102+
sock.close();
103+
});
104+
});
103105
});
104106

105107
suite.endWithWrite = testWithContext(function(done, CTX) {
@@ -122,7 +124,8 @@ suite.endWithWrite = testWithContext(function(done, CTX) {
122124
suite.testManySockets = testWithContext(function(done, CTX) {
123125
var WINDOW = 10;
124126
var socks = [];
125-
var types = ['PUB', 'SUB', 'PUSH', 'PULL', 'REQ', 'REP'];
127+
var types = ['PUB', 'SUB', 'PUSH', 'PULL',
128+
'REQ', 'REP', 'WORKER'];
126129
var total = WINDOW * 10;
127130
var ended = 0;
128131
function latch() {
@@ -132,9 +135,9 @@ suite.testManySockets = testWithContext(function(done, CTX) {
132135
for (var i = 0; i < total; i++) {
133136
var t = types.shift(); types.push(t);
134137
var s = CTX.socket(t);
135-
s.connect('testManySockets');
136138
s.on('close', latch);
137139
socks.push(s);
140+
s.connect('testManySockets');
138141
if (i > WINDOW) {
139142
socks.shift().close();
140143
}
@@ -145,14 +148,15 @@ suite.testManySockets = testWithContext(function(done, CTX) {
145148
suite.simplestPushPull = testWithContext(function(done, CTX) {
146149
var push = CTX.socket('PUSH');
147150
var pull = CTX.socket('PULL');
151+
var msg = randomString();
148152
pull.setEncoding('utf8');
149-
pull.on('data', function(msg) {
150-
assert.equal('foo', msg);
153+
pull.on('data', function(m) {
154+
assert.equal(msg, m);
151155
done();
152156
});
153157

154158
push.connect('testPushPull', function() {
155-
push.write('foo');
159+
push.write(msg);
156160
pull.connect('testPushPull');
157161
});
158162
});

0 commit comments

Comments
 (0)