Skip to content

Commit 9b68965

Browse files
committed
Provide a 3-arg form of connect instead of subscribe
`subscribe` is a breaking change, and I don't see any particular advantage to it over having another form of `connect`.
1 parent 08420ab commit 9b68965

2 files changed

Lines changed: 26 additions & 19 deletions

File tree

lib/sockets.js

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ PubSocket.prototype.connect = function(destination, callback) {
161161
var self = this, ch = this.ch;
162162
ch.assertExchange(destination,
163163
this.options.routing || 'fanout',
164-
{durable: false,
165-
autoDelete: true})
164+
{durable: false, autoDelete: false})
166165
.then(function(ok) {
167166
self.pubs.push(destination);
168167
}).then(callback || ignore);
@@ -216,29 +215,22 @@ function SubSocket(channel, opts) {
216215
inherits(SubSocket, Readable);
217216
addSocketMethods(SubSocket);
218217

219-
SubSocket.prototype.connect = function(source, callback) {
218+
SubSocket.prototype.connect = function(source, topic, callback) {
219+
// Support the general form of connect
220+
if (callback === undefined && typeof topic === 'function') {
221+
callback = topic;
222+
topic = '';
223+
}
220224
var ch = this.ch, queue = this.queue, self = this;
221225
ch.assertExchange(source,
222226
this.options.routing || 'fanout',
223-
{autoDelete: true})
227+
{durable: false, autoDelete: false})
224228
.then(function(ok) {
225-
self.subs.push(source);
226-
self.patterns.forEach(function(pat) {
227-
ch.bindQueue(queue, source, pat);
228-
});
229+
return ch.bindQueue(queue, source, topic);
229230
})
230231
.then(callback || ignore);
231232
};
232233

233-
SubSocket.prototype.subscribe = function(pattern) {
234-
var ch = this.ch; queue = this.queue;
235-
pattern = pattern || '';
236-
this.subs.forEach(function(sub) {
237-
ch.bindQueue(queue, sub, pattern);
238-
});
239-
this.patterns.push(pattern);
240-
};
241-
242234
// AMQP and the stream API don't really work well together here. I'm
243235
// supposed to initiate reads when this method is called, then not
244236
// push any more once I get `false` back from `#push`; but how do I do

test/tests.js

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,29 @@ suite.simplestPubSub = testWithContext(function(done, CTX) {
167167
});
168168

169169
sub.connect('testPubSub', function() {
170-
sub.subscribe();
171170
pub.connect('testPubSub', function() {
172171
pub.write('foo');
173172
});
174173
});
175174
});
176175

176+
suite.topicPubSub = testWithContext(function(done, CTX) {
177+
var pub = CTX.socket('PUB', {routing: 'topic'});
178+
var sub = CTX.socket('SUB', {routing: 'topic'});
179+
sub.setEncoding('utf8');
180+
var content = randomString();
181+
sub.on('data', function(msg) {
182+
try { assert.equal(content, msg); done(); }
183+
catch (e) { done(e); }
184+
});
185+
186+
sub.connect('testTopicPubSub', 'foo.*', function() {
187+
pub.connect('testTopicPubSub', function() {
188+
pub.publish('foo.bar', content, 'utf8');
189+
});
190+
});
191+
});
192+
177193
suite.simplestWorker = testWithContext(function(done, CTX) {
178194
var work = CTX.socket('WORKER');
179195
work.setEncoding('utf8');
@@ -285,7 +301,6 @@ suite.allSubs = testWithContext(function(done, CTX) {
285301
if (latch === 0) done();
286302
});
287303
sub.connect('testMultiSub', function() {
288-
sub.subscribe();
289304
doSub(i+1);
290305
});
291306
}

0 commit comments

Comments
 (0)