Skip to content

Commit 8afbe78

Browse files
committed
A punt at a "task socket"
This is an implementation of my example code in issue 57.
1 parent 782c583 commit 8afbe78

1 file changed

Lines changed: 38 additions & 0 deletions

File tree

lib/sockets.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ var SOCKETS = {
5858
REQUEST: ReqSocket,
5959
REP: RepSocket,
6060
REPLY: RepSocket,
61+
TASK: TaskSocket,
6162
WORKER: WorkerSocket
6263
};
6364

@@ -154,6 +155,7 @@ function setsockopt(opt, value) {
154155
case 'expiration':
155156
case 'persistent':
156157
case 'topic':
158+
case 'task':
157159
this.options[opt] = value; break;
158160
}
159161
}
@@ -471,3 +473,39 @@ RepSocket.prototype.write = function(chunk, encoding) {
471473
};
472474

473475
RepSocket.prototype._read = ignore;
476+
477+
// TaskSocket: a Writable socket for handing messages to named task
// queues. Each task name must first be registered with connect();
// after that a message can be sent to any registered task, either by
// naming it per message via post() or by setting the 'task' socket
// option and using write(). In contrast to a PUSH socket, which
// round-robins among its connections, the sender chooses the
// destination itself.
//
// channel, opts: forwarded unchanged to the shared Socket initialiser
// (defined elsewhere in this file).
function TaskSocket(channel, opts) {
  Writable.call(this);
  Socket.call(this, channel, opts);
  // Names of the queues registered through connect(); post() rejects
  // any task that is not listed here.
  this.queues = [];
}

// Prototype wiring. inherits() must run before anything is assigned
// onto TaskSocket.prototype.
inherits(TaskSocket, Writable);
addSocketMethods(TaskSocket);
TaskSocket.prototype.end = end;
489+
490+
// Register `destination` as a task queue this socket may post to.
//
// The queue is asserted on the channel (durable when the 'persistent'
// option is set). Once the assertion succeeds the name is recorded in
// this.queues and `callback`, if supplied, is invoked.
//
// destination: name of the task queue.
// callback:    optional function called after the queue has been
//              asserted and recorded.
//
// NOTE(review): a rejected assertQueue is not handled here, matching
// the existing behaviour; the callback is simply never invoked.
TaskSocket.prototype.connect = function(destination, callback) {
  var queues = this.queues;
  this.ch.assertQueue(destination,
                      {durable: this.options.persistent})
    .then(function() {
      // Record each destination only once: this.queues is used purely
      // for membership tests in post(), so repeated connect() calls
      // for the same queue must not make it grow.
      if (queues.indexOf(destination) === -1) {
        queues.push(destination);
      }
    }).then(callback || ignore);
};
498+
499+
// Stream-style write: send `chunk` to whichever task is currently
// selected through the 'task' socket option.
//
// Returns whatever post() returns.
TaskSocket.prototype.write = function(chunk, encoding) {
  // A falsy task tells post() to fall back to this.options.task.
  var noExplicitTask = false;
  return this.post(noExplicitTask, chunk, encoding);
};
502+
503+
// Send one message to a specific task queue.
//
// task:     name of a queue previously registered with connect(); any
//           falsy value selects the 'task' socket option instead.
// chunk, encoding: the payload, converted with bufferify() before it
//           is sent.
//
// Throws an Error when the resolved task has not been connected.
// Returns the result of the channel's sendToQueue() call.
TaskSocket.prototype.post = function(task, chunk, encoding) {
  var target = task || this.options.task;

  var isConnected = this.queues.indexOf(target) >= 0;
  if (!isConnected) {
    throw new Error('Task "' + target + '" not connected');
  }

  // Per-message properties are taken from the socket options.
  var options = {};
  options.expiration = this.options.expiration;
  options.persistent = this.options.persistent;

  var payload = bufferify(chunk, encoding);
  return this.ch.sendToQueue(target, payload, options);
};

0 commit comments

Comments
 (0)