Add automatic reconnection and consumer recovery#180
Conversation
Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
…ecovery Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
…mprove queue recovery Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
…ectingClient Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
…k order Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
Integrated reconnection directly into Reconnection is now configured via the constructor: const client = new AMQPClient("amqp://localhost", undefined, undefined, {
reconnectInterval: 1000,
maxRetries: 0
})Use |
|
Hey I looked at the proposed code and found the following issues/review:
In my opinion a channel should maintain the subscriptions. If the client reconnects, the client should automatically setup the previously used channels. This could be implemented as follows:
Open to that discussion:
|
|
@anti-held-333 Thanks for the review! Some clarifications: The different API is intentional - we're adding a high-level API alongside the existing low-level one, similar to https://github.com/cloudamqp/amqp-client.rb:
Both APIs coexist. The existing channel-based API is untouched. Your Concerns
Does this address your concerns? |
- Add prefetch parameter to client.subscribe() options - Store prefetch value in ConsumerDefinition for recovery - Update README with prefetch example and dual-API explanation - Document reconnection behavior and limitations - Add tests for prefetch functionality with both callback and AsyncGenerator consumers
When close() was called while scheduleReconnect() was waiting for a reconnection delay, the setTimeout would be cleared but the Promise would never resolve, causing the async function to hang indefinitely. This change tracks the promise resolve callback and explicitly calls it when close() is invoked, ensuring proper cleanup and allowing tests to complete normally. Fixes the issue where test suites would hang forever waiting for reconnection timers to resolve.
|
I summarize my thoughts on that topic: Perhaps it makes sense to use internally an topology description like: Using fallbacks it would be possible to provide the approach like described by you.
|
|
@anti-held-333 Thanks for the proposal, we will consider it |
Reconnection was enabled by default with infinite retries, causing tests to hang indefinitely when sockets were destroyed without calling close(). Added reconnectEnabled flag that's only true when reconnect options are explicitly provided. This makes reconnection opt-in and maintains backward compatibility for users who don't need automatic reconnection.
Extract reconnection, consumer tracking, and lifecycle callbacks from AMQPBaseClient into a new AMQPSession class. The protocol layer (AMQPBaseClient) is now pure AMQP framing with a simple ondisconnect hook. The high-level API lives in AMQPSession, created via client.start(options). AMQPConsumer is now updatable: _update() swaps channel/tag in-place after reconnection so the user's reference stays valid. _onCancel hook lets session.subscribe() consumers remove themselves from auto-recovery when cancelled. Remove AMQPSubscription — AMQPConsumer serves as both the low-level and high-level consumer handle.
Summary
client.start(options?)to bothAMQPClientandAMQPWebSocketClient, returning anAMQPSessionthat manages automatic reconnection with exponential backoffsession.subscribe()for consumer registration with automatic recovery after reconnection — consumer objects stay valid across reconnections (channel and tag are swapped in-place)onconnect,ondisconnect,onreconnecting,onfailedondisconnectcallback toAMQPBaseClientso the session layer can detect connection lossframePos,frameSize,channels) inconnect()so re-calling it on an existing client works correctlyNew files
src/amqp-session.ts—AMQPSessionclass andReconnectOptionsinterfaceModified files
src/amqp-base-client.ts— addondisconnectcallbacksrc/amqp-socket-client.ts— addstart(), fireondisconnecton close, reset state inconnect()src/amqp-websocket-client.ts— addstart(), fireondisconnecton error/close, reset state inconnect()src/amqp-consumer.ts— makechannel/tagswappable via_update()for reconnection recovery; add_onCancelhooksrc/index.ts— exportAMQPSessionandReconnectOptionsREADME.md— document the high-level API with examplestest/reconnect.ts— tests for reconnection, consumer recovery, backoff, max retries, and session lifecycleTest plan
npm test)Fixes #176