Add automatic reconnection and consumer recovery#185
Merged
Conversation
There was a problem hiding this comment.
Pull request overview
Adds a higher-level session abstraction to the AMQP client library to support automatic reconnection (with exponential backoff) and consumer recovery after connection loss, addressing the reconnect ergonomics requested in #176.
Changes:
- Introduces
AMQPSession(newsrc/amqp-session.ts) with reconnection loop, lifecycle callbacks, andsession.subscribe()that re-establishes consumers after reconnect. - Adds
client.start(options?)to both socket and WebSocket clients, and introduces a base-clientondisconnecthook to detect connection loss. - Updates consumer internals to allow in-place channel/tag swapping for recovery and adds reconnection-focused tests + README docs.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
src/amqp-session.ts |
New session/reconnect orchestration + consumer recovery bookkeeping. |
src/amqp-socket-client.ts |
Adds start(), disconnect signaling, and state reset in connect(). |
src/amqp-websocket-client.ts |
Adds start(), disconnect signaling, and state reset in connect(). |
src/amqp-consumer.ts |
Enables swapping channel/tag in-place and adds cancel hook used by session. |
src/amqp-base-client.ts |
Adds ondisconnect callback to support session reconnect detection. |
src/index.ts |
Exports AMQPSession and ReconnectOptions from the public entrypoint. |
README.md |
Documents the new high-level reconnection/session API and usage examples. |
test/reconnect.ts |
Adds tests for reconnection, consumer recovery, callbacks, and backoff behavior. |
baelter
commented
Feb 20, 2026
43ab728 to
1b40b96
Compare
eac4c91 to
d9d1406
Compare
d9d1406 to
0782e05
Compare
e8702ad to
0e0eee6
Compare
baelter
added a commit
that referenced
this pull request
Feb 23, 2026
P2 — fixed: - Race condition: stop() called while client.connect() is in-flight could fire onconnect after the session was stopped. Guard with `if (this.stopped)` immediately after the connect await resolves. - Missing test: session.subscribe(queue: AMQPQueue) overload was untested; add "session.subscribe accepts an AMQPQueue object". - Flaky test: "session.onconnect fires after successful reconnection" used a fixed 500ms setTimeout instead of the onconnect callback as a signal; rewrite to use a proper promise/timeout pair like the recovery test. P3 — fixed: - JSDoc on session.client warns that overwriting client.ondisconnect breaks reconnection. - JSDoc on subscribe options clarifies that queueArgs is only used when queueParams is also provided. P3 — dismissed: - Recovery failures keep the dead consumer in the tracking map (will retry on next reconnect). Expected behaviour — user can call consumer.cancel() to opt out. Not worth adding complexity for V1.
75db01a to
88e4570
Compare
antondalgren
approved these changes
Feb 25, 2026
Contributor
antondalgren
left a comment
There was a problem hiding this comment.
Just a small suggestion on the generator parts!
76f9203 to
398e772
Compare
baelter
added a commit
that referenced
this pull request
Feb 27, 2026
…e API
Introduces AMQPSession as the recommended high-level entry point for
applications that need automatic reconnection and consumer recovery.
New types and classes:
- AMQPSession.connect(url, options?) — picks TCP or WebSocket transport
from the URL scheme; reconnects with configurable exponential backoff
- AMQPQueue — session-backed handle with publish, subscribe, get, bind,
unbind, purge, delete; all operations are reconnect-safe
- AMQPExchange — session-backed handle with publish, bind, unbind, delete
- AMQPSubscription / AMQPGeneratorSubscription — stable consumer handles
that survive reconnection; the generator variant bridges the async
iterator across reconnects without any caller changes
- QueueSubscribeParams — ConsumeParams & { prefetch? }; prefetch sets
channel QoS before each consume, including after reconnect
- AMQPSessionOptions — reconnectInterval, maxReconnectInterval,
backoffMultiplier, maxRetries, tlsOptions, vhost, logger
Breaking changes:
- AMQPChannel.queue() removed; use ch.queueDeclare() + channel methods
or session.queue() for the high-level API
- AMQPQueue is now session-only (constructor takes session + name);
no longer returned by channel methods
- AMQPQueue no longer re-exported from AMQPClient / AMQPWebSocketClient
- session.client is private; use session.closed to check state
- Session operation methods (subscribe, publish, publishAndForget, get,
bind, unbind, purge, deleteQueue, deleteExchange, exchangeBind,
exchangeUnbind) are @internal and absent from .d.ts declarations;
use AMQPQueue / AMQPExchange handles instead
Other fixes:
- WebSocket connections now use the vhost option instead of the URL path
- Concurrent getOpsChannel / getConfirmChannel callers no longer open
duplicate channels
Closes #185
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds
AMQPSessionas the high-level API, modelled after the Ruby client.The first feature is automatic reconnection with exponential backoff and
recovery of active consumers after each reconnect.
AMQPSession— created viaAMQPSession.connect(url, options):onconnectfires after successful reconnection and consumer recoveryonfailedfires whenmaxRetriesis exhaustedstop()cancels reconnection and closes cleanlyAMQPSubscription— returned bysession.subscribe():sub.messagesasync generatorcancel()unsubscribes and removes from auto-recoveryThe low-level
AMQPClient/AMQPWebSocketClient/AMQPChannelAPI isunchanged. A new
ondisconnecthook and reconnect-safeconnect()are addedto the base client to support the session layer.
Test plan
Fixes #176