@@ -111,7 +111,7 @@ function Socket(setup, options) {
111111 var ready = setup . then ( function ( ch ) { self . ch = ch ; } ) ;
112112 patch ( self , ready ,
113113 [ 'close' , 'write' , 'end' , 'connect' ,
114- 'setsockopt' , 'ack' , 'publish' ] ) ;
114+ 'setsockopt' , 'ack' , 'requeue' , 'discard' , ' publish'] ) ;
115115
116116 function closeAndInvalidate ( event , err ) {
117117 this . readable = this . writable = false ;
@@ -347,6 +347,26 @@ WorkerSocket.prototype.ack = function() {
347347 this . ch . ack ( msg ) ;
348348} ;
349349
350+
351+ WorkerSocket . prototype . requeue = function ( ) {
352+ var msg = this . unacked . shift ( ) ;
353+ if ( ! msg ) {
354+ throw new Error ( "requeue called with no unacknowledged messages" ) ;
355+ }
356+ this . ch . nack ( msg ) ;
357+ } ;
358+
359+
360+ WorkerSocket . prototype . discard = function ( ) {
361+ var msg = this . unacked . shift ( ) ;
362+ if ( ! msg ) {
363+ throw new Error ( "discard called with no unacknowledged messages" ) ;
364+ }
365+
366+ this . ch . reject ( msg , false ) ;
367+ } ;
368+
369+
350370WorkerSocket . prototype . _read = ignore ;
351371
352372
@@ -473,6 +493,27 @@ RepSocket.prototype.write = function(chunk, encoding) {
473493 return res ;
474494} ;
475495
496+
497+ RepSocket . prototype . requeue = function ( ) {
498+ var ch = this . ch , current = this . requests . shift ( ) ;
499+
500+ if ( ! current )
501+ throw new Error ( 'Requeue with no pending request' ) ;
502+
503+ ch . nack ( current ) ;
504+ } ;
505+
506+
507+ RepSocket . prototype . discard = function ( ) {
508+ var ch = this . ch , current = this . requests . shift ( ) ;
509+
510+ if ( ! current )
511+ throw new Error ( 'Discard with no pending request' ) ;
512+
513+ ch . reject ( current , false ) ;
514+ } ;
515+
516+
476517RepSocket . prototype . _read = ignore ;
477518
478519// A task socket requires you to say which kinds of task you will
0 commit comments