Skip to content

Commit bbae37a

Browse files
Added requeue and discard functionality to socket
1 parent ec6ecfe commit bbae37a

1 file changed

Lines changed: 42 additions & 1 deletion

File tree

lib/sockets.js

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ function Socket(setup, options) {
111111
var ready = setup.then(function(ch) { self.ch = ch; });
112112
patch(self, ready,
113113
['close', 'write', 'end', 'connect',
114-
'setsockopt', 'ack', 'publish']);
114+
'setsockopt', 'ack', 'requeue', 'discard', 'publish']);
115115

116116
function closeAndInvalidate(event, err) {
117117
this.readable = this.writable = false;
@@ -347,6 +347,26 @@ WorkerSocket.prototype.ack = function() {
347347
this.ch.ack(msg);
348348
};
349349

350+
351+
/**
 * Negatively acknowledge the oldest unacknowledged message.
 *
 * Removes the message at the head of `this.unacked` and passes it to
 * `this.ch.nack`.
 *
 * @throws {Error} If there is no unacknowledged message to requeue.
 */
WorkerSocket.prototype.requeue = function() {
  // Messages are handled oldest-first: shift() takes the head of the queue.
  var msg = this.unacked.shift();
  if (!msg) {
    throw new Error("requeue called with no unacknowledged messages");
  }
  // NOTE(review): only the message is passed, so requeueing depends on the
  // channel's default for nack's requeue flag (amqplib documents it as
  // true) -- confirm for the channel implementation in use.
  this.ch.nack(msg);
};
358+
359+
360+
/**
 * Reject the oldest unacknowledged message.
 *
 * Removes the message at the head of `this.unacked` and passes it to
 * `this.ch.reject` together with `false`.
 *
 * @throws {Error} If there is no unacknowledged message to discard.
 */
WorkerSocket.prototype.discard = function() {
  // Messages are handled oldest-first: shift() takes the head of the queue.
  var msg = this.unacked.shift();
  if (!msg) {
    throw new Error("discard called with no unacknowledged messages");
  }
  // NOTE(review): the second argument is presumably the requeue flag
  // (false = do not put the message back on the queue) -- confirm against
  // the channel implementation in use.
  this.ch.reject(msg, false);
};
368+
369+
350370
// Uses the shared `ignore` function as this socket's `_read`.
// NOTE(review): `ignore` is defined outside this excerpt -- presumably a
// no-op; confirm.
WorkerSocket.prototype._read = ignore;
351371

352372

@@ -473,6 +493,27 @@ RepSocket.prototype.write = function(chunk, encoding) {
473493
return res;
474494
};
475495

496+
497+
/**
 * Negatively acknowledge the oldest pending request instead of replying.
 *
 * Removes the request at the head of `this.requests` and passes it to
 * `this.ch.nack`.
 *
 * @throws {Error} If there is no pending request.
 */
RepSocket.prototype.requeue = function() {
  // Requests are handled oldest-first: shift() takes the head of the queue.
  var current = this.requests.shift();
  if (!current) {
    throw new Error('Requeue with no pending request');
  }
  // NOTE(review): only the message is passed, so requeueing depends on the
  // channel's default for nack's requeue flag (amqplib documents it as
  // true) -- confirm for the channel implementation in use.
  this.ch.nack(current);
};
505+
506+
507+
/**
 * Reject the oldest pending request instead of replying.
 *
 * Removes the request at the head of `this.requests` and passes it to
 * `this.ch.reject` together with `false`.
 *
 * @throws {Error} If there is no pending request.
 */
RepSocket.prototype.discard = function() {
  // Requests are handled oldest-first: shift() takes the head of the queue.
  var current = this.requests.shift();
  if (!current) {
    throw new Error('Discard with no pending request');
  }
  // NOTE(review): the second argument is presumably the requeue flag
  // (false = do not put the message back on the queue) -- confirm against
  // the channel implementation in use.
  this.ch.reject(current, false);
};
515+
516+
476517
// Uses the shared `ignore` function as this socket's `_read`.
// NOTE(review): `ignore` is defined outside this excerpt -- presumably a
// no-op; confirm.
RepSocket.prototype._read = ignore;
477518

478519
// A task socket requires you to say which kinds of task you will

0 commit comments

Comments
 (0)