Skip to content

Add automatic reconnection and consumer recovery#185

Merged
baelter merged 5 commits intomainfrom
copilot/add-automatic-reconnection
Feb 25, 2026
Merged

Add automatic reconnection and consumer recovery#185
baelter merged 5 commits intomainfrom
copilot/add-automatic-reconnection

Conversation

@baelter
Copy link
Copy Markdown
Member

@baelter baelter commented Feb 20, 2026

Summary

Adds AMQPSession as the high-level API, modelled after the Ruby client.
The first feature is automatic reconnection with exponential backoff and
recovery of active consumers after each reconnect.

AMQPSession — created via AMQPSession.connect(url, options):

  • Reconnects automatically on disconnect (configurable backoff, max retries)
  • Recovers all active subscriptions after each reconnect
  • onconnect fires after successful reconnection and consumer recovery
  • onfailed fires when maxRetries is exhausted
  • stop() cancels reconnection and closes cleanly

AMQPSubscription — returned by session.subscribe():

  • Stable reference across reconnections — no need to re-subscribe
  • Delivers messages via callback and/or sub.messages async generator
  • cancel() unsubscribes and removes from auto-recovery

The low-level AMQPClient / AMQPWebSocketClient / AMQPChannel API is
unchanged. A new ondisconnect hook and reconnect-safe connect() are added
to the base client to support the session layer.

Test plan

  • New and existing tests pass
  • Lint, format, and typecheck pass
  • Verify reconnection behavior against a real RabbitMQ/LavinMQ instance

Fixes #176

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a higher-level session abstraction to the AMQP client library to support automatic reconnection (with exponential backoff) and consumer recovery after connection loss, addressing the reconnect ergonomics requested in #176.

Changes:

  • Introduces AMQPSession (new src/amqp-session.ts) with reconnection loop, lifecycle callbacks, and session.subscribe() that re-establishes consumers after reconnect.
  • Adds client.start(options?) to both socket and WebSocket clients, and introduces a base-client ondisconnect hook to detect connection loss.
  • Updates consumer internals to allow in-place channel/tag swapping for recovery and adds reconnection-focused tests + README docs.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
src/amqp-session.ts New session/reconnect orchestration + consumer recovery bookkeeping.
src/amqp-socket-client.ts Adds start(), disconnect signaling, and state reset in connect().
src/amqp-websocket-client.ts Adds start(), disconnect signaling, and state reset in connect().
src/amqp-consumer.ts Enables swapping channel/tag in-place and adds cancel hook used by session.
src/amqp-base-client.ts Adds ondisconnect callback to support session reconnect detection.
src/index.ts Exports AMQPSession and ReconnectOptions from the public entrypoint.
README.md Documents the new high-level reconnection/session API and usage examples.
test/reconnect.ts Adds tests for reconnection, consumer recovery, callbacks, and backoff behavior.

Comment thread src/amqp-websocket-client.ts
Comment thread src/amqp-consumer.ts Outdated
Comment thread src/amqp-socket-client.ts
Comment thread test/reconnect.ts Outdated
Comment thread test/reconnect.ts Outdated
Comment thread test/reconnect.ts Outdated
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-socket-client.ts
Comment thread src/amqp-websocket-client.ts
Comment thread src/amqp-session.ts
Comment thread src/amqp-session.ts
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-socket-client.ts
@baelter baelter force-pushed the copilot/add-automatic-reconnection branch from 43ab728 to 1b40b96 Compare February 20, 2026 13:51
@baelter baelter requested a review from Copilot February 20, 2026 13:53
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 23 comments.

Comment thread src/amqp-socket-client.ts
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-session.ts
Comment thread test/reconnect.ts Outdated
Comment thread src/amqp-socket-client.ts
Comment thread src/amqp-session.ts Outdated
Comment thread test/amqp-session.ts
@baelter baelter force-pushed the copilot/add-automatic-reconnection branch 2 times, most recently from eac4c91 to d9d1406 Compare February 20, 2026 14:13
@baelter baelter requested a review from Copilot February 20, 2026 14:15
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

Comment thread src/amqp-session.ts Outdated
Comment thread test/amqp-session.ts
Comment thread src/amqp-socket-client.ts
@baelter baelter force-pushed the copilot/add-automatic-reconnection branch from d9d1406 to 0782e05 Compare February 20, 2026 14:28
@baelter baelter requested a review from Copilot February 20, 2026 14:29
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 13 comments.

Comment thread src/amqp-consumer.ts Outdated
Comment thread test/reconnect.ts Outdated
Comment thread README.md
Comment thread test/reconnect.ts Outdated
Comment thread test/reconnect.ts Outdated
Comment thread test/amqp-session.ts
Comment thread test/reconnect.ts Outdated
Comment thread test/reconnect.ts Outdated
Comment thread test/reconnect.ts Outdated
Comment thread test/reconnect.ts Outdated
@baelter baelter force-pushed the copilot/add-automatic-reconnection branch from e8702ad to 0e0eee6 Compare February 20, 2026 15:05
baelter added a commit that referenced this pull request Feb 23, 2026
P2 — fixed:
- Race condition: stop() called while client.connect() is in-flight could
  fire onconnect after the session was stopped. Guard with `if (this.stopped)`
  immediately after the connect await resolves.
- Missing test: session.subscribe(queue: AMQPQueue) overload was untested;
  add "session.subscribe accepts an AMQPQueue object".
- Flaky test: "session.onconnect fires after successful reconnection" used a
  fixed 500ms setTimeout instead of the onconnect callback as a signal;
  rewrite to use a proper promise/timeout pair like the recovery test.

P3 — fixed:
- JSDoc on session.client warns that overwriting client.ondisconnect breaks
  reconnection.
- JSDoc on subscribe options clarifies that queueArgs is only used when
  queueParams is also provided.

P3 — dismissed:
- Recovery failures keep the dead consumer in the tracking map (will retry
  on next reconnect). Expected behaviour — user can call consumer.cancel()
  to opt out. Not worth adding complexity for V1.
Comment thread src/amqp-consumer.ts
Comment thread src/amqp-session.ts
@baelter baelter force-pushed the copilot/add-automatic-reconnection branch from 75db01a to 88e4570 Compare February 23, 2026 20:11
Copy link
Copy Markdown
Contributor

@antondalgren antondalgren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a small suggestion on the generator parts!

Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-subscription.ts Outdated
Comment thread src/amqp-subscription.ts Outdated
@baelter baelter force-pushed the copilot/add-automatic-reconnection branch from 76f9203 to 398e772 Compare February 25, 2026 09:46
@baelter baelter merged commit d305cd6 into main Feb 25, 2026
6 checks passed
@baelter baelter deleted the copilot/add-automatic-reconnection branch February 25, 2026 09:52
baelter added a commit that referenced this pull request Feb 27, 2026
…e API

Introduces AMQPSession as the recommended high-level entry point for
applications that need automatic reconnection and consumer recovery.

New types and classes:
- AMQPSession.connect(url, options?) — picks TCP or WebSocket transport
  from the URL scheme; reconnects with configurable exponential backoff
- AMQPQueue — session-backed handle with publish, subscribe, get, bind,
  unbind, purge, delete; all operations are reconnect-safe
- AMQPExchange — session-backed handle with publish, bind, unbind, delete
- AMQPSubscription / AMQPGeneratorSubscription — stable consumer handles
  that survive reconnection; the generator variant bridges the async
  iterator across reconnects without any caller changes
- QueueSubscribeParams — ConsumeParams & { prefetch? }; prefetch sets
  channel QoS before each consume, including after reconnect
- AMQPSessionOptions — reconnectInterval, maxReconnectInterval,
  backoffMultiplier, maxRetries, tlsOptions, vhost, logger

Breaking changes:
- AMQPChannel.queue() removed; use ch.queueDeclare() + channel methods
  or session.queue() for the high-level API
- AMQPQueue is now session-only (constructor takes session + name);
  no longer returned by channel methods
- AMQPQueue no longer re-exported from AMQPClient / AMQPWebSocketClient
- session.client is private; use session.closed to check state
- Session operation methods (subscribe, publish, publishAndForget, get,
  bind, unbind, purge, deleteQueue, deleteExchange, exchangeBind,
  exchangeUnbind) are @internal and absent from .d.ts declarations;
  use AMQPQueue / AMQPExchange handles instead

Other fixes:
- WebSocket connections now use the vhost option instead of the URL path
- Concurrent getOpsChannel / getConfirmChannel callers no longer open
  duplicate channels

Closes #185
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add automatic re-connection

3 participants