Add AsyncGenerator support to subscribe() for improved DX (#169)

Extends queue.subscribe() to support AsyncGenerator-based message
consumption via consumer.messages property when no callback is provided.

Benefits:
- Natural iteration with for await...of loops
- Automatic consumer cancellation on loop exit
- Better backpressure control
- Access to consumer methods (wait, cancel, tag)

Implementation:
- New AMQPGeneratorConsumer subclass handles generator logic
- subscribe() overloads: with callback returns AMQPConsumer,
  without callback returns AMQPGeneratorConsumer
- consumer.messages property (not method) for cleaner syntax
- Reuses existing callback infrastructure for message delivery

Fixes #168
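
The implementation notes above (a generator consumer that reuses callback delivery and cancels itself when the loop exits) can be illustrated with a minimal, self-contained sketch. This is not the library's code: `GeneratorConsumerSketch`, `deliver`, and `demo` are hypothetical names, messages are plain strings, and no AMQP connection is involved. It only shows one way a callback can feed an AsyncGenerator whose `finally` block performs the cancellation.

```typescript
// Hypothetical stand-in for a generator-based consumer; not the real AMQPGeneratorConsumer.
class GeneratorConsumerSketch<T> {
  private queue: T[] = []                   // delivered but not yet consumed
  private wake: (() => void) | null = null  // resolves the generator's pending wait
  private closed = false
  cancelled = false

  // A property, not a method, mirroring `consumer.messages` from the commit message.
  readonly messages: AsyncGenerator<T, void, undefined> = this.generate()

  // The callback that an existing callback-based delivery path would invoke.
  readonly deliver = (msg: T): void => {
    if (this.closed) return
    this.queue.push(msg)
    this.notify()
  }

  async cancel(): Promise<void> {
    this.cancelled = true
    this.closed = true
    this.notify() // let a waiting generator observe the closed flag
  }

  private notify(): void {
    const wake = this.wake
    this.wake = null
    if (wake) wake()
  }

  private async *generate(): AsyncGenerator<T, void, undefined> {
    try {
      while (true) {
        // Hand out one message per iteration; the consumer of the loop sets the pace.
        while (this.queue.length > 0) yield this.queue.shift() as T
        if (this.closed) return
        await new Promise<void>((resolve) => { this.wake = resolve })
      }
    } finally {
      // Reached on break, return, or throw inside the for await...of loop.
      if (!this.cancelled) await this.cancel()
    }
  }
}

async function demo(): Promise<{ seen: string[]; cancelled: boolean }> {
  const consumer = new GeneratorConsumerSketch<string>()
  const seen: string[] = []
  // Simulate three deliveries arriving through the callback path.
  for (const body of ["a", "b", "c"]) consumer.deliver(body)
  for await (const msg of consumer.messages) {
    seen.push(msg)
    if (seen.length === 2) break // leaving the loop triggers the finally block
  }
  return { seen, cancelled: consumer.cancelled }
}
```

In this sketch the unbounded in-memory queue means backpressure is only partial; a real consumer would presumably rely on unacknowledged-message limits on the channel, which the sketch does not model.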
+    // Subscribe without a callback and use consumer.messages for AsyncGenerator
+    const consumer = await q.subscribe({ noAck: false })
+    for await (const msg of consumer.messages) {
+      console.log(msg.bodyToString())
+      await msg.ack()
+      break // breaking automatically cancels the consumer
+    }
+
+    await conn.close()
+  } catch (e) {
+    console.error("ERROR", e)
+    e.connection.close()
+  }
+}
+
+run()
+```
+
 ## WebSockets
 
 This library can be used in the browser to access an AMQP server over WebSockets. For servers such as RabbitMQ that don't support WebSockets natively, a [WebSocket TCP relay](https://github.com/cloudamqp/websocket-tcp-relay/) has to be used as a proxy. All CloudAMQP servers have this proxy configured. More information can be found [in this blog post](https://www.cloudamqp.com/blog/cloudamqp-releases-amqp-websockets.html).

-    { tag = "", noAck = true, exclusive = false, args = {} } = {},
+    params: ConsumeParams,
     callback: (msg: AMQPMessage) => void | Promise<void>,
-  ): Promise<AMQPConsumer> {
+  ): Promise<AMQPConsumer>
+  /**
+   * Consume from a queue. Messages will be delivered asynchronously through an AsyncGenerator at `consumer.messages`.
+   * @param queue - name of the queue to poll
+   * @param param
+   * @param [param.tag=""] - tag of the consumer, will be server generated if left empty
+   * @param [param.noAck=true] - if messages are removed from the server upon delivery, or have to be acknowledged
+   * @param [param.exclusive=false] - if this can be the only consumer of the queue, will return an Error if there are other consumers to the queue already
+   * @param [param.args={}] - custom arguments
+   * @return {AMQPGeneratorConsumer} - Consumer with an AsyncGenerator for messages at `consumer.messages`

+   * Subscribe to the queue. Use `consumer.messages` to iterate over messages with an AsyncGenerator.
+   * @param params
+   * @param [params.noAck=true] - if messages are removed from the server upon delivery, or have to be acknowledged
+   * @param [params.exclusive=false] - if this can be the only consumer of the queue, will return an Error if there are other consumers to the queue already
+   * @param [params.tag=""] - tag of the consumer, will be server generated if left empty
+   * @param [params.args={}] - custom arguments
+   * @return {AMQPGeneratorConsumer} - Consumer with an AsyncGenerator for messages at `consumer.messages`
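
The two `subscribe()` signatures described in the commit message (with a callback resolving to `AMQPConsumer`, without one resolving to `AMQPGeneratorConsumer`) can be expressed with TypeScript overloads roughly as follows. This is only a sketch under stated assumptions: `QueueStub`, `ConsumerStub`, `GeneratorConsumerStub`, `ConsumeParamsStub`, and the `"generated-tag"` placeholder are hypothetical stand-ins, and the real method bodies are not shown in this diff.

```typescript
// Hypothetical parameter type, modelled on the documented defaults (tag="", noAck=true).
type ConsumeParamsStub = {
  tag?: string
  noAck?: boolean
  exclusive?: boolean
  args?: Record<string, unknown>
}
type OnMessage = (body: string) => void | Promise<void>

// Hypothetical stand-in for the callback-based consumer.
class ConsumerStub {
  constructor(readonly tag: string, readonly noAck: boolean) {}
}

// Hypothetical stand-in for the generator-based subclass.
class GeneratorConsumerStub extends ConsumerStub {
  // The real class would expose an AsyncGenerator at `messages`; a flag suffices here.
  readonly hasMessages = true
}

class QueueStub {
  // Overload 1: with a callback, callers get the plain consumer type.
  subscribe(params: ConsumeParamsStub, callback: OnMessage): Promise<ConsumerStub>
  // Overload 2: without a callback, callers get the generator-based consumer type.
  subscribe(params?: ConsumeParamsStub): Promise<GeneratorConsumerStub>
  // Implementation signature; callers only see the two overloads above.
  async subscribe(
    { tag = "", noAck = true }: ConsumeParamsStub = {},
    callback?: OnMessage,
  ): Promise<ConsumerStub | GeneratorConsumerStub> {
    const consumerTag = tag || "generated-tag" // placeholder for a server-generated tag
    return callback
      ? new ConsumerStub(consumerTag, noAck)
      : new GeneratorConsumerStub(consumerTag, noAck)
  }
}

async function overloadDemo() {
  const q = new QueueStub()
  const withCallback = await q.subscribe({ noAck: false }, () => {})
  const withoutCallback = await q.subscribe({ noAck: false })
  return { withCallback, withoutCallback }
}
```

The benefit of overloads here is that the compiler picks the return type from the call shape, so code that omits the callback can use the generator-specific members without a cast.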