Skip to content

Commit 96ede5d

Browse files
committed
Implement proper connection loss handling for WebSocket connections
1 parent fa06e2d commit 96ede5d

2 files changed

Lines changed: 77 additions & 2 deletions

File tree

src/amqp-websocket-client.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,29 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
5959
socket.addEventListener('close', reject)
6060
socket.addEventListener('error', reject)
6161
socket.addEventListener('open', () => {
62-
socket.addEventListener('error', (ev: Event) => this.onerror(new AMQPError(ev.toString(), this)))
62+
socket.addEventListener('error', (ev: Event) => {
63+
if (!this.closed) {
64+
const err = new AMQPError(ev.toString(), this)
65+
this.closed = true
66+
// Close all channels and their consumers when there's an error
67+
this.channels.forEach((ch) => ch?.setClosed(err))
68+
this.channels = [new AMQPChannel(this, 0)]
69+
this.onerror(err)
70+
}
71+
})
6372
socket.addEventListener('close', (ev: CloseEvent) => {
6473
const clientClosed = this.closed
6574
this.closed = true
66-
if (!ev.wasClean && !clientClosed) this.onerror(new AMQPError(`connection not cleanly closed (${ev.code})`, this))
75+
if (!(ev.wasClean && clientClosed)) {
76+
const err = new AMQPError(`connection not cleanly closed (${ev.code})`, this)
77+
// Close all channels and their consumers when connection is lost
78+
this.channels.forEach((ch) => ch?.setClosed(err))
79+
this.channels = [new AMQPChannel(this, 0)]
80+
this.onerror(err)
81+
} else {
82+
this.channels.forEach((ch) => ch?.setClosed())
83+
this.channels = [new AMQPChannel(this, 0)]
84+
}
6785
})
6886
socket.send(new Uint8Array([65, 77, 81, 80, 0, 0, 9, 1]))
6987
})

test-browser/websocket.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,63 @@ test('closed socket closes client', async () => {
226226
expect(amqp.closed).toBe(true)
227227
})
228228

229+
test('connection loss closes channels and consumers', async () => {
230+
const amqp = getNewClient()
231+
const conn = await amqp.connect()
232+
const ch = await conn.channel()
233+
const q = await ch.queue("")
234+
235+
const consumer = await q.subscribe({noAck: false}, () => {})
236+
237+
// Set up error handler to track when consumer is closed
238+
const originalConsumerWait = consumer.wait()
239+
240+
const socket = amqp["socket"]
241+
assert(socket, "Socket must be created")
242+
243+
// Simulate unclean connection loss by closing the socket
244+
const closed = new Promise((resolve) => socket.addEventListener('close', resolve))
245+
socket.close()
246+
await closed
247+
248+
// Check that connection, channel, and consumer are all marked as closed
249+
expect(amqp.closed).toBe(true)
250+
expect(ch.closed).toBe(true)
251+
252+
// Consumer wait should reject with an error
253+
await expect(originalConsumerWait).rejects.toThrow()
254+
255+
// Verify that operations on closed objects throw errors
256+
await expect(ch.queue()).rejects.toThrow(/closed/)
257+
await expect(q.publish("test")).rejects.toThrow()
258+
})
259+
260+
test('connection loss triggers onerror callback', async () => {
261+
const amqp = getNewClient()
262+
const conn = await amqp.connect()
263+
264+
let errorReceived: AMQPError | null = null
265+
conn.onerror = vi.fn((err: AMQPError) => {
266+
errorReceived = err
267+
})
268+
269+
const socket = amqp["socket"]
270+
assert(socket, "Socket must be created")
271+
272+
// Simulate unclean connection loss
273+
const closed = new Promise((resolve) => socket.addEventListener('close', resolve))
274+
socket.close()
275+
await closed
276+
277+
// Check that error callback was called
278+
expect(conn.onerror).toHaveBeenCalled()
279+
expect(errorReceived).toBeTruthy()
280+
if (errorReceived) {
281+
expect((errorReceived as AMQPError).message).toMatch(/connection not cleanly closed/)
282+
}
283+
expect(amqp.closed).toBe(true)
284+
})
285+
229286
test('wait for publish confirms', async () => {
230287
const amqp = getNewClient()
231288
const conn = await amqp.connect()

0 commit comments

Comments
 (0)