Skip to content

Commit 2a1ac2e

Browse files
committed
feat: add AMQPSession with automatic reconnection and consumer recovery
1 parent 39cdbba commit 2a1ac2e

4 files changed

Lines changed: 395 additions & 0 deletions

File tree

rollup.config.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,17 @@ export default [
44
{
55
input: "lib/mjs/amqp-websocket-client.js",
66
plugins: [sourcemaps()],
7+
// amqp-socket-client uses Node.js built-ins (net, tls, Buffer) and must
8+
// never be bundled into the browser build. It stays as an external dynamic
9+
// import that only fires for amqp:// URLs — an invalid scheme in browsers.
10+
external: (id) => id.includes("amqp-socket-client"),
711
output: {
812
file: "dist/amqp-websocket-client.mjs",
913
sourcemap: "dist/amqp-websocket-client.mjs.map",
1014
sourcemapExcludeSources: true,
15+
// Dynamic imports (amqp-session re-exports) must be inlined so the
16+
// output remains a single file compatible with output.file.
17+
inlineDynamicImports: true,
1118
},
1219
},
1320
]

src/amqp-session.ts

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
import type { AMQPBaseClient } from "./amqp-base-client.js"
2+
import type { ConsumeParams } from "./amqp-channel.js"
3+
import type { AMQPMessage } from "./amqp-message.js"
4+
import type { AMQPQueue } from "./amqp-queue.js"
5+
import { AMQPSubscription, AMQPGeneratorSubscription } from "./amqp-subscription.js"
6+
import type { ConsumerDefinition, SubscribeOptions } from "./amqp-subscription.js"
7+
import type { AMQPTlsOptions } from "./amqp-tls-options.js"
8+
import type { Logger } from "./types.js"
9+
10+
/**
11+
* Options for {@link AMQPSession.connect}.
12+
*/
13+
export interface AMQPSessionOptions {
14+
/** Initial delay in milliseconds before reconnecting (default: 1000) */
15+
reconnectInterval?: number
16+
/** Maximum delay in milliseconds between reconnection attempts (default: 30000) */
17+
maxReconnectInterval?: number
18+
/** Multiplier for exponential backoff (default: 2) */
19+
backoffMultiplier?: number
20+
/** Maximum number of reconnection attempts — 0 means infinite (default: 0) */
21+
maxRetries?: number
22+
/** TLS options — only used when connecting via amqp/amqps */
23+
tlsOptions?: AMQPTlsOptions
24+
/** Logger instance. Pass `null` to disable logging explicitly. */
25+
logger?: Logger | null
26+
}
27+
28+
export type { SubscribeOptions } from "./amqp-subscription.js"
29+
30+
/**
31+
* High-level session with automatic reconnection and consumer recovery.
32+
*
33+
* Create via `AMQPSession.connect(url, options)`. The session owns its
34+
* underlying transport; use `session.client` only to inspect state, not
35+
* to open channels directly.
36+
*/
37+
export class AMQPSession {
38+
/** Fires after a successful (re)connection and consumer recovery */
39+
onconnect?: () => void
40+
/** Fires when max retries are exhausted */
41+
onfailed?: (error?: Error) => void
42+
43+
/**
44+
* Underlying transport. Exposed for state inspection (e.g. `client.closed`)
45+
* and test access. Do not open channels on this directly, and do not
46+
* overwrite `client.ondisconnect` — the session uses it to drive reconnection.
47+
*/
48+
readonly client: AMQPBaseClient
49+
50+
private readonly options: {
51+
reconnectInterval: number
52+
maxReconnectInterval: number
53+
backoffMultiplier: number
54+
maxRetries: number
55+
}
56+
private readonly consumers = new Set<AMQPSubscription>()
57+
private reconnectAttempts = 0
58+
private reconnectTimer: ReturnType<typeof setTimeout> | undefined
59+
private reconnectResolve: (() => void) | undefined
60+
private reconnecting = false
61+
private stopped = false
62+
63+
private constructor(client: AMQPBaseClient, options?: AMQPSessionOptions) {
64+
this.client = client
65+
this.options = {
66+
reconnectInterval: options?.reconnectInterval ?? 1000,
67+
maxReconnectInterval: options?.maxReconnectInterval ?? 30000,
68+
backoffMultiplier: options?.backoffMultiplier ?? 2,
69+
maxRetries: options?.maxRetries ?? 0,
70+
}
71+
this.client.ondisconnect = () => {
72+
if (!this.stopped && !this.reconnecting) {
73+
void this.reconnectLoop()
74+
}
75+
}
76+
}
77+
78+
/**
79+
* Connect to an AMQP broker and return a session with automatic reconnection.
80+
*
81+
* The transport is chosen from the URL scheme:
82+
* - `amqp://` / `amqps://` → TCP socket (`AMQPClient`)
83+
* - `ws://` / `wss://` → WebSocket (`AMQPWebSocketClient`)
84+
*/
85+
static async connect(url: string, options?: AMQPSessionOptions): Promise<AMQPSession> {
86+
const u = new URL(url)
87+
let client: AMQPBaseClient
88+
if (u.protocol === "ws:" || u.protocol === "wss:") {
89+
const { AMQPWebSocketClient } = await import("./amqp-websocket-client.js")
90+
const vhost = decodeURIComponent(u.pathname.slice(1)) || "/"
91+
const username = decodeURIComponent(u.username) || "guest"
92+
const password = decodeURIComponent(u.password) || "guest"
93+
const wsUrl = `${u.protocol}//${u.host}${u.pathname}${u.search}`
94+
client = new AMQPWebSocketClient({ url: wsUrl, vhost, username, password, logger: options?.logger ?? null })
95+
} else {
96+
const { AMQPClient } = await import("./amqp-socket-client.js")
97+
client = new AMQPClient(url, options?.tlsOptions, options?.logger)
98+
}
99+
await client.connect()
100+
return new AMQPSession(client, options)
101+
}
102+
103+
/**
104+
* Subscribe to a queue with automatic consumer recovery on reconnection.
105+
* Messages will be delivered asynchronously to the callback.
106+
* @param queue - queue name or {@link AMQPQueue} object
107+
* @param consumeParams - consume parameters (noAck, exclusive, tag, args)
108+
* @param callback - called for each delivered message
109+
* @param [options] - queue declaration and prefetch settings for recovery
110+
*/
111+
async subscribe(
112+
queue: string | AMQPQueue,
113+
consumeParams: ConsumeParams,
114+
callback: (msg: AMQPMessage) => void | Promise<void>,
115+
options?: SubscribeOptions,
116+
): Promise<AMQPSubscription>
117+
/**
118+
* Subscribe to a queue with automatic consumer recovery on reconnection.
119+
* Messages will be delivered through an async-iterable subscription that
120+
* continues yielding across reconnections.
121+
* @param queue - queue name or {@link AMQPQueue} object
122+
* @param consumeParams - consume parameters (noAck, exclusive, tag, args)
123+
* @param [options] - queue declaration and prefetch settings for recovery
124+
*/
125+
async subscribe(
126+
queue: string | AMQPQueue,
127+
consumeParams: ConsumeParams,
128+
callback?: undefined,
129+
options?: SubscribeOptions,
130+
): Promise<AMQPGeneratorSubscription>
131+
async subscribe(
132+
queue: string | AMQPQueue,
133+
consumeParams: ConsumeParams,
134+
callback?: (msg: AMQPMessage) => void | Promise<void>,
135+
options?: SubscribeOptions,
136+
): Promise<AMQPSubscription | AMQPGeneratorSubscription> {
137+
const def: ConsumerDefinition = {
138+
queueName: typeof queue === "string" ? queue : queue.name,
139+
consumeParams,
140+
...(callback !== undefined && { callback }),
141+
options: options ?? {},
142+
}
143+
144+
const consumer = await this.bindConsumer(def)
145+
const sub = callback ? new AMQPSubscription(consumer, def) : new AMQPGeneratorSubscription(consumer, def)
146+
147+
this.consumers.add(sub)
148+
sub.onCancel = () => {
149+
this.consumers.delete(sub)
150+
}
151+
152+
return sub
153+
}
154+
155+
/**
156+
* Stop the session: cancel reconnection, clear consumer tracking,
157+
* and close the underlying connection.
158+
*/
159+
async stop(reason?: string): Promise<void> {
160+
this.stopped = true
161+
this.cancelWait()
162+
const subs = [...this.consumers]
163+
this.consumers.clear()
164+
for (const sub of subs) {
165+
sub.cancel().catch(() => {})
166+
}
167+
delete this.client.ondisconnect
168+
if (!this.client.closed) {
169+
await this.client.close(reason)
170+
}
171+
}
172+
173+
private async reconnectLoop(): Promise<void> {
174+
if (this.reconnecting) return
175+
this.reconnecting = true
176+
177+
while (!this.stopped) {
178+
this.reconnectAttempts++
179+
180+
// Give up after maxRetries
181+
if (this.options.maxRetries > 0 && this.reconnectAttempts > this.options.maxRetries) {
182+
this.stopped = true
183+
this.onfailed?.(new Error(`Max reconnection attempts (${this.options.maxRetries}) reached`))
184+
continue
185+
}
186+
187+
// Wait with exponential backoff
188+
const delay = Math.min(
189+
this.options.reconnectInterval * Math.pow(this.options.backoffMultiplier, this.reconnectAttempts - 1),
190+
this.options.maxReconnectInterval,
191+
)
192+
this.client.logger?.debug(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`)
193+
await this.waitBeforeRetry(delay)
194+
if (this.stopped) continue // stop() was called during the wait
195+
196+
// Attempt to connect — retry on failure
197+
try {
198+
await this.client.connect()
199+
} catch (err) {
200+
const error = err instanceof Error ? err : new Error(String(err))
201+
this.client.logger?.warn("AMQP-Client reconnect error:", error.message)
202+
continue
203+
}
204+
205+
// Re-establish consumers on the fresh connection
206+
this.reconnectAttempts = 0
207+
await this.recoverConsumers()
208+
if (this.stopped || this.client.closed) continue // stop() called, or connection dropped during recovery
209+
210+
this.onconnect?.()
211+
break
212+
}
213+
214+
this.reconnecting = false
215+
}
216+
217+
private waitBeforeRetry(ms: number): Promise<void> {
218+
return new Promise<void>((resolve) => {
219+
this.reconnectResolve = resolve
220+
this.reconnectTimer = setTimeout(resolve, ms)
221+
})
222+
}
223+
224+
private cancelWait(): void {
225+
clearTimeout(this.reconnectTimer)
226+
this.reconnectTimer = undefined
227+
this.reconnectResolve?.()
228+
this.reconnectResolve = undefined
229+
}
230+
231+
private async bindConsumer(def: ConsumerDefinition) {
232+
const ch = await this.client.channel()
233+
if (def.options.prefetch !== undefined) {
234+
await ch.basicQos(def.options.prefetch)
235+
}
236+
const q = def.options.queue
237+
? await ch.queue(def.queueName, def.options.queue, def.options.queueArgs || {})
238+
: await ch.queue(def.queueName, { passive: true })
239+
return def.callback ? await q.subscribe(def.consumeParams, def.callback) : await q.subscribe(def.consumeParams)
240+
}
241+
242+
private async recoverConsumers(): Promise<void> {
243+
if (this.client.closed) return
244+
245+
for (const sub of this.consumers) {
246+
try {
247+
const consumer = await this.bindConsumer(sub.def)
248+
sub.setConsumer(consumer)
249+
this.client.logger?.debug(`Recovered consumer for queue: ${sub.def.queueName}`)
250+
} catch (err) {
251+
const error = err instanceof Error ? err : new Error(String(err))
252+
this.client.logger?.warn(`Failed to recover consumer for queue ${sub.def.queueName}:`, error.message)
253+
}
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)