Commit eb6e76b

feat: auto-ack and idempotent ack in AMQPQueue.subscribe()
Callback form: wraps the user callback to ack on return and nack+requeue on throw. Pass `{ noAck: true }` to opt out, or `requeueOnNack: false` to discard on error instead of requeuing.

Iterator form: acks the previous message when the loop advances to the next. The last message (after `break`) is left unacked. Call `msg.ack()` / `msg.nack()` before advancing to override.

AMQPMessage.ack/nack/reject are now idempotent: calling them more than once is safe and sends only one wire frame. This lets the callback and the library both ack without coordinating.

AMQPConsumer.cancel() is also idempotent via a memoised promise, preventing a double-basicCancel race when generator cleanup and sub.cancel() overlap.
1 parent f03013a commit eb6e76b

8 files changed

Lines changed: 272 additions & 33 deletions

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
@@ -19,12 +19,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - `session.closed` — `true` when the underlying connection is closed
 - `session.stop()` — cancels reconnection, clears all subscriptions, and closes the connection
 - `AMQPQueue` — reconnect-safe queue handle returned by `session.queue()`, with `publish()`, `subscribe()`, `get()`, `bind()`, `unbind()`, `purge()`, `delete()` ([#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
-- `subscribe(params?, callback?)` accepts `QueueSubscribeParams` — `ConsumeParams` plus an optional `prefetch` that sets channel QoS before each consume, including after reconnect
+- `subscribe(callback)` / `subscribe(params, callback)` — auto-acks after the callback returns; nacks and requeues on throw; call `msg.ack()` / `msg.nack()` inside the callback to override; pass `{ noAck: true }` to skip acking entirely; `requeueOnNack` controls requeue behaviour on error ([#189](https://github.com/cloudamqp/amqp-client.js/pull/189))
+- `subscribe()` / `subscribe(params)` — async-iterator form; auto-acks the previous message when the loop advances; the last message (after `break`) is left unacked; call `msg.ack()` / `msg.nack()` before advancing to override; pass `{ noAck: true }` to skip acking ([#189](https://github.com/cloudamqp/amqp-client.js/pull/189))
 - Subscriptions survive reconnection automatically; the async-iterator form continues yielding without any caller changes
 - `AMQPExchange` — reconnect-safe exchange handle returned by `session.exchange()`, with `publish()`, `bind()`, `unbind()`, `delete()` ([#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
 - `AMQPSubscription` — stable consumer handle across reconnections: exposes `channel`, `consumerTag`, and `cancel()`
 - `AMQPGeneratorSubscription` — extends `AMQPSubscription` with `AsyncIterable<AMQPMessage>` support
-- `QueueSubscribeParams` — exported type combining `ConsumeParams` with `prefetch?`
+- `QueueSubscribeParams` — exported type combining `ConsumeParams` with `prefetch?` and `requeueOnNack?` (default `true`) ([#189](https://github.com/cloudamqp/amqp-client.js/pull/189))
 - `QueuePublishOptions` / `ExchangePublishOptions` — exported types for publish options; both extend `AMQPProperties` with a `confirm?` flag; `ExchangePublishOptions` adds `routingKey?`
 - `ondisconnect` hook on `AMQPBaseClient` (TCP and WebSocket) — fires when the connection drops
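The changelog entries in this hunk describe two callback call shapes (`subscribe(callback)` and `subscribe(params, callback)`) plus an iterator form. As a rough illustration of how such arguments could be told apart and defaulted, here is a self-contained sketch. `normalizeSubscribeArgs`, `Msg`, `Params`, and `Normalized` are hypothetical names used only for this example and are not part of amqp-client.js; the defaults follow what the entries state (`requeueOnNack` defaults to `true`, acking is skipped only when `noAck` is `true`).

```typescript
// Hypothetical stand-ins for the library types; only the fields used here.
type Msg = { body: string }
type Callback = (msg: Msg) => void | Promise<void>
type Params = { noAck?: boolean; prefetch?: number; requeueOnNack?: boolean }

interface Normalized {
  form: "callback" | "iterator"
  callback: Callback | undefined
  prefetch: number | undefined
  requeueOnNack: boolean
  autoAck: boolean
}

// Assumption: a function in first position means subscribe(callback) was used.
function normalizeSubscribeArgs(a?: Params | Callback, b?: Callback): Normalized {
  let params: Params | undefined
  let callback: Callback | undefined
  if (typeof a === "function") {
    callback = a
  } else {
    params = a
    callback = b
  }
  const { prefetch, requeueOnNack = true, noAck } = params ?? {}
  return {
    form: callback ? "callback" : "iterator",
    callback,
    prefetch,
    requeueOnNack,
    // Auto-ack stays on unless the caller explicitly passed noAck: true.
    autoAck: noAck !== true,
  }
}

const callbackOnly = normalizeSubscribeArgs(() => {})
const withParams = normalizeSubscribeArgs({ prefetch: 10, requeueOnNack: false }, () => {})
const iteratorOptOut = normalizeSubscribeArgs({ noAck: true })
```

In this sketch `callbackOnly` resolves to the callback form with auto-ack and requeue enabled, `withParams` keeps auto-ack but discards on error, and `iteratorOptOut` selects the iterator form with acking disabled.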

README.md

Lines changed: 9 additions & 8 deletions
@@ -40,17 +40,18 @@ const session = await AMQPSession.connect("amqp://localhost")
 const q = await session.queue("my-queue")
 await q.publish("Hello World", { deliveryMode: 2 })

-// Subscribe with a callback — consumer recovers automatically on reconnect
-const sub = await q.subscribe({ noAck: false }, async (msg) => {
+// Subscribe with a callback — consumer recovers automatically on reconnect.
+// Messages are acked after the callback returns. If it throws, the message is
+// nacked and requeued. Call msg.ack() / msg.nack() yourself to override.
+const sub = await q.subscribe(async (msg) => {
   console.log(msg.bodyString())
-  await msg.ack()
 })

-// Or subscribe with an async iterator
-const iterSub = await q.subscribe({ noAck: false })
+// Or subscribe with an async iterator — messages are acked when the loop advances.
+// Call msg.ack() / msg.nack() before the next iteration to override.
+const iterSub = await q.subscribe()
 for await (const msg of iterSub) {
   console.log(msg.bodyString())
-  await msg.ack()
 }

 // Exchanges work the same way
@@ -81,8 +82,8 @@ Subscriptions created via `queue.subscribe()` are automatically re-established a

 ```javascript
 const q = await session.queue("my-queue", { durable: true })
-const sub = await q.subscribe({ noAck: false, prefetch: 10 }, async (msg) => {
-  await msg.ack()
+const sub = await q.subscribe({ prefetch: 10 }, async (msg) => {
+  // process msg — acked on return, nacked and requeued on throw
 })

 // sub.consumerTag and sub.channel reflect the current consumer (updated on reconnect)

package-lock.json

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default.

src/amqp-consumer.ts

Lines changed: 7 additions & 1 deletion
@@ -47,12 +47,18 @@ export class AMQPConsumer {
   /**
    * Cancel/abort/stop the consumer. No more messages will be delivered to the consumer.
    * Note that any unacked messages are still unacked as they belong to the channel and not the consumer.
+   * Safe to call multiple times — concurrent calls share the same underlying wire operation.
    */
   cancel() {
     if (this.channel.closed) return Promise.resolve(this.channel)
-    return this.channel.basicCancel(this.tag)
+    if (!this._cancelPromise) {
+      this._cancelPromise = this.channel.basicCancel(this.tag)
+    }
+    return this._cancelPromise
   }

+  private _cancelPromise?: Promise<AMQPChannel>
+
   /**
    * @ignore
    * @param [err] - why the consumer was closed
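The memoised-promise pattern used by `cancel()` in this hunk can be tried in isolation. The sketch below is not the library code: `CountingChannel` and `SketchConsumer` are hypothetical stand-ins that only count how many cancel requests would reach the wire.

```typescript
// Hypothetical channel stub: counts cancel requests instead of sending frames.
class CountingChannel {
  closed = false
  cancelRequests = 0
  lastTag = ""

  basicCancel(tag: string): Promise<CountingChannel> {
    this.cancelRequests++
    this.lastTag = tag
    return Promise.resolve(this)
  }
}

class SketchConsumer {
  private cancelPromise?: Promise<CountingChannel>
  private channel: CountingChannel
  private tag: string

  constructor(channel: CountingChannel, tag: string) {
    this.channel = channel
    this.tag = tag
  }

  cancel(): Promise<CountingChannel> {
    if (this.channel.closed) return Promise.resolve(this.channel)
    // The first caller starts the operation; later callers reuse the same promise.
    if (!this.cancelPromise) {
      this.cancelPromise = this.channel.basicCancel(this.tag)
    }
    return this.cancelPromise
  }
}

const countingChannel = new CountingChannel()
const sketchConsumer = new SketchConsumer(countingChannel, "ctag-1")
// Two overlapping calls, e.g. generator cleanup racing an explicit cancel.
const firstCancel = sketchConsumer.cancel()
const secondCancel = sketchConsumer.cancel()
```

Both calls return the identical promise object, so the stub sees a single `basicCancel` request even though `cancel()` ran twice.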

src/amqp-message.ts

Lines changed: 13 additions & 1 deletion
@@ -30,6 +30,12 @@ export class AMQPMessage {
   messageCount?: number
   replyCode?: number
   replyText?: string
+  #acked = false
+
+  /** True if the message has already been acked, nacked, or rejected. */
+  get isAcked(): boolean {
+    return this.#acked
+  }

   /**
    * @param channel - Channel this message was delivered on
@@ -56,16 +62,22 @@ export class AMQPMessage {

   /** Acknowledge the message */
   ack(multiple = false) {
+    if (this.#acked) return Promise.resolve()
+    this.#acked = true
     return this.channel.basicAck(this.deliveryTag, multiple)
   }

   /** Negative acknowledgment (same as reject) */
   nack(requeue = false, multiple = false) {
+    if (this.#acked) return Promise.resolve()
+    this.#acked = true
     return this.channel.basicNack(this.deliveryTag, requeue, multiple)
   }

-  /** Rejected the message */
+  /** Reject the message */
   reject(requeue = false) {
+    if (this.#acked) return Promise.resolve()
+    this.#acked = true
     return this.channel.basicReject(this.deliveryTag, requeue)
   }
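Because `ack`, `nack`, and `reject` share one flag in this hunk, whichever is called first wins and every later call is a no-op. A minimal sketch of that behaviour, using a hypothetical `RecordingChannel` and `SketchMessage` instead of the real `AMQPChannel` and `AMQPMessage`:

```typescript
// Hypothetical channel stub that records every frame it is asked to send.
class RecordingChannel {
  frames: string[] = []

  basicAck(tag: number): Promise<void> {
    this.frames.push(`ack ${tag}`)
    return Promise.resolve()
  }

  basicNack(tag: number, requeue: boolean): Promise<void> {
    this.frames.push(`nack ${tag} requeue=${requeue}`)
    return Promise.resolve()
  }
}

class SketchMessage {
  private settled = false
  private channel: RecordingChannel
  private deliveryTag: number

  constructor(channel: RecordingChannel, deliveryTag: number) {
    this.channel = channel
    this.deliveryTag = deliveryTag
  }

  get isAcked(): boolean {
    return this.settled
  }

  ack(): Promise<void> {
    if (this.settled) return Promise.resolve() // already settled: send nothing
    this.settled = true
    return this.channel.basicAck(this.deliveryTag)
  }

  nack(requeue = false): Promise<void> {
    if (this.settled) return Promise.resolve() // already settled: send nothing
    this.settled = true
    return this.channel.basicNack(this.deliveryTag, requeue)
  }
}

const recordingChannel = new RecordingChannel()
const sketchMessage = new SketchMessage(recordingChannel, 7)
void sketchMessage.nack(true) // user code settles the message first
void sketchMessage.ack() // a later automatic ack is silently skipped
void sketchMessage.ack()
```

Only the first call produces a frame, which is what lets user code override the automatic ack without coordinating with the wrapper.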

src/amqp-queue.ts

Lines changed: 37 additions & 11 deletions
@@ -14,6 +14,11 @@ import { publishConfirmed, publishNoConfirm, type Body } from "./amqp-publisher.
 export type QueueSubscribeParams = ConsumeParams & {
   /** Per-consumer prefetch limit (sets QoS on the channel before consuming). */
   prefetch?: number
+  /**
+   * Whether to requeue messages that are nacked due to a callback error.
+   * Defaults to `true`.
+   */
+  requeueOnNack?: boolean
 }

 /** Options for {@link AMQPQueue#publish}. */
@@ -56,27 +61,48 @@ export class AMQPQueue {
     return this
   }

-  /**
-   * Subscribe to this queue with automatic consumer recovery on reconnection.
-   * @param params - consume and prefetch parameters
-   * @param callback - called for each delivered message
-   */
+  /** Subscribe with a callback. Messages are acked after the callback returns, nacked on error. */
+  subscribe(callback: (msg: AMQPMessage) => void | Promise<void>): Promise<AMQPSubscription>
+  /** Subscribe with a callback and custom params. */
   subscribe(
     params: QueueSubscribeParams,
     callback: (msg: AMQPMessage) => void | Promise<void>,
   ): Promise<AMQPSubscription>
   /**
-   * Subscribe to this queue with automatic consumer recovery on reconnection.
-   * Messages are delivered through an async-iterable subscription that continues
-   * across reconnections.
-   * @param [params] - consume and prefetch parameters
+   * Subscribe via an async iterator. Messages continue yielding across reconnections.
+   * @example
+   * ```ts
+   * for await (const msg of await q.subscribe()) {
+   *   console.log(msg.bodyString())
+   *   await msg.ack()
+   * }
+   * ```
    */
   subscribe(params?: QueueSubscribeParams): Promise<AMQPGeneratorSubscription>
   async subscribe(
-    params?: QueueSubscribeParams,
+    params?: QueueSubscribeParams | ((msg: AMQPMessage) => void | Promise<void>),
     callback?: (msg: AMQPMessage) => void | Promise<void>,
   ): Promise<AMQPSubscription | AMQPGeneratorSubscription> {
-    const { prefetch, ...consumeParams } = params ?? {}
+    if (typeof params === "function") [callback, params] = [params, undefined]
+    const { prefetch, requeueOnNack = true, ...consumeParams } = params ?? {}
+    // When auto-acking (callback or generator), force noAck: false so the server
+    // tracks delivery tags. basicConsume defaults noAck to true, so we must be explicit.
+    if (callback !== undefined && !consumeParams.noAck) {
+      consumeParams.noAck = false
+      const userCallback = callback
+      callback = async (msg: AMQPMessage) => {
+        try {
+          await userCallback(msg)
+          await msg.ack()
+        } catch {
+          await msg.nack(requeueOnNack)
+        }
+      }
+    }
+    // Generator form: also force noAck: false for auto-ack unless caller opted out.
+    if (callback === undefined && consumeParams.noAck !== true) {
+      consumeParams.noAck = false
+    }
     const def: ConsumerDefinition = {
       queueName: this.name,
       consumeParams,
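The ack-on-return, nack-on-throw wrapper added in this hunk can be exercised without a broker. The following is a sketch under stated assumptions, not the library implementation: `withAutoAck`, `Settleable`, and `fakeMessage` are hypothetical names, and the fake message simply logs its outcome rather than being idempotent like the real `AMQPMessage`.

```typescript
// Hypothetical minimal message shape: just the two settle operations.
interface Settleable {
  ack(): Promise<void>
  nack(requeue: boolean): Promise<void>
}

// Wraps a user callback so the message is acked on return and nacked on throw.
function withAutoAck<M extends Settleable>(
  userCallback: (msg: M) => void | Promise<void>,
  requeueOnNack = true,
): (msg: M) => Promise<void> {
  return async (msg: M) => {
    try {
      await userCallback(msg)
      await msg.ack()
    } catch {
      await msg.nack(requeueOnNack)
    }
  }
}

// Hypothetical in-memory message that appends its outcome to a log.
function fakeMessage(log: string[]): Settleable {
  return {
    ack: async () => {
      log.push("ack")
    },
    nack: async (requeue: boolean) => {
      log.push(`nack requeue=${requeue}`)
    },
  }
}

const outcomes: string[] = []
const wrapperDemo = (async () => {
  // Callback returns normally: the message is acked.
  await withAutoAck(async () => {})(fakeMessage(outcomes))
  // Callback throws with the default setting: nacked and requeued.
  await withAutoAck(() => {
    throw new Error("boom")
  })(fakeMessage(outcomes))
  // Callback throws with requeueOnNack = false: nacked and discarded.
  await withAutoAck(() => {
    throw new Error("boom")
  }, false)(fakeMessage(outcomes))
})()
```

Note that the wrapper swallows the callback's error after nacking, so a throwing callback does not reject the returned promise in this sketch.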

src/amqp-subscription.ts

Lines changed: 6 additions & 0 deletions
@@ -93,6 +93,8 @@ export class AMQPGeneratorSubscription extends AMQPSubscription implements Async
   }

   async *[Symbol.asyncIterator](): AsyncGenerator<AMQPMessage, void, undefined> {
+    const autoAck = !this.def.consumeParams.noAck
+    let prev: AMQPMessage | undefined
     while (!this.stopped) {
       const consumer = this.consumer
       if (!(consumer instanceof AMQPGeneratorConsumer)) {
@@ -101,11 +103,15 @@ export class AMQPGeneratorSubscription extends AMQPSubscription implements Async
       try {
         for await (const msg of consumer.messages) {
           if (this.stopped) return
+          if (autoAck) await prev?.ack()
+          prev = msg
           yield msg
         }
       } catch {
         // Consumer's channel was closed — wait for reconnect to provide a new consumer
       }
+      // Reset on disconnect; unacked messages are requeued by the server when the channel closes
+      prev = undefined
       if (!this.stopped) {
         await new Promise<void>((resolve) => {
           this.consumerReady = resolve
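The "ack the previous message when the loop advances" behaviour in these hunks can be reproduced with a plain async generator. This is an illustrative sketch only: `autoAckPrevious`, `Ackable`, and `fakeSource` are hypothetical names, and reconnection handling is left out entirely.

```typescript
// Hypothetical minimal message shape for the sketch.
interface Ackable {
  ack(): Promise<void>
}

// Yields each message and acks the previous one only when the consumer
// asks for the next, mirroring the `prev` bookkeeping in the diff.
async function* autoAckPrevious<M extends Ackable>(
  source: AsyncIterable<M>,
): AsyncGenerator<M, void, undefined> {
  let prev: M | undefined
  for await (const msg of source) {
    await prev?.ack()
    prev = msg
    yield msg
  }
}

const ackedIds: number[] = []

// Hypothetical source standing in for consumer.messages.
async function* fakeSource() {
  for (const id of [1, 2, 3]) {
    yield {
      id,
      ack: async () => {
        ackedIds.push(id)
      },
    }
  }
}

const iteratorDemo = (async () => {
  for await (const msg of autoAckPrevious(fakeSource())) {
    // Leaving the loop here means message 2 is never acked by the generator.
    if (msg.id === 2) break
  }
})()
```

After the loop breaks on message 2, only message 1 has been acked. This matches the note in the commit message that the last message is left unacked unless the caller settles it explicitly.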
