Skip to content

Commit 1913a67

Browse files
authored
Add AsyncGenerator support to subscribe() for improved DX (#169)
Extends queue.subscribe() to support AsyncGenerator-based message consumption via the consumer.messages property when no callback is provided.

Benefits:
- Natural iteration with for await...of loops
- Automatic consumer cancellation on loop exit
- Better backpressure control
- Access to consumer methods (wait, cancel, tag)

Implementation:
- New AMQPGeneratorConsumer subclass handles generator logic
- subscribe() overloads: with callback returns AMQPConsumer, without callback returns AMQPGeneratorConsumer
- consumer.messages property (not method) for cleaner syntax
- Reuses existing callback infrastructure for message delivery

Fixes #168
1 parent a6a2ff6 commit 1913a67

5 files changed

Lines changed: 294 additions & 13 deletions

File tree

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,40 @@ async function run() {
5050
run()
5151
```
5252

53+
### Using AsyncGenerator for consuming messages
54+
55+
As an alternative to the callback-based approach, you can use an AsyncGenerator for a more modern, iteration-based API:
56+
57+
```javascript
58+
import { AMQPClient } from "@cloudamqp/amqp-client"
59+
60+
async function run() {
61+
try {
62+
const amqp = new AMQPClient("amqp://localhost")
63+
const conn = await amqp.connect()
64+
const ch = await conn.channel()
65+
const q = await ch.queue()
66+
67+
await q.publish("Hello World", { deliveryMode: 2 })
68+
69+
// Subscribe without a callback and use consumer.messages for AsyncGenerator
70+
const consumer = await q.subscribe({ noAck: false })
71+
for await (const msg of consumer.messages) {
72+
console.log(msg.bodyToString())
73+
await msg.ack()
74+
break // breaking automatically cancels the consumer
75+
}
76+
77+
await conn.close()
78+
} catch (e) {
79+
console.error("ERROR", e)
80+
e.connection.close()
81+
}
82+
}
83+
84+
run()
85+
```
86+
5387
## WebSockets
5488

5589
This library can be used in the browser to access an AMQP server over WebSockets. For servers such as RabbitMQ that don't support WebSockets natively, a [WebSocket TCP relay](https://github.com/cloudamqp/websocket-tcp-relay/) has to be used as a proxy. All CloudAMQP servers have this proxy configured. More information can be found [in this blog post](https://www.cloudamqp.com/blog/cloudamqp-releases-amqp-websockets.html).

src/amqp-channel.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { AMQPError } from "./amqp-error.js"
22
import * as AMQPFrame from "./amqp-frame.js"
33
import { AMQPView } from "./amqp-view.js"
44
import { AMQPQueue } from "./amqp-queue.js"
5-
import { AMQPConsumer } from "./amqp-consumer.js"
5+
import { AMQPConsumer, AMQPGeneratorConsumer } from "./amqp-consumer.js"
66
import type { AMQPMessage } from "./amqp-message.js"
77
import type { AMQPBaseClient } from "./amqp-base-client.js"
88
import type { AMQPProperties } from "./amqp-properties.js"
@@ -13,7 +13,7 @@ import type { AMQPProperties } from "./amqp-properties.js"
1313
export class AMQPChannel {
1414
readonly connection: AMQPBaseClient
1515
readonly id: number
16-
readonly consumers = new Map<string, AMQPConsumer>()
16+
readonly consumers = new Map<string, AMQPConsumer | AMQPGeneratorConsumer>()
1717
private rpcQueue: Promise<unknown> = Promise.resolve(true)
1818
private readonly rpcCallbacks: [(value?: unknown) => void, (err?: Error) => void][] = []
1919
private readonly unconfirmedPublishes: [number, (confirmId: number) => void, (err?: Error) => void][] = []
@@ -144,9 +144,25 @@ export class AMQPChannel {
144144
*/
145145
basicConsume(
146146
queue: string,
147-
{ tag = "", noAck = true, exclusive = false, args = {} } = {},
147+
params: ConsumeParams,
148148
callback: (msg: AMQPMessage) => void | Promise<void>,
149-
): Promise<AMQPConsumer> {
149+
): Promise<AMQPConsumer>
150+
/**
151+
* Consume from a queue. Messages will be delivered asynchronously through an AsyncGenerator at `consumer.messages`.
152+
* @param queue - name of the queue to consume from
153+
* @param param
154+
* @param [param.tag=""] - tag of the consumer, will be server generated if left empty
155+
* @param [param.noAck=true] - if messages are removed from the server upon delivery, or have to be acknowledged
156+
* @param [param.exclusive=false] - if this can be the only consumer of the queue, will return an Error if there are other consumers to the queue already
157+
* @param [param.args={}] - custom arguments
158+
* @return {AMQPGeneratorConsumer} - Consumer with an AsyncGenerator for messages at `consumer.messages`
159+
*/
160+
basicConsume(queue: string, params: ConsumeParams): Promise<AMQPGeneratorConsumer>
161+
basicConsume(
162+
queue: string,
163+
{ tag = "", noAck = true, exclusive = false, args = {} }: ConsumeParams = {},
164+
callback?: (msg: AMQPMessage) => void | Promise<void>,
165+
): Promise<AMQPConsumer | AMQPGeneratorConsumer> {
150166
if (this.closed) return this.rejectClosed()
151167
const noWait = false
152168
const noLocal = false
@@ -172,7 +188,12 @@ export class AMQPChannel {
172188
return new Promise((resolve, reject) => {
173189
this.sendRpc(frame)
174190
.then((consumerTag) => {
175-
const consumer = new AMQPConsumer(this, consumerTag, callback)
191+
let consumer: AMQPConsumer | AMQPGeneratorConsumer
192+
if (callback) {
193+
consumer = new AMQPConsumer(this, consumerTag, callback)
194+
} else {
195+
consumer = new AMQPGeneratorConsumer(this, consumerTag)
196+
}
176197
this.consumers.set(consumerTag, consumer)
177198
resolve(consumer)
178199
})

src/amqp-consumer.ts

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ export class AMQPConsumer {
99
readonly channel: AMQPChannel
1010
readonly tag: string
1111
readonly onMessage: (msg: AMQPMessage) => void | Promise<void>
12-
private closed = false
13-
private closedError?: Error
12+
protected closed = false
13+
protected closedError?: Error
1414
private resolveWait?: (value: void) => void
1515
private rejectWait?: (err: Error) => void
1616
private timeoutId?: ReturnType<typeof setTimeout>
@@ -67,3 +67,71 @@ export class AMQPConsumer {
6767
}
6868
}
6969
}
70+
71+
export class AMQPGeneratorConsumer extends AMQPConsumer {
72+
private messageQueue: AMQPMessage[] = []
73+
private messageResolver: ((msg: AMQPMessage) => void) | null = null
74+
private _generator?: AsyncGenerator<AMQPMessage, void, undefined>
75+
76+
constructor(channel: AMQPChannel, tag: string) {
77+
super(channel, tag, (msg: AMQPMessage) => {
78+
// Feed messages to the generator queue
79+
if (this.messageResolver) {
80+
this.messageResolver(msg)
81+
this.messageResolver = null
82+
} else if (this.messageQueue) {
83+
this.messageQueue.push(msg)
84+
}
85+
})
86+
}
87+
88+
/**
89+
* Get an AsyncGenerator for consuming messages.
90+
* @return An AsyncGenerator that yields messages
91+
*/
92+
get messages(): AsyncGenerator<AMQPMessage, void, undefined> {
93+
if (this._generator) {
94+
return this._generator
95+
}
96+
97+
this._generator = this.generateMessages()
98+
return this._generator
99+
}
100+
101+
private async *generateMessages(): AsyncGenerator<AMQPMessage, void, undefined> {
102+
try {
103+
while (!this.closedError && !this.closed) {
104+
if (this.messageQueue.length > 0) {
105+
const msg = this.messageQueue.shift()!
106+
yield msg
107+
} else {
108+
const msg = await new Promise<AMQPMessage>((resolve) => {
109+
this.messageResolver = resolve
110+
})
111+
if (this.closedError || this.closed) break
112+
yield msg
113+
}
114+
}
115+
if (this.closedError) throw this.closedError
116+
} finally {
117+
// Clean up: cancel consumer when generator is done
118+
try {
119+
await this.cancel()
120+
} catch {
121+
// Ignore errors during cleanup
122+
}
123+
}
124+
}
125+
126+
override setClosed(err?: Error): void {
127+
super.setClosed(err)
128+
// Wake up the generator if it's waiting
129+
if (this.messageResolver) {
130+
const resolver = this.messageResolver
131+
this.messageResolver = null
132+
// Resolve the promise with a sentinel value
133+
// The generator will check closedError/closed immediately and break without yielding
134+
resolver(undefined as unknown as AMQPMessage)
135+
}
136+
}
137+
}

src/amqp-queue.ts

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { AMQPMessage } from "./amqp-message.js"
22
import type { AMQPChannel, ConsumeParams } from "./amqp-channel.js"
33
import type { AMQPProperties } from "./amqp-properties.js"
4-
import type { AMQPConsumer } from "./amqp-consumer.js"
4+
import type { AMQPConsumer, AMQPGeneratorConsumer } from "./amqp-consumer.js"
55

66
/**
77
* Convenience class for queues
@@ -69,11 +69,27 @@ export class AMQPQueue {
6969
* @param [params.args={}] - custom arguments
7070
* @param {function(AMQPMessage) : void | Promise<void>} callback - Function to be called for each received message
7171
*/
72-
subscribe(
73-
{ noAck = true, exclusive = false, tag = "", args = {} }: ConsumeParams = {},
74-
callback: (msg: AMQPMessage) => void | Promise<void>,
75-
): Promise<AMQPConsumer> {
76-
return this.channel.basicConsume(this.name, { noAck, exclusive, tag, args }, callback)
72+
async subscribe(params: ConsumeParams, callback: (msg: AMQPMessage) => void | Promise<void>): Promise<AMQPConsumer>
73+
74+
/**
75+
* Subscribe to the queue. Use `consumer.messages` to iterate over messages with an AsyncGenerator.
76+
* @param params
77+
* @param [params.noAck=true] - if messages are removed from the server upon delivery, or have to be acknowledged
78+
* @param [params.exclusive=false] - if this can be the only consumer of the queue, will return an Error if there are other consumers to the queue already
79+
* @param [params.tag=""] - tag of the consumer, will be server generated if left empty
80+
* @param [params.args={}] - custom arguments
81+
* @return {AMQPGeneratorConsumer} - Consumer with an AsyncGenerator for messages at `consumer.messages`
82+
*/
83+
async subscribe(params: ConsumeParams): Promise<AMQPGeneratorConsumer>
84+
85+
async subscribe(
86+
params: ConsumeParams = {},
87+
callback?: (msg: AMQPMessage) => void | Promise<void>,
88+
): Promise<AMQPConsumer | AMQPGeneratorConsumer> {
89+
if (callback) {
90+
return this.channel.basicConsume(this.name, params, callback)
91+
}
92+
return this.channel.basicConsume(this.name, params)
7793
}
7894

7995
/**

test/test.ts

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,148 @@ test("can unsubscribe from a queue", async () => {
113113
await expect(q.unsubscribe(consumer.tag)).resolves.toBeDefined()
114114
})
115115

116+
test("can subscribe using AsyncGenerator", async () => {
117+
const amqp = getNewClient()
118+
const conn = await amqp.connect()
119+
const ch = await conn.channel()
120+
const q = await ch.queue("")
121+
await q.publish("hello world")
122+
123+
const consumer = await q.subscribe({ noAck: true })
124+
const generator = consumer.messages
125+
const result = await generator.next()
126+
127+
expect(result.done).toBe(false)
128+
const value = result.value
129+
if (!value) throw new Error("Expected a message")
130+
expect(value.bodyString()).toEqual("hello world")
131+
132+
await generator.return()
133+
})
134+
135+
test("can consume multiple messages with AsyncGenerator", async () => {
136+
const amqp = getNewClient()
137+
const conn = await amqp.connect()
138+
const ch = await conn.channel()
139+
const q = await ch.queue("")
140+
141+
// Publish 3 messages
142+
await q.publish("message 1")
143+
await q.publish("message 2")
144+
await q.publish("message 3")
145+
146+
const consumer = await q.subscribe({ noAck: true })
147+
const messages: string[] = []
148+
let count = 0
149+
for await (const msg of consumer.messages) {
150+
messages.push(msg.bodyString()!)
151+
count++
152+
if (count === 3) break
153+
}
154+
155+
expect(messages).toEqual(["message 1", "message 2", "message 3"])
156+
})
157+
158+
test("AsyncGenerator with manual acknowledgment", async () => {
159+
const amqp = getNewClient()
160+
const conn = await amqp.connect()
161+
const ch = await conn.channel()
162+
const q = await ch.queue("")
163+
164+
await q.publish("test message")
165+
166+
const consumer = await q.subscribe({ noAck: false })
167+
let ackCalled = false
168+
for await (const msg of consumer.messages) {
169+
expect(msg.bodyString()).toEqual("test message")
170+
await msg.ack()
171+
ackCalled = true
172+
break
173+
}
174+
175+
expect(ackCalled).toBe(true)
176+
})
177+
178+
test("AsyncGenerator with nack", async () => {
179+
const amqp = getNewClient()
180+
const conn = await amqp.connect()
181+
const ch = await conn.channel()
182+
const q = await ch.queue("")
183+
184+
await q.publish("test message")
185+
186+
const consumer = await q.subscribe({ noAck: false })
187+
let nackCalled = false
188+
for await (const msg of consumer.messages) {
189+
expect(msg.bodyString()).toEqual("test message")
190+
await msg.nack(false)
191+
nackCalled = true
192+
break
193+
}
194+
195+
expect(nackCalled).toBe(true)
196+
})
197+
198+
test("AsyncGenerator auto-cancels consumer on break", async () => {
199+
const amqp = getNewClient()
200+
const conn = await amqp.connect()
201+
const ch = await conn.channel()
202+
const q = await ch.queue("")
203+
204+
// Publish multiple messages
205+
for (let i = 0; i < 5; i++) {
206+
await q.publish(`message ${i}`)
207+
}
208+
209+
const consumer = await q.subscribe({ noAck: true })
210+
let receivedCount = 0
211+
for await (const msg of consumer.messages) {
212+
void msg // Intentionally unused in this test
213+
receivedCount++
214+
if (receivedCount === 2) break
215+
}
216+
217+
expect(receivedCount).toBe(2)
218+
})
219+
220+
test("AsyncGenerator works with prefetch", async () => {
221+
const amqp = getNewClient()
222+
const conn = await amqp.connect()
223+
const ch = await conn.channel()
224+
await ch.prefetch(1)
225+
const q = await ch.queue("")
226+
227+
// Publish multiple messages
228+
await q.publish("message 1")
229+
await q.publish("message 2")
230+
await q.publish("message 3")
231+
232+
const consumer = await q.subscribe({ noAck: false })
233+
const messages: string[] = []
234+
for await (const msg of consumer.messages) {
235+
messages.push(msg.bodyString()!)
236+
await msg.ack()
237+
if (messages.length === 3) break
238+
}
239+
240+
expect(messages).toEqual(["message 1", "message 2", "message 3"])
241+
})
242+
243+
test("AsyncGenerator with exclusive consumer", async () => {
244+
const amqp = getNewClient()
245+
const conn = await amqp.connect()
246+
const ch = await conn.channel()
247+
const q = await ch.queue("")
248+
249+
await q.publish("exclusive message")
250+
251+
const consumer = await q.subscribe({ noAck: true, exclusive: true })
252+
for await (const msg of consumer.messages) {
253+
expect(msg.bodyString()).toEqual("exclusive message")
254+
break
255+
}
256+
})
257+
116258
test("can delete a queue", async () => {
117259
const amqp = getNewClient()
118260
const conn = await amqp.connect()

0 commit comments

Comments
 (0)