Skip to content

Commit 6a21f51

Browse files
committed
test: add session tests; docs: update README and CHANGELOG
1 parent 2a1ac2e commit 6a21f51

3 files changed

Lines changed: 374 additions & 3 deletions

File tree

README.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,90 @@ async function run() {
8484
run()
8585
```
8686

87+
### Automatic Reconnection
88+
89+
Use `AMQPSession.connect(url, options)` to get a session that manages reconnection and consumer recovery. The transport is chosen from the URL scheme (`amqp://` / `amqps://` → TCP socket; `ws://` / `wss://` → WebSocket):
90+
91+
```javascript
92+
import { AMQPSession } from "@cloudamqp/amqp-client"
93+
94+
async function run() {
95+
// connect() picks the right transport and returns a ready session
96+
const session = await AMQPSession.connect("amqp://localhost", {
97+
reconnectInterval: 1000, // Initial delay before reconnecting (ms)
98+
maxReconnectInterval: 30000, // Maximum delay between attempts (ms)
99+
backoffMultiplier: 2, // Exponential backoff multiplier
100+
maxRetries: 0, // 0 = infinite retries
101+
})
102+
103+
// Set up event callbacks on the session
104+
session.onconnect = () => console.log("Reconnected!")
105+
session.onfailed = (err) => console.log("Failed to reconnect:", err?.message)
106+
107+
// Subscribe with a callback — returns an AMQPSubscription.
108+
const sub = await session.subscribe(
109+
"my-queue",
110+
{ noAck: false },
111+
async (msg) => {
112+
console.log("Received:", msg.bodyString())
113+
await msg.ack()
114+
},
115+
{
116+
prefetch: 10, // Set prefetch limit for this consumer
117+
queue: { durable: true }, // Queue declaration parameters for recovery
118+
},
119+
)
120+
121+
// sub.consumerTag and sub.channel reflect the current consumer (updated on reconnect)
122+
// To stop consuming this queue (also removes it from auto-recovery):
123+
// await sub.cancel()
124+
125+
// Subscribe without a callback — returns an AMQPGeneratorSubscription (async-iterable).
126+
// const sub = await session.subscribe("my-queue", { noAck: true })
127+
// for await (const msg of sub) {
128+
// console.log("Received:", msg.bodyString())
129+
// }
130+
// await sub.cancel()
131+
132+
// When done, stop the session (closes connection, stops reconnection)
133+
// await session.stop()
134+
}
135+
136+
run()
137+
```
138+
139+
#### Two APIs for Different Use Cases
140+
141+
This library provides two APIs that coexist:
142+
143+
1. **Low-level API**: `client.connect()``channel()``queue.subscribe()`
144+
- Full control over channels and resources
145+
- No automatic reconnection — you handle connection failures
146+
- Use when you need fine-grained control
147+
148+
2. **High-level API**: `AMQPSession.connect()``session.subscribe()`
149+
- Automatic reconnection and consumer recovery
150+
- Consumer objects stay valid across reconnections
151+
- Use for convenience when you don't need channel-level control
152+
153+
#### Key Features
154+
155+
- **Automatic reconnection**: Reconnects automatically when the connection is lost
156+
- **Exponential backoff**: Configurable delays between reconnection attempts
157+
- **Consumer recovery**: Consumers registered via `session.subscribe()` are automatically re-established after reconnection
158+
- **Event callbacks**: Hooks for connection state changes (`onconnect`, `onfailed`)
159+
- **Prefetch control**: Set per-consumer prefetch limits
160+
161+
#### Reconnection Behavior
162+
163+
When a connection is lost:
164+
165+
- The session reconnects automatically with exponential backoff
166+
- After a successful reconnect, consumers registered via `session.subscribe()` are re-established, then `onconnect` fires
167+
- Messages delivered but not acknowledged before disconnection are redelivered by the broker
168+
- `onfailed` fires and reconnection stops if `maxRetries` is exceeded
169+
- For disconnect detection, set `client.ondisconnect` directly
170+
87171
## WebSockets
88172
89173
This library can be used in the browser to access an AMQP server over WebSockets. For servers such as RabbitMQ that doesn't support WebSockets natively a [WebSocket TCP relay](https://github.com/cloudamqp/websocket-tcp-relay/) have to be used as a proxy. All CloudAMQP servers has this proxy configured. More information can be found [in this blog post](https://www.cloudamqp.com/blog/cloudamqp-releases-amqp-websockets.html).

test/amqp-session.ts

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import { expect, test, vi, beforeEach } from "vitest"
2+
import { AMQPClient } from "../src/amqp-socket-client.js"
3+
import { AMQPSession } from "../src/amqp-session.js"
4+
5+
beforeEach(() => {
6+
expect.hasAssertions()
7+
})
8+
9+
test("AMQPSession.connect() returns a session", async () => {
10+
const session = await AMQPSession.connect("amqp://127.0.0.1", { reconnectInterval: 500 })
11+
expect(session).toBeInstanceOf(AMQPSession)
12+
13+
await session.stop()
14+
})
15+
16+
test("session.subscribe delivers messages via callback", async () => {
17+
const session = await AMQPSession.connect("amqp://127.0.0.1")
18+
19+
const queueName = "test-queue-" + Math.random()
20+
const ch = await session.client.channel()
21+
await ch.queue(queueName, { durable: false, autoDelete: true })
22+
23+
let messageReceived = false
24+
const sub = await session.subscribe(queueName, { noAck: true }, async () => {
25+
messageReceived = true
26+
})
27+
28+
const q2 = await ch.queue(queueName, { passive: true })
29+
await q2.publish("test")
30+
await new Promise((resolve) => setTimeout(resolve, 100))
31+
32+
expect(messageReceived).toBe(true)
33+
await sub.cancel()
34+
await session.stop()
35+
})
36+
37+
test("session.subscribe accepts an AMQPQueue object", async () => {
38+
const session = await AMQPSession.connect("amqp://127.0.0.1")
39+
40+
const ch = await session.client.channel()
41+
const q = await ch.queue("", { durable: false, autoDelete: true })
42+
43+
let messageReceived = false
44+
const sub = await session.subscribe(q, { noAck: true }, async () => {
45+
messageReceived = true
46+
})
47+
48+
await q.publish("test")
49+
await new Promise((resolve) => setTimeout(resolve, 100))
50+
51+
expect(messageReceived).toBe(true)
52+
await sub.cancel()
53+
await session.stop()
54+
})
55+
56+
test("subscription.cancel() removes it from session recovery", async () => {
57+
const session = await AMQPSession.connect("amqp://127.0.0.1")
58+
59+
const ch = await session.client.channel()
60+
const q = await ch.queue("")
61+
const sub = await session.subscribe(q.name, { noAck: true }, () => {})
62+
63+
await expect(sub.cancel()).resolves.toBeUndefined()
64+
65+
await session.stop()
66+
})
67+
68+
test("session.subscribe supports prefetch option", async () => {
69+
const session = await AMQPSession.connect("amqp://127.0.0.1")
70+
71+
const queueName = "test-prefetch-queue-" + Math.random()
72+
const ch = await session.client.channel()
73+
await ch.queue(queueName, { durable: false, autoDelete: true })
74+
75+
let messagesReceived = 0
76+
const sub = await session.subscribe(
77+
queueName,
78+
{ noAck: false },
79+
async (msg) => {
80+
messagesReceived++
81+
if (messagesReceived === 2) {
82+
await msg.ack()
83+
}
84+
},
85+
{ prefetch: 1 },
86+
)
87+
88+
const q2 = await ch.queue(queueName, { passive: true })
89+
await q2.publish("message 1")
90+
await q2.publish("message 2")
91+
await q2.publish("message 3")
92+
93+
await new Promise((resolve) => setTimeout(resolve, 200))
94+
95+
// With prefetch=1, only 1 message is delivered until acked
96+
expect(messagesReceived).toBe(1)
97+
98+
await sub.cancel()
99+
await session.stop()
100+
})
101+
102+
test("session.subscribe yields messages via async generator", async () => {
103+
const session = await AMQPSession.connect("amqp://127.0.0.1")
104+
105+
const queueName = "test-generator-" + Math.random()
106+
const ch = await session.client.channel()
107+
await ch.queue(queueName, { durable: false, autoDelete: true })
108+
109+
const sub = await session.subscribe(queueName, { noAck: true })
110+
111+
const q2 = await ch.queue(queueName, { passive: true })
112+
await q2.publish("msg1")
113+
await q2.publish("msg2")
114+
115+
const received: string[] = []
116+
for await (const msg of sub) {
117+
received.push(msg.bodyString()!)
118+
if (received.length >= 2) break
119+
}
120+
121+
expect(received).toEqual(["msg1", "msg2"])
122+
await sub.cancel()
123+
await session.stop()
124+
})
125+
126+
test("session.onfailed fires when maxRetries exhausted", async () => {
127+
const session = await AMQPSession.connect("amqp://127.0.0.1", {
128+
reconnectInterval: 50,
129+
maxRetries: 2,
130+
})
131+
132+
const onfailed = vi.fn()
133+
session.onfailed = onfailed
134+
135+
const connectSpy = vi.spyOn(session.client, "connect").mockRejectedValue(new Error("forced failure"))
136+
137+
;(session.client as AMQPClient).socket?.destroy()
138+
139+
// Wait long enough for 2 retries + backoff (50ms + 100ms) with buffer
140+
await new Promise((resolve) => setTimeout(resolve, 500))
141+
142+
expect(onfailed).toHaveBeenCalledTimes(1)
143+
expect(onfailed.mock.calls[0]?.[0]).toBeInstanceOf(Error)
144+
expect(connectSpy).toHaveBeenCalledTimes(2)
145+
146+
connectSpy.mockRestore()
147+
await session.stop()
148+
})
149+
150+
test("session.onconnect fires after successful reconnection", async () => {
151+
const session = await AMQPSession.connect("amqp://127.0.0.1", {
152+
reconnectInterval: 50,
153+
maxRetries: 5,
154+
})
155+
156+
let reconnectCount = 0
157+
const reconnected = new Promise<void>((resolve, reject) => {
158+
const timeout = setTimeout(() => reject(new Error("onconnect did not fire within 5s")), 5_000)
159+
session.onconnect = () => {
160+
clearTimeout(timeout)
161+
reconnectCount++
162+
resolve()
163+
}
164+
})
165+
166+
;(session.client as AMQPClient).socket?.destroy()
167+
168+
await reconnected
169+
expect(reconnectCount).toBe(1)
170+
171+
await session.stop()
172+
})
173+
174+
test("subscription recovers and receives messages after reconnection", async () => {
175+
const session = await AMQPSession.connect("amqp://127.0.0.1", {
176+
reconnectInterval: 50,
177+
maxRetries: 5,
178+
})
179+
180+
const queueName = "test-recovery-" + Math.random()
181+
182+
// Pre-declare the queue so it survives reconnection (non-exclusive, autoDelete: false)
183+
const setupCh = await session.client.channel()
184+
await setupCh.queue(queueName, { durable: false, autoDelete: false })
185+
await setupCh.close()
186+
187+
const received: string[] = []
188+
const sub = await session.subscribe(
189+
queueName,
190+
{ noAck: true },
191+
(msg) => {
192+
received.push(msg.bodyString() || "")
193+
},
194+
{ queue: { durable: false, autoDelete: false } },
195+
)
196+
197+
// Publish a message before disconnect
198+
const ch1 = await session.client.channel()
199+
const q1 = await ch1.queue(queueName, { passive: true })
200+
await q1.publish("before-disconnect")
201+
await new Promise((resolve) => setTimeout(resolve, 100))
202+
203+
const reconnected = new Promise<void>((resolve, reject) => {
204+
const timeout = setTimeout(() => reject(new Error("Reconnection timed out")), 10_000)
205+
session.onconnect = () => {
206+
clearTimeout(timeout)
207+
resolve()
208+
}
209+
})
210+
211+
;(session.client as AMQPClient).socket?.destroy()
212+
213+
await reconnected
214+
215+
// Publish a message after reconnection
216+
const ch2 = await session.client.channel()
217+
const q2 = await ch2.queue(queueName, { passive: true })
218+
await q2.publish("after-reconnect")
219+
await new Promise((resolve) => setTimeout(resolve, 200))
220+
221+
expect(received).toContain("before-disconnect")
222+
expect(received).toContain("after-reconnect")
223+
224+
// Verify the subscription object is the same reference and channel is live
225+
expect(sub.channel.closed).toBe(false)
226+
227+
await sub.cancel()
228+
229+
const cleanCh = await session.client.channel()
230+
await cleanCh.queueDelete(queueName)
231+
232+
await session.stop()
233+
})
234+
235+
test("session.stop() during reconnection stops the loop", async () => {
236+
const session = await AMQPSession.connect("amqp://127.0.0.1", {
237+
reconnectInterval: 200,
238+
maxRetries: 10,
239+
})
240+
241+
const connectSpy = vi.spyOn(session.client, "connect").mockRejectedValue(new Error("forced failure"))
242+
243+
;(session.client as AMQPClient).socket?.destroy()
244+
245+
// Stop before the first reconnection attempt fires
246+
await new Promise((resolve) => setTimeout(resolve, 50))
247+
await session.stop()
248+
249+
// Confirm no further reconnection attempts
250+
await new Promise((resolve) => setTimeout(resolve, 500))
251+
252+
expect(connectSpy.mock.calls.length).toBeLessThanOrEqual(1)
253+
connectSpy.mockRestore()
254+
})
255+
256+
test("session.stop() when already disconnected does not throw", async () => {
257+
const session = await AMQPSession.connect("amqp://127.0.0.1", { maxRetries: 1 })
258+
259+
;(session.client as AMQPClient).socket?.destroy()
260+
await new Promise((resolve) => setTimeout(resolve, 50))
261+
262+
await expect(session.stop()).resolves.toBeUndefined()
263+
})

0 commit comments

Comments
 (0)