Skip to content

Commit 3580798

Browse files
committed
Support basicCancel from server
Fixes #24
1 parent 92a83bb commit 3580798

2 files changed

Lines changed: 33 additions & 0 deletions

File tree

src/amqp-base-client.ts

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -408,6 +408,29 @@ export default abstract class AMQPBaseClient {
408408
channel.resolvePromise(consumerTag)
409409
break
410410
}
411+
// Handles a cancel method (id 30) received from the server for a consumer on this channel.
// Reads the consumer tag and the no-wait flag from the incoming frame, marks the matching
// consumer (if one is registered on the channel) as closed with an AMQPError and forgets it,
// then, unless no-wait was set, replies with a basic cancelOk (class 60, method 31) frame.
// NOTE(review): `i` (read offset into `view`) and `j` (write offset into the outgoing frame)
// are not declared in this hunk; presumably both come from the enclosing scope and `j` is 0
// when this case is entered. Confirm against the surrounding method.
case 30: { // cancel
412+
const [consumerTag, len] = view.getShortString(i); i += len // consumer tag; advance the read offset past it
413+
const noWait = view.getUint8(i) === 1; i += 1 // when set, no cancelOk reply is sent below
414+
415+
const consumer = channel.consumers.get(consumerTag)
416+
if (consumer) {
417+
consumer.setClosed(new AMQPError("Consumer cancelled by the server", this)) // close the consumer with this error
418+
channel.consumers.delete(consumerTag) // drop the channel's reference to the cancelled consumer
419+
}
420+
if (!noWait) {
421+
const frame = new AMQPView(new ArrayBuffer(512)) // 512-byte buffer for the reply; only the first j bytes are sent
422+
frame.setUint8(j, 1); j += 1 // type: method
423+
frame.setUint16(j, channel.id); j += 2 // channel
424+
frame.setUint32(j, 0); j += 4 // frameSize (placeholder, patched below once the length is known)
425+
frame.setUint16(j, 60); j += 2 // class: basic
426+
frame.setUint16(j, 31); j += 2 // method: cancelOk
427+
j += frame.setShortString(j, consumerTag) // tag
428+
frame.setUint8(j, 206); j += 1 // frame end byte
429+
frame.setUint32(3, j - 8) // update frameSize: bytes written minus the 7 header bytes (1 + 2 + 4) and the 1 end byte; assumes j started at 0
430+
this.send(new Uint8Array(frame.buffer, 0, j)) // send only the bytes actually written
431+
}
432+
break
433+
}
411434
case 31: { // cancelOk
412435
const [consumerTag, len] = view.getShortString(i); i += len
413436
channel.resolvePromise(consumerTag)

test/test.ts

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -574,3 +574,13 @@ test("will raise if socket is closed on send", async t => {
574574
if (amqp.socket) amqp.socket.destroy()
575575
await t.throwsAsync(() => conn.channel())
576576
})
577+
578+
// Integration test: after the subscribed queue is deleted, consumer.wait() must reject with
// an error whose message matches "Consumer cancelled by the server".
// NOTE(review): connects to amqp://127.0.0.1, so a running broker is required; presumably the
// broker cancels the consumer when its queue is deleted. Confirm against broker behaviour.
test("can handle cancel from server", async t => {
579+
const amqp = new AMQPClient("amqp://127.0.0.1")
580+
const conn = await amqp.connect()
581+
const ch = await conn.channel()
582+
const q = await ch.queue("") // empty queue name; presumably the broker assigns one (TODO confirm)
583+
const consumer = await q.subscribe({}, () => "") // the message callback is irrelevant to this test
584+
await q.delete() // deleting the queue is what is expected to trigger the server-side cancel
585+
await t.throwsAsync(() => consumer.wait(), { message: /Consumer cancelled by the server/ })
586+
})

0 commit comments

Comments
 (0)