@@ -58,6 +58,7 @@ var SOCKETS = {
5858 REQUEST : ReqSocket ,
5959 REP : RepSocket ,
6060 REPLY : RepSocket ,
61+ TASK : TaskSocket ,
6162 WORKER : WorkerSocket
6263} ;
6364
@@ -154,6 +155,7 @@ function setsockopt(opt, value) {
154155 case 'expiration' :
155156 case 'persistent' :
156157 case 'topic' :
158+ case 'task' :
157159 this . options [ opt ] = value ; break ;
158160 }
159161}
@@ -471,3 +473,39 @@ RepSocket.prototype.write = function(chunk, encoding) {
471473} ;
472474
473475RepSocket . prototype . _read = ignore ;
476+
/**
 * TaskSocket: a writable socket for sending messages to named task queues.
 *
 * The caller first declares (via connect) each kind of task it intends to
 * send, and may then send to any one of them. It resembles a PUSH socket,
 * except that rather than round-robining among its connections the caller
 * chooses the destination for each message, or fixes it for a run of
 * messages through the 'task' socket option.
 *
 * @param channel - forwarded unchanged to the Socket constructor.
 * @param opts - forwarded unchanged to the Socket constructor.
 */
function TaskSocket(channel, opts) {
  // Initialise both parents, stream side first, exactly as the other
  // statements in this constructor expect.
  Writable.call(this);
  Socket.call(this, channel, opts);

  // Names of the queues that have been connected so far.
  this.queues = [];
}

// Prototype wiring: stream inheritance, then the shared socket methods,
// then the shared `end` implementation.
inherits(TaskSocket, Writable);
addSocketMethods(TaskSocket);
TaskSocket.prototype.end = end;
489+
/**
 * Declare `destination` as a task queue this socket may send to.
 *
 * Asserts the queue on the channel (its `durable` flag is taken from the
 * 'persistent' option) and, once that succeeds, records the name in
 * `this.queues`.
 *
 * @param destination - name of the queue to assert and register.
 * @param {Function} [callback] - called after the queue has been
 *   registered; it receives no meaningful argument.
 * @returns the promise chain started by `assertQueue`, so callers can
 *   observe completion and handle a failed assertion. Callers that ignore
 *   the return value see the same behaviour as before.
 */
TaskSocket.prototype.connect = function(destination, callback) {
  var queues = this.queues;
  var queueOptions = {durable: this.options.persistent};

  return this.ch.assertQueue(destination, queueOptions)
    .then(function() {
      // Only membership in `queues` is ever checked, so connecting to the
      // same queue again must not make the list grow.
      if (queues.indexOf(destination) === -1) {
        queues.push(destination);
      }
    })
    .then(callback || ignore);
};
498+
/**
 * Stream-style write: send `chunk` without naming a task explicitly.
 *
 * Delegates to post() with `false` as the task, leaving post() to pick
 * the destination.
 *
 * @param chunk - payload to send.
 * @param [encoding] - forwarded to post() along with the chunk.
 * @returns whatever post() returns.
 */
TaskSocket.prototype.write = function(chunk, encoding) {
  var noExplicitTask = false;
  return this.post(noExplicitTask, chunk, encoding);
};
502+
/**
 * Send one message to a task queue that has already been connected.
 *
 * @param [task] - name of the queue to send to; when falsy, the 'task'
 *   socket option is used instead.
 * @param chunk - payload; converted with `bufferify(chunk, encoding)`.
 * @param [encoding] - forwarded to `bufferify`.
 * @returns whatever `this.ch.sendToQueue` returns.
 * @throws {Error} when no task was passed and the 'task' option is unset,
 *   or when the chosen task is not in `this.queues`.
 */
TaskSocket.prototype.post = function(task, chunk, encoding) {
  var target = task || this.options.task;

  // Report a missing task as such, rather than the misleading
  // 'Task "undefined" not connected'.
  if (target == null) {
    throw new Error(
      'No task specified: pass one to post() or set the "task" option');
  }
  if (this.queues.indexOf(target) === -1) {
    throw new Error('Task "' + target + '" not connected');
  }

  var options = {
    expiration: this.options.expiration,
    persistent: this.options.persistent
  };
  return this.ch.sendToQueue(target, bufferify(chunk, encoding), options);
};
0 commit comments