Fix WebSocket connection loss detection for AMQP channels and consumers#152

Merged
baelter merged 4 commits into main from copilot/handle-websocket-connection-loss
Sep 11, 2025

Conversation

Copilot AI commented Sep 9, 2025

When using AMQP over WebSockets with a TCP relay, connection loss was not properly detected, leaving connections, channels, and consumers in an inconsistent "not closed" state. This made it impossible for applications to detect when the underlying WebSocket connection was lost and implement proper reconnection logic.

Problem

The issue occurred when the WebSocket TCP relay process stopped or network connectivity was interrupted. While the WebSocket connection would close, the AMQP connection, channels, and consumers would remain in an "open" state, making it appear as if they were still functional when they were actually disconnected.

const amqp = new AMQPWebSocketClient('ws://localhost:15670/ws/amqp')
const conn = await amqp.connect()
const ch = await conn.channel()
const queue = await ch.queue() // assumed: a server-named temporary queue
const consumer = await queue.subscribe({}, handleMessage)

// TCP relay stops here...
console.log(conn.closed)     // false (incorrect!)
console.log(ch.closed)       // false (incorrect!)
// consumer.wait() never rejects

Solution

Enhanced the AMQPWebSocketClient to properly handle WebSocket connection loss by:

  1. Proper Channel Cleanup: When WebSocket close or error events occur, all active channels are now properly closed with error information
  2. State Consistency: Connection, channel, and consumer closed states are correctly updated
  3. Error Propagation: Errors are propagated through the entire hierarchy (connection → channels → consumers)
  4. Consumer Notification: Consumer wait() promises now properly reject when connection is lost
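The propagation described in the four points above can be modelled in a few lines. This is a minimal sketch and not the library's implementation: `SketchConnection`, `SketchChannel`, `SketchConsumer`, and `connectionLost` are hypothetical stand-ins that only illustrate how one transport-level error could fan out to every channel and reject every consumer's `wait()` promise.

```typescript
// Stand-in types for illustration only; these are NOT the library's classes.
class SketchConsumer {
  private resolveWait!: () => void
  private rejectWait!: (err: Error) => void
  private readonly done: Promise<void>

  constructor() {
    this.done = new Promise<void>((resolve, reject) => {
      this.resolveWait = resolve
      this.rejectWait = reject
    })
    // Avoid unhandled-rejection noise when nobody has called wait() yet.
    this.done.catch(() => {})
  }

  // Settles when the consumer ends: resolves on a clean close, rejects on loss.
  wait(): Promise<void> {
    return this.done
  }

  setClosed(err?: Error): void {
    if (err) this.rejectWait(err)
    else this.resolveWait()
  }
}

class SketchChannel {
  closed = false
  readonly consumers: SketchConsumer[] = []

  // Marks the channel closed and forwards the error to each consumer.
  setClosed(err?: Error): void {
    if (this.closed) return
    this.closed = true
    this.consumers.forEach((c) => c.setClosed(err))
    this.consumers.length = 0
  }
}

class SketchConnection {
  closed = false
  channels: SketchChannel[] = []
  onerror: (err: Error) => void = () => {}

  // Called once when the transport reports a close or error event.
  connectionLost(err: Error): void {
    if (this.closed) return
    this.closed = true
    this.channels.forEach((ch) => ch.setClosed(err))
    this.channels = []
    this.onerror(err)
  }
}
```

In this model the connection is the only object that listens to the socket; channels and consumers learn about the loss through `setClosed`, which keeps the three `closed` flags consistent.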

Usage

Applications can now detect connection loss in multiple ways:

// Connection-level monitoring
amqp.onerror = (error) => {
  console.log('Connection lost:', error.message)
  // Implement reconnection logic
}

// State checking
if (amqp.closed || channel.closed) {
  // Connection/channel is closed, need to reconnect
}

// Consumer monitoring
consumer.wait().catch(err => {
  console.log('Consumer lost connection:', err.message)
})

The fix ensures that when a WebSocket connection is lost:

  • connection.closed becomes true
  • All channels are marked as closed (channel.closed becomes true)
  • All consumers are notified and their wait() promises reject
  • Error callbacks are triggered at all levels
  • Subsequent operations on closed objects throw appropriate errors
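With these signals available, the "reconnection logic" placeholder in the usage example could be filled in with a retry loop. The following is a minimal sketch under stated assumptions: `connectWithBackoff` and its options (`maxAttempts`, `baseDelayMs`, `maxDelayMs`) are hypothetical names introduced here, not part of the library, and the `connect` callback stands in for whatever the application uses to open a connection.

```typescript
// Hypothetical helper, not part of the library: retries `connect` with
// exponential backoff until it succeeds or `maxAttempts` is exhausted.
async function connectWithBackoff<T>(
  connect: () => Promise<T>,
  { maxAttempts = 5, baseDelayMs = 500, maxDelayMs = 10000 } = {},
  // Injectable so the delay can be replaced in tests.
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await connect()
    } catch (err) {
      lastError = err
      // Wait before the next attempt, doubling the delay up to maxDelayMs.
      if (attempt < maxAttempts - 1) {
        await sleep(Math.min(baseDelayMs * 2 ** attempt, maxDelayMs))
      }
    }
  }
  throw lastError
}
```

An application might call this from its `onerror` handler, for example `connectWithBackoff(() => amqp.connect())`, and then open new channels and re-subscribe its consumers, since the old ones are closed when the connection is lost.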

Fixes #111.


Copilot AI changed the title from "[WIP] Connection with websocket in browser : How to handle connection lost" to "Fix WebSocket connection loss detection for AMQP channels and consumers" Sep 9, 2025
Copilot AI requested a review from baelter September 9, 2025 11:38
baelter force-pushed the copilot/handle-websocket-connection-loss branch from 8624e8f to bf124af September 11, 2025 07:32
baelter force-pushed the copilot/handle-websocket-connection-loss branch from 32fc0eb to 46de92f September 11, 2025 08:41
baelter marked this pull request as ready for review September 11, 2025 08:41
baelter merged commit f11c940 into main Sep 11, 2025
6 checks passed
baelter deleted the copilot/handle-websocket-connection-loss branch September 11, 2025 08:47
Comment on lines +62 to +84
socket.addEventListener('error', (ev: Event) => {
  if (!this.closed) {
    const err = new AMQPError(ev.toString(), this)
    this.closed = true
    // Close all channels and their consumers when there's an error
    this.channels.forEach((ch) => ch?.setClosed(err))
    this.channels = [new AMQPChannel(this, 0)]
    this.onerror(err)
  }
})
socket.addEventListener('close', (ev: CloseEvent) => {
  const clientClosed = this.closed
  this.closed = true
  if (!ev.wasClean && !clientClosed) this.onerror(new AMQPError(`connection not cleanly closed (${ev.code})`, this))
  if (!(ev.wasClean && clientClosed)) {
    const err = new AMQPError(`connection not cleanly closed (${ev.code})`, this)
    // Close all channels and their consumers when connection is lost
    this.channels.forEach((ch) => ch?.setClosed(err))
    this.channels = [new AMQPChannel(this, 0)]
    this.onerror(err)
  } else {
    this.channels.forEach((ch) => ch?.setClosed())
    this.channels = [new AMQPChannel(this, 0)]
  }

A little repetitive. Don't we want to do this for regular TCP clients as well? Consolidate to the base class maybe?
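One way the suggested consolidation could look is a single cleanup method on the base class that both socket handlers call. This is only a sketch under assumptions: `SketchBaseClient`, `SketchWebSocketClient`, `transportClosed`, `handleError`, and `handleClose` are hypothetical names, the channel type is a stand-in, and unlike the diff above the channel list is simply emptied rather than reset to a fresh channel 0.

```typescript
// Illustrative stand-ins; names and shapes are assumptions, not the library's code.
interface ClosableChannel {
  setClosed(err?: Error): void
}

abstract class SketchBaseClient {
  closed = false
  channels: (ClosableChannel | undefined)[] = []
  onerror: (err: Error) => void = () => {}

  // Single place for the cleanup that the 'error' and 'close' handlers repeat.
  protected transportClosed(err?: Error): void {
    const wasOpen = !this.closed
    this.closed = true
    this.channels.forEach((ch) => ch?.setClosed(err))
    this.channels = []
    // Report a loss only once, even if 'error' is followed by 'close'.
    if (err && wasOpen) this.onerror(err)
  }
}

class SketchWebSocketClient extends SketchBaseClient {
  // In a real client these would be wired to the socket's 'error' and 'close' events.
  handleError(message: string): void {
    if (!this.closed) this.transportClosed(new Error(message))
  }

  handleClose(wasClean: boolean, code: number): void {
    const clientClosed = this.closed
    if (wasClean && clientClosed) this.transportClosed()
    else this.transportClosed(new Error(`connection not cleanly closed (${code})`))
  }
}
```

Because the method lives on the base class, a TCP-based client could call the same `transportClosed` from its own socket callbacks. Note that this sketch calls `onerror` at most once per connection loss, which differs from the handler in the diff.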

Comment thread vite-env.d.ts
antondalgren commented Sep 11, 2025

unused?

Comment thread vitest.config.browser.ts
}
}
},
envDir: '.'
antondalgren commented Sep 11, 2025

unused?

Development

Successfully merging this pull request may close these issues.

Connection with websocket in browser : How to handle connection lost

3 participants