Skip to content

Add automatic reconnection and consumer recovery#180

Closed
Copilot wants to merge 14 commits intomainfrom
copilot/add-automatic-reconnection
Closed

Add automatic reconnection and consumer recovery#180
Copilot wants to merge 14 commits intomainfrom
copilot/add-automatic-reconnection

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented Nov 28, 2025

Summary

  • Add client.start(options?) to both AMQPClient and AMQPWebSocketClient, returning an AMQPSession that manages automatic reconnection with exponential backoff
  • Add session.subscribe() for consumer registration with automatic recovery after reconnection — consumer objects stay valid across reconnections (channel and tag are swapped in-place)
  • Add lifecycle callbacks on session: onconnect, ondisconnect, onreconnecting, onfailed
  • Add ondisconnect callback to AMQPBaseClient so the session layer can detect connection loss
  • Reset connection state (framePos, frameSize, channels) in connect() so re-calling it on an existing client works correctly

New files

  • src/amqp-session.tsAMQPSession class and ReconnectOptions interface

Modified files

  • src/amqp-base-client.ts — add ondisconnect callback
  • src/amqp-socket-client.ts — add start(), fire ondisconnect on close, reset state in connect()
  • src/amqp-websocket-client.ts — add start(), fire ondisconnect on error/close, reset state in connect()
  • src/amqp-consumer.ts — make channel/tag swappable via _update() for reconnection recovery; add _onCancel hook
  • src/index.ts — export AMQPSession and ReconnectOptions
  • README.md — document the high-level API with examples
  • test/reconnect.ts — tests for reconnection, consumer recovery, backoff, max retries, and session lifecycle

Test plan

  • Existing tests pass (npm test)
  • Lint, format, and typecheck pass
  • Verify reconnection behavior against a real RabbitMQ instance (restart broker mid-session)

Fixes #176

Copilot AI and others added 4 commits November 28, 2025 13:01
Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
…ecovery

Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
…mprove queue recovery

Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
Copilot AI changed the title [WIP] Add automatic re-connection logic for clients Add automatic reconnection with AMQPReconnectingClient Nov 28, 2025
Copilot AI requested a review from baelter November 28, 2025 13:22
Copy link
Copy Markdown
Member

@baelter baelter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot I dont like that this is separated out to AMQPReconnectingClient, implement it into the "default" client. We can do breaking changes and a new major release to ship this.

Copilot AI and others added 2 commits November 28, 2025 13:43
…ectingClient

Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
…k order

Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
Copy link
Copy Markdown
Contributor Author

Copilot AI commented Nov 28, 2025

@copilot I dont like that this is separated out to AMQPReconnectingClient, implement it into the "default" client. We can do breaking changes and a new major release to ship this.

Integrated reconnection directly into AMQPClient and AMQPWebSocketClient in commit 824f876. The AMQPReconnectingClient wrapper class has been removed.

Reconnection is now configured via the constructor:

const client = new AMQPClient("amqp://localhost", undefined, undefined, {
  reconnectInterval: 1000,
  maxRetries: 0
})

Use client.subscribe() for automatic consumer recovery after reconnection.

Copilot AI requested a review from baelter November 28, 2025 13:50
@anti-held-333
Copy link
Copy Markdown

anti-held-333 commented Nov 29, 2025

Hey I looked at the proposed code and found the following issues/review:

  • api is different now. The client provides the subscribe method.
    • Default approach is client -> channel -> queue -> subscribe or client -> channel -> basic-consume
  • the subscribe method creates a channel per subscription and the user has no control of which channel is used for subscription
  • reconnect logic it self seems ok (all relevant options (intervals, backoff, ...) are implemented)

In my opinion a channel should maintain the subscriptions. If the client reconnects, the client should automatically setup the previously used channels. This could be implemented as follows:

  • A channel should implement a copy which returns all subscription callbacks.
  • If connection is dropped -> trigger recreation of existing channels (and copy callbacks of the old elements) - use the channel id as base.
  • The returned channel by the client should be like a proxy -> pointing to the actual "online" channel in the client
    • This allows e.g. storing a returned channel in variable.
    • Same should be the case for queues/consumers/(generators?) created by a channel

Open to that discussion:

  • How to deal with ack-messages? messages can only be acknowledged on a specific channel.
  • Subscribing to streams should although be handled differently. (perhaps the last processed offset should be stored, in the case of a reconnect use the last offset. otherwise message will be received multiple times.
    • A simple approach would to change all timebases and offset based offsets to a offset during resubscribing.

@baelter
Copy link
Copy Markdown
Member

baelter commented Dec 5, 2025

@anti-held-333 Thanks for the review! Some clarifications:

The different API is intentional - we're adding a high-level API alongside the existing low-level one, similar to https://github.com/cloudamqp/amqp-client.rb:

  • Low-level (unchanged): client → channel → queue → subscribe - full control, no auto-reconnect
  • High-level (new): client.subscribe() - convenience API with auto-reconnect

Both APIs coexist. The existing channel-based API is untouched.

Your Concerns

  1. Channel per subscription - I dont think this is an issue. Clients normally don't have 1000s of consumers in a single process. If you do you have the low level api to work with. Or we can add pooling or something in the future.

  2. No prefetch control
    Valid. Should add: client.subscribe(queue, params, callback, { prefetch: 10 })

  3. Acknowledgments
    Correct - acks during reconnection fail. Unacked messages get redelivered (standard AMQP). Needs better docs and error handling.

  4. Stream offsets
    Good point. Not currently handled.

Does this address your concerns?

- Add prefetch parameter to client.subscribe() options
- Store prefetch value in ConsumerDefinition for recovery
- Update README with prefetch example and dual-API explanation
- Document reconnection behavior and limitations
- Add tests for prefetch functionality with both callback and AsyncGenerator consumers
When close() was called while scheduleReconnect() was waiting for a
reconnection delay, the setTimeout would be cleared but the Promise
would never resolve, causing the async function to hang indefinitely.

This change tracks the promise resolve callback and explicitly calls
it when close() is invoked, ensuring proper cleanup and allowing tests
to complete normally.

Fixes the issue where test suites would hang forever waiting for
reconnection timers to resolve.
@anti-held-333
Copy link
Copy Markdown

anti-held-333 commented Dec 8, 2025

I summarize my thoughts on that topic:

Perhaps it makes sense to use internally an topology description like:

export interface TopologyDefinition {
  channels?: {
    name: string; // Channel name/identifier (user-friendly)
    prefetch?: number; // QoS prefetch limit
  }[],
  exchanges?: {
    name: string;
    type: "direct" | "topic" | "fanout" | "headers";
  }[],
  queues?: {
    type: "queue";
    channel?: string;
    durable?: boolean;
    exclusive?: boolean;
    autoDelete?: boolean;
    bindings: {
       exchange: string;
       routingKey?: string;
       args?: Record<string, any>;
    }[]
  }[];
  streams?: {
    // See queues but with x-args
    // ...
  }[];
  subscriptions?: {
    id: string,
    queue: string,
    handler: (...args) => void | Promise<void>
  }
}

Using fallbacks it would be possible to provide the approach like described by you.

  • It would require to add interfaces like: addChannel(name: string, prefetch: number) => ..., addQueue(...)
    • These would perform the required calls like creating a channel object etc. and manipulate the topology object.
  • Functions like addQueue could reference to a channel created with addChannel
  • That type of interface should make it possible to use the simple approach like addSubscription("hello", "hello_world_queue", (...args) => console.log(args)); without defining a channel etc! A power user should be able to call addChannel("demo", ...) and addQueue("hello_world", channel="demo") before to add some specifics (or define a custom channel to use). (I know it's not correct ts-syntax but I guess you get the point!)
  • Subscriptions using a function like addSubscription could store the last offset.

@baelter baelter changed the title Add automatic reconnection with AMQPReconnectingClient Add automatic reconnection high level client Dec 11, 2025
@baelter
Copy link
Copy Markdown
Member

baelter commented Dec 12, 2025

@anti-held-333 Thanks for the proposal, we will consider it

Reconnection was enabled by default with infinite retries, causing tests
to hang indefinitely when sockets were destroyed without calling close().

Added reconnectEnabled flag that's only true when reconnect options are
explicitly provided. This makes reconnection opt-in and maintains backward
compatibility for users who don't need automatic reconnection.
Extract reconnection, consumer tracking, and lifecycle callbacks from
AMQPBaseClient into a new AMQPSession class. The protocol layer
(AMQPBaseClient) is now pure AMQP framing with a simple ondisconnect
hook. The high-level API lives in AMQPSession, created via
client.start(options).

AMQPConsumer is now updatable: _update() swaps channel/tag in-place
after reconnection so the user's reference stays valid. _onCancel hook
lets session.subscribe() consumers remove themselves from auto-recovery
when cancelled.

Remove AMQPSubscription — AMQPConsumer serves as both the low-level
and high-level consumer handle.
@baelter baelter changed the title Add automatic reconnection high level client Add automatic reconnection and consumer recovery Feb 20, 2026
@baelter baelter closed this Feb 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add automatic re-connection

3 participants