Commit f03013a

feat!: add AMQPSession with reconnection and high-level queue/exchange API
Introduces AMQPSession as the recommended high-level entry point for applications that need automatic reconnection and consumer recovery.

New classes and types:
- AMQPSession.connect(url, options?) — picks TCP or WebSocket transport from the URL scheme; reconnects with configurable exponential backoff
- AMQPQueue — reconnect-safe queue handle: publish(), subscribe(), get(), bind(), unbind(), purge(), delete()
- AMQPExchange — reconnect-safe exchange handle: publish(), bind(), unbind(), delete()
- AMQPSubscription / AMQPGeneratorSubscription — stable consumer handles that survive reconnection; generator variant is AsyncIterable<AMQPMessage>
- QueueSubscribeParams — ConsumeParams + prefetch? for per-consumer QoS
- QueuePublishOptions / ExchangePublishOptions — AMQPProperties extended with confirm? (and routingKey? for exchanges); replaces separate positional args and publishAndForget()
- ondisconnect hook on AMQPBaseClient (TCP and WebSocket)

Publish API: queue.publish(body, { confirm, contentType, … }) and exchange.publish(body, { routingKey, confirm, contentType, … }). Shared publish logic lives in module-level functions (publishConfirmed / publishNoConfirm) — no inheritance.

Exchange shortcuts on session: directExchange(), fanoutExchange(), topicExchange(), headersExchange().

Breaking changes (v3 → v4):
- AMQPChannel.queue() removed; use ch.queueDeclare() or session.queue()
- AMQPQueue is now session-only; no longer returned by channel methods
- AMQPQueue no longer re-exported from AMQPClient / AMQPWebSocketClient
1 parent 7f8894a commit f03013a

17 files changed

Lines changed: 1281 additions & 678 deletions

CHANGELOG.md

Lines changed: 54 additions & 5 deletions
@@ -9,16 +9,65 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12-
- `AMQPSession` — high-level client with automatic reconnection and consumer recovery ([#185](https://github.com/cloudamqp/amqp-client.js/pull/185))
13-
- `AMQPSession.connect(url, options?)` factory: picks TCP or WebSocket transport from the URL scheme
12+
- `AMQPSession` — high-level client with automatic reconnection and consumer recovery ([#185](https://github.com/cloudamqp/amqp-client.js/pull/185), [#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
13+
- `AMQPSession.connect(url, options?)` factory: picks TCP or WebSocket transport from the URL scheme (`amqp://` / `amqps://` → TCP; `ws://` / `wss://` → WebSocket)
1414
- Exponential backoff with configurable `reconnectInterval`, `maxReconnectInterval`, `backoffMultiplier`, and `maxRetries`
15-
- `session.subscribe(queue, params, callback?, options?)` — returns an `AMQPSubscription` (or `AMQPGeneratorSubscription` for async-iterable usage) that survives reconnections
15+
- `session.queue(name, params?, args?)` — declare and return an `AMQPQueue` handle
16+
- `session.exchange(name, type, params?, args?)` — declare and return an `AMQPExchange` handle
17+
- Shorthand exchange factories: `directExchange()`, `fanoutExchange()`, `topicExchange()`, `headersExchange()`
1618
- `session.onconnect` / `session.onfailed` lifecycle hooks
19+
- `session.closed`: `true` when the underlying connection is closed
1720
- `session.stop()` — cancels reconnection, clears all subscriptions, and closes the connection
18-
- `AMQPSubscription` — stable handle across reconnections: exposes `channel`, `consumerTag`, and `cancel()`
19-
- `AMQPGeneratorSubscription` — extends `AMQPSubscription` with `AsyncIterable<AMQPMessage>` support; bridges the iterator across reconnects
21+
- `AMQPQueue` — reconnect-safe queue handle returned by `session.queue()`, with `publish()`, `subscribe()`, `get()`, `bind()`, `unbind()`, `purge()`, `delete()` ([#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
22+
- `subscribe(params?, callback?)` accepts `QueueSubscribeParams` (`ConsumeParams` plus an optional `prefetch` that sets channel QoS before each consume, including after reconnect)
23+
- Subscriptions survive reconnection automatically; the async-iterator form continues yielding without any caller changes
24+
- `AMQPExchange` — reconnect-safe exchange handle returned by `session.exchange()`, with `publish()`, `bind()`, `unbind()`, `delete()` ([#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
25+
- `AMQPSubscription` — stable consumer handle across reconnections: exposes `channel`, `consumerTag`, and `cancel()`
26+
- `AMQPGeneratorSubscription` — extends `AMQPSubscription` with `AsyncIterable<AMQPMessage>` support
27+
- `QueueSubscribeParams` — exported type combining `ConsumeParams` with `prefetch?`
28+
- `QueuePublishOptions` / `ExchangePublishOptions` — exported types for publish options; both extend `AMQPProperties` with a `confirm?` flag; `ExchangePublishOptions` adds `routingKey?`
2029
- `ondisconnect` hook on `AMQPBaseClient` (TCP and WebSocket) — fires when the connection drops
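The publish option types listed above fold `confirm?` (and `routingKey?` for exchanges) into the same object as the message properties. The following is only a sketch of how such an options object could be split apart before publishing. `splitPublishOptions` is a hypothetical helper, not a function of this library, and the default of `confirm: true` is an assumption inferred from the README comment that `publish()` waits for broker confirmation.

```javascript
// Hypothetical helper, not part of @cloudamqp/amqp-client.
// Separates the publish controls from the AMQP message properties.
function splitPublishOptions(options = {}) {
  // Assumption: confirm defaults to true and routingKey to the empty string.
  const { confirm = true, routingKey = "", ...properties } = options
  return { confirm, routingKey, properties }
}

const split = splitPublishOptions({
  routingKey: "events.user.created",
  contentType: "text/plain",
  deliveryMode: 2,
})
console.log(split.confirm, split.routingKey, split.properties)
```

Keeping everything in one object is what lets `queue.publish(body, options)` and `exchange.publish(body, options)` share a call shape without extra positional arguments.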
2130

31+
### Changed
32+
33+
- **Breaking:** `AMQPChannel.queue()` removed ([#186](https://github.com/cloudamqp/amqp-client.js/pull/186)). Use `ch.queueDeclare()` with low-level channel methods, or `session.queue()` for the high-level API. See the migration guide below.
34+
- **Breaking:** `AMQPQueue` is now a session-only class — no longer returned by channel methods, no longer accepts a channel in its constructor. ([#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
35+
- **Breaking:** `AMQPQueue` is no longer re-exported from `AMQPClient` or `AMQPWebSocketClient`. Import from the main package entry point instead. ([#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
36+
37+
### Migration guide
38+
39+
The v3 `AMQPQueue` was tied to a single channel. In v4, `AMQPQueue` is a session-level handle that is reconnect-safe.
40+
41+
If you were using `ch.queue()`:
42+
43+
```diff
44+
-const ch = await conn.channel()
45+
-const q = await ch.queue("my-queue")
46+
-await q.publish("hello")
47+
-const consumer = await q.subscribe({ noAck: false }, (msg) => msg.ack())
48+
-const msg = await q.get()
49+
-await q.bind("amq.topic", "routing.key")
50+
-await q.delete()
51+
52+
+// Low-level (no reconnection)
53+
+const ch = await conn.channel()
54+
+const { name } = await ch.queueDeclare("my-queue")
55+
+await ch.basicPublish("", name, "hello")
56+
+const consumer = await ch.basicConsume(name, { noAck: false }, (msg) => msg.ack())
57+
+const msg = await ch.basicGet(name)
58+
+await ch.queueBind(name, "amq.topic", "routing.key")
59+
+await ch.queueDelete(name)
60+
61+
+// High-level (automatic reconnection)
62+
+const session = await AMQPSession.connect("amqp://localhost")
63+
+const q = await session.queue("my-queue")
64+
+await q.publish("hello")
65+
+const sub = await q.subscribe({ noAck: false }, (msg) => msg.ack())
66+
+const msg = await q.get()
67+
+await q.bind("amq.topic", "routing.key")
68+
+await q.delete()
69+
```
70+
2271
## [3.4.1] - 2025-11-28
2372

2473
### Fixed

README.md

Lines changed: 97 additions & 126 deletions
@@ -22,159 +22,130 @@ Start node with `--enable-source-maps` to get proper stacktraces as the library
2222

2323
## Example usage
2424

25-
Using AMQP in Node.js:
25+
This library provides two APIs:
2626

27-
```javascript
28-
import { AMQPClient } from "@cloudamqp/amqp-client"
27+
- **High-level** (`AMQPSession`): automatic reconnection, consumer recovery — use `queue()` / `exchange()` handles for reconnect-safe operations
28+
- **Low-level** (`AMQPClient` / `AMQPWebSocketClient`): direct channel access with `queueDeclare`, `basicPublish`, `basicConsume`, etc.
2929

30-
async function run() {
31-
try {
32-
const amqp = new AMQPClient("amqp://localhost")
33-
const conn = await amqp.connect()
34-
const ch = await conn.channel()
35-
const q = await ch.queue()
36-
const consumer = await q.subscribe({ noAck: true }, async (msg) => {
37-
console.log(msg.bodyToString())
38-
await consumer.cancel()
39-
})
40-
await q.publish("Hello World", { deliveryMode: 2 })
41-
await consumer.wait() // will block until consumer is canceled or throw an error if server closed channel/connection
42-
await conn.close()
43-
} catch (e) {
44-
console.error("ERROR", e)
45-
e.connection.close()
46-
setTimeout(run, 1000) // will try to reconnect in 1s
47-
}
48-
}
30+
### High-level API (recommended)
4931

50-
run()
51-
```
32+
Use `AMQPSession.connect(url, options)` to get a session with automatic reconnection and consumer recovery. The transport is chosen from the URL scheme (`amqp://` / `amqps://` → TCP; `ws://` / `wss://` → WebSocket):
5233

53-
### Using AsyncGenerator for consuming messages
34+
```javascript
35+
import { AMQPSession } from "@cloudamqp/amqp-client"
5436

55-
As an alternative to the callback-based approach, you can use an AsyncGenerator for a more modern, iteration-based API:
37+
const session = await AMQPSession.connect("amqp://localhost")
5638

57-
```javascript
58-
import { AMQPClient } from "@cloudamqp/amqp-client"
39+
// Declare a queue and publish a message (waits for broker confirmation)
40+
const q = await session.queue("my-queue")
41+
await q.publish("Hello World", { deliveryMode: 2 })
42+
43+
// Subscribe with a callback — consumer recovers automatically on reconnect
44+
const sub = await q.subscribe({ noAck: false }, async (msg) => {
45+
console.log(msg.bodyString())
46+
await msg.ack()
47+
})
5948

60-
async function run() {
61-
try {
62-
const amqp = new AMQPClient("amqp://localhost")
63-
const conn = await amqp.connect()
64-
const ch = await conn.channel()
65-
const q = await ch.queue()
66-
67-
await q.publish("Hello World", { deliveryMode: 2 })
68-
69-
// Subscribe without a callback and use consumer.messages for AsyncGenerator
70-
const consumer = await q.subscribe({ noAck: false })
71-
for await (const msg of consumer.messages) {
72-
console.log(msg.bodyToString())
73-
await msg.ack()
74-
break // breaking automatically cancels the consumer
75-
}
76-
77-
await conn.close()
78-
} catch (e) {
79-
console.error("ERROR", e)
80-
e.connection.close()
81-
}
49+
// Or subscribe with an async iterator
50+
const iterSub = await q.subscribe({ noAck: false })
51+
for await (const msg of iterSub) {
52+
console.log(msg.bodyString())
53+
await msg.ack()
8254
}
8355

84-
run()
85-
```
56+
// Exchanges work the same way
57+
const x = await session.topicExchange("events")
58+
await x.publish("user signed up", { routingKey: "events.user.created" })
8659

87-
### Automatic Reconnection
60+
// When done
61+
await session.stop()
62+
```
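The scheme-based transport choice described for `AMQPSession.connect()` can be pictured with a few lines of plain JavaScript. This is a rough sketch only: `pickTransport` is a hypothetical helper, and the library's actual selection logic is not shown in this commit page.

```javascript
// Hypothetical helper, not part of @cloudamqp/amqp-client.
// Maps a broker URL to the kind of transport a session would open.
function pickTransport(url) {
  const { protocol } = new URL(url) // protocol includes the trailing colon
  switch (protocol) {
    case "amqp:":
    case "amqps:":
      return "tcp"
    case "ws:":
    case "wss:":
      return "websocket"
    default:
      throw new Error(`Unsupported URL scheme: ${protocol}`)
  }
}

console.log(pickTransport("amqp://localhost")) // tcp
console.log(pickTransport("wss://my.cloudamqp.com/ws/")) // websocket
```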
8863

89-
Use `AMQPSession.connect(url, options)` to get a session that manages reconnection and consumer recovery. The transport is chosen from the URL scheme (`amqp://` / `amqps://` → TCP socket; `ws://` / `wss://` → WebSocket):
64+
#### Reconnection options
9065

9166
```javascript
92-
import { AMQPSession } from "@cloudamqp/amqp-client"
93-
94-
async function run() {
95-
// connect() picks the right transport and returns a ready session
96-
const session = await AMQPSession.connect("amqp://localhost", {
97-
reconnectInterval: 1000, // Initial delay before reconnecting (ms)
98-
maxReconnectInterval: 30000, // Maximum delay between attempts (ms)
99-
backoffMultiplier: 2, // Exponential backoff multiplier
100-
maxRetries: 0, // 0 = infinite retries
101-
})
102-
103-
// Set up event callbacks on the session
104-
session.onconnect = () => console.log("Reconnected!")
105-
session.onfailed = (err) => console.log("Failed to reconnect:", err?.message)
106-
107-
// Subscribe with a callback — returns an AMQPSubscription.
108-
const sub = await session.subscribe(
109-
"my-queue",
110-
{ noAck: false },
111-
async (msg) => {
112-
console.log("Received:", msg.bodyString())
113-
await msg.ack()
114-
},
115-
{
116-
prefetch: 10, // Set prefetch limit for this consumer
117-
queue: { durable: true }, // Queue declaration parameters for recovery
118-
},
119-
)
120-
121-
// sub.consumerTag and sub.channel reflect the current consumer (updated on reconnect)
122-
// To stop consuming this queue (also removes it from auto-recovery):
123-
// await sub.cancel()
124-
125-
// Subscribe without a callback — returns an AMQPGeneratorSubscription (async-iterable).
126-
// const sub = await session.subscribe("my-queue", { noAck: true })
127-
// for await (const msg of sub) {
128-
// console.log("Received:", msg.bodyString())
129-
// }
130-
// await sub.cancel()
131-
132-
// When done, stop the session (closes connection, stops reconnection)
133-
// await session.stop()
134-
}
135-
136-
run()
67+
const session = await AMQPSession.connect("amqp://localhost", {
68+
reconnectInterval: 1000, // initial delay before reconnecting (ms)
69+
maxReconnectInterval: 30000, // maximum delay between attempts (ms)
70+
backoffMultiplier: 2, // exponential backoff multiplier
71+
maxRetries: 0, // 0 = infinite retries
72+
})
73+
74+
session.onconnect = () => console.log("Reconnected!")
75+
session.onfailed = (err) => console.error("Gave up:", err?.message)
13776
```
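To see how these options interact, here is a minimal sketch of an exponential backoff schedule. The option names match the example above, but `backoffDelay` is a hypothetical function and the formula (initial delay times the multiplier raised to the attempt number, capped at the maximum) is an assumption about how such a schedule is typically computed, not a statement of the library's internals.

```javascript
// Hypothetical helper, not part of @cloudamqp/amqp-client.
// Default values copy the README example; they are not claimed to be library defaults.
function backoffDelay(
  attempt, // 0-based reconnection attempt number
  { reconnectInterval = 1000, maxReconnectInterval = 30000, backoffMultiplier = 2 } = {},
) {
  const delay = reconnectInterval * Math.pow(backoffMultiplier, attempt)
  return Math.min(delay, maxReconnectInterval) // never wait longer than the cap
}

const delays = Array.from({ length: 7 }, (_, attempt) => backoffDelay(attempt))
console.log(delays) // [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]
```

With these values the delay doubles five times and then stays at the 30-second cap for every later attempt.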
13877
139-
#### Two APIs for Different Use Cases
78+
#### Consumer recovery
14079
141-
This library provides two APIs that coexist:
80+
Subscriptions created via `queue.subscribe()` are automatically re-established after reconnection. Include `prefetch` in the subscribe params to set QoS on each connection:
14281
143-
1. **Low-level API**: `client.connect()` → `channel()` → `queue.subscribe()`
144-
- Full control over channels and resources
145-
- No automatic reconnection — you handle connection failures
146-
- Use when you need fine-grained control
82+
```javascript
83+
const q = await session.queue("my-queue", { durable: true })
84+
const sub = await q.subscribe({ noAck: false, prefetch: 10 }, async (msg) => {
85+
await msg.ack()
86+
})
14787

148-
2. **High-level API**: `AMQPSession.connect()` → `session.subscribe()`
149-
- Automatic reconnection and consumer recovery
150-
- Consumer objects stay valid across reconnections
151-
- Use for convenience when you don't need channel-level control
88+
// sub.consumerTag and sub.channel reflect the current consumer (updated on reconnect)
89+
// await sub.cancel() // stops consuming and removes from auto-recovery
90+
```
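Conceptually, consumer recovery means remembering each subscription's parameters and replaying them on a fresh channel after every reconnect. The sketch below illustrates that idea under stated assumptions: `SubscriptionRegistry` and `fakeChannel` are hypothetical names, the fake channel only mimics the shape of `basicQos` and `basicConsume`, and none of this is the library's actual implementation.

```javascript
// Hypothetical registry, not the library's implementation.
class SubscriptionRegistry {
  constructor() {
    this.subscriptions = new Set()
  }

  // Remember everything needed to re-create the consumer later.
  add(queue, { prefetch, ...consumeParams }, callback) {
    const sub = { queue, prefetch, consumeParams, callback, consumerTag: null }
    this.subscriptions.add(sub)
    return sub
  }

  // A cancelled subscription is dropped from auto-recovery.
  remove(sub) {
    this.subscriptions.delete(sub)
  }

  // Call after each (re)connect with a fresh channel-like object.
  async recover(channel) {
    for (const sub of this.subscriptions) {
      // Apply QoS before consuming, as the prefetch option describes.
      if (sub.prefetch !== undefined) await channel.basicQos(sub.prefetch)
      const consumer = await channel.basicConsume(sub.queue, sub.consumeParams, sub.callback)
      sub.consumerTag = consumer.tag // the tag may change on every reconnect
    }
  }
}

// Stand-in for a channel so the sketch runs without a broker.
function fakeChannel(log) {
  let count = 0
  return {
    async basicQos(prefetchCount) {
      log.push(`qos ${prefetchCount}`)
    },
    async basicConsume(queue) {
      count += 1
      log.push(`consume ${queue}`)
      return { tag: `ctag-${count}` }
    },
  }
}

const log = []
const registry = new SubscriptionRegistry()
registry.add("my-queue", { noAck: false, prefetch: 10 }, () => {})
registry
  .recover(fakeChannel(log)) // initial connect
  .then(() => registry.recover(fakeChannel(log))) // simulated reconnect
  .then(() => console.log(log))
```

The handle returned by `add()` stays the same object across recoveries while its `consumerTag` is refreshed, which mirrors the stable-handle behaviour described for `AMQPSubscription`.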
15291
153-
#### Key Features
92+
### Low-level API
15493
155-
- **Automatic reconnection**: Reconnects automatically when the connection is lost
156-
- **Exponential backoff**: Configurable delays between reconnection attempts
157-
- **Consumer recovery**: Consumers registered via `session.subscribe()` are automatically re-established after reconnection
158-
- **Event callbacks**: Hooks for connection state changes (`onconnect`, `onfailed`)
159-
- **Prefetch control**: Set per-consumer prefetch limits
94+
For full control over channels and resources, use the transport clients directly:
16095
161-
#### Reconnection Behavior
96+
```javascript
97+
import { AMQPClient } from "@cloudamqp/amqp-client"
16298

163-
When a connection is lost:
99+
const amqp = new AMQPClient("amqp://localhost")
100+
const conn = await amqp.connect()
101+
const ch = await conn.channel()
102+
103+
// Declare a queue
104+
const q = await ch.queueDeclare("my-queue")
105+
106+
// Publish
107+
await ch.basicPublish("", q.name, "Hello World", { deliveryMode: 2 })
108+
109+
// Consume with a callback
110+
const consumer = await ch.basicConsume(q.name, { noAck: false }, async (msg) => {
111+
console.log(msg.bodyToString())
112+
await msg.ack()
113+
await consumer.cancel()
114+
})
115+
await consumer.wait()
116+
117+
// Or consume with an async iterator
118+
const iterConsumer = await ch.basicConsume(q.name, { noAck: false })
119+
for await (const msg of iterConsumer.messages) {
120+
console.log(msg.bodyToString())
121+
await msg.ack()
122+
break // breaking automatically cancels the consumer
123+
}
164124

165-
- The session reconnects automatically with exponential backoff
166-
- After a successful reconnect, consumers registered via `session.subscribe()` are re-established, then `onconnect` fires
167-
- Messages delivered but not acknowledged before disconnection are redelivered by the broker
168-
- `onfailed` fires and reconnection stops if `maxRetries` is exceeded
169-
- For disconnect detection, set `client.ondisconnect` directly
125+
await conn.close()
126+
```
170127
171128
## WebSockets
172129
173130
This library can be used in the browser to access an AMQP server over WebSockets. For servers such as RabbitMQ that don't support WebSockets natively, a [WebSocket TCP relay](https://github.com/cloudamqp/websocket-tcp-relay/) has to be used as a proxy. All CloudAMQP servers have this proxy configured. More information can be found [in this blog post](https://www.cloudamqp.com/blog/cloudamqp-releases-amqp-websockets.html).
174131
175132
For web browsers a [compiled](https://www.typescriptlang.org/) and [rolled up](https://www.rollupjs.org/) version is available at <https://github.com/cloudamqp/amqp-client.js/releases>.
176133
177-
Using AMQP over WebSockets in a browser:
134+
`AMQPSession` works with WebSocket URLs out of the box — pass a `ws://` or `wss://` URL and transport is chosen automatically:
135+
136+
```javascript
137+
import { AMQPSession } from "@cloudamqp/amqp-client"
138+
139+
const session = await AMQPSession.connect("wss://my.cloudamqp.com/ws/", {
140+
vhost: "my-vhost",
141+
})
142+
const q = await session.queue("my-queue")
143+
const sub = await q.subscribe({ noAck: true }, (msg) => {
144+
console.log(msg.bodyString())
145+
})
146+
```
147+
148+
For lower-level control without reconnection, use `AMQPWebSocketClient` directly:
178149

179150
```html
180151
<!DOCTYPE html>
@@ -195,9 +166,9 @@ Using AMQP over WebSockets in a browser:
195166
const conn = await amqp.connect()
196167
const ch = await conn.channel()
197168
attachPublish(ch)
198-
const q = await ch.queue("")
199-
await q.bind("amq.fanout")
200-
const consumer = await q.subscribe({ noAck: false }, (msg) => {
169+
const q = await ch.queueDeclare("")
170+
await ch.queueBind(q.name, "amq.fanout", "")
171+
const consumer = await ch.basicConsume(q.name, { noAck: false }, (msg) => {
201172
console.log(msg)
202173
textarea.value += msg.bodyToString() + "\n"
203174
msg.ack()

src/amqp-channel.ts

Lines changed: 0 additions & 16 deletions
@@ -1,7 +1,6 @@
11
import { AMQPError } from "./amqp-error.js"
22
import * as AMQPFrame from "./amqp-frame.js"
33
import { AMQPView } from "./amqp-view.js"
4-
import { AMQPQueue } from "./amqp-queue.js"
54
import { AMQPConsumer, AMQPGeneratorConsumer } from "./amqp-consumer.js"
65
import type { AMQPMessage } from "./amqp-message.js"
76
import type { AMQPBaseClient } from "./amqp-base-client.js"
@@ -56,21 +55,6 @@ export class AMQPChannel {
5655
return this.sendRpc(channelOpen)
5756
}
5857

59-
/**
60-
* Declare a queue and return an AMQPQueue instance.
61-
*/
62-
queue(
63-
name = "",
64-
{ passive = false, durable = name !== "", autoDelete = name === "", exclusive = name === "" }: QueueParams = {},
65-
args = {},
66-
): Promise<AMQPQueue> {
67-
return new Promise((resolve, reject) => {
68-
this.queueDeclare(name, { passive, durable, autoDelete, exclusive }, args)
69-
.then(({ name }) => resolve(new AMQPQueue(this, name)))
70-
.catch(reject)
71-
})
72-
}
73-
7458
/**
7559
* Alias for basicQos
7660
* @param prefetchCount - max inflight messages

src/amqp-client.ts

Lines changed: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
11
export { AMQPClient } from "./amqp-socket-client.js"
22
export { AMQPChannel } from "./amqp-channel.js"
3-
export { AMQPQueue } from "./amqp-queue.js"
43
export { AMQPConsumer } from "./amqp-consumer.js"
54
export { AMQPError } from "./amqp-error.js"
65
export { AMQPMessage } from "./amqp-message.js"
