|
| 1 | +import type { AMQPBaseClient } from "./amqp-base-client.js" |
| 2 | +import type { ConsumeParams } from "./amqp-channel.js" |
| 3 | +import type { AMQPMessage } from "./amqp-message.js" |
| 4 | +import type { AMQPQueue } from "./amqp-queue.js" |
| 5 | +import { AMQPSubscription, AMQPGeneratorSubscription } from "./amqp-subscription.js" |
| 6 | +import type { ConsumerDefinition, SubscribeOptions } from "./amqp-subscription.js" |
| 7 | +import type { AMQPTlsOptions } from "./amqp-tls-options.js" |
| 8 | +import type { Logger } from "./types.js" |
| 9 | + |
| 10 | +/** |
| 11 | + * Options for {@link AMQPSession.connect}. |
| 12 | + */ |
| 13 | +export interface AMQPSessionOptions { |
| 14 | + /** Initial delay in milliseconds before reconnecting (default: 1000) */ |
| 15 | + reconnectInterval?: number |
| 16 | + /** Maximum delay in milliseconds between reconnection attempts (default: 30000) */ |
| 17 | + maxReconnectInterval?: number |
| 18 | + /** Multiplier for exponential backoff (default: 2) */ |
| 19 | + backoffMultiplier?: number |
| 20 | + /** Maximum number of reconnection attempts — 0 means infinite (default: 0) */ |
| 21 | + maxRetries?: number |
| 22 | + /** TLS options — only used when connecting via amqp/amqps */ |
| 23 | + tlsOptions?: AMQPTlsOptions |
| 24 | + /** Logger instance. Pass `null` to disable logging explicitly. */ |
| 25 | + logger?: Logger | null |
| 26 | +} |
| 27 | + |
| 28 | +export type { SubscribeOptions } from "./amqp-subscription.js" |
| 29 | + |
| 30 | +/** |
| 31 | + * High-level session with automatic reconnection and consumer recovery. |
| 32 | + * |
| 33 | + * Create via `AMQPSession.connect(url, options)`. The session owns its |
| 34 | + * underlying transport; use `session.client` only to inspect state, not |
| 35 | + * to open channels directly. |
| 36 | + */ |
| 37 | +export class AMQPSession { |
| 38 | + /** Fires after a successful (re)connection and consumer recovery */ |
| 39 | + onconnect?: () => void |
| 40 | + /** Fires when max retries are exhausted */ |
| 41 | + onfailed?: (error?: Error) => void |
| 42 | + |
| 43 | + /** |
| 44 | + * Underlying transport. Exposed for state inspection (e.g. `client.closed`) |
| 45 | + * and test access. Do not open channels on this directly, and do not |
| 46 | + * overwrite `client.ondisconnect` — the session uses it to drive reconnection. |
| 47 | + */ |
| 48 | + readonly client: AMQPBaseClient |
| 49 | + |
| 50 | + private readonly options: { |
| 51 | + reconnectInterval: number |
| 52 | + maxReconnectInterval: number |
| 53 | + backoffMultiplier: number |
| 54 | + maxRetries: number |
| 55 | + } |
| 56 | + private readonly consumers = new Set<AMQPSubscription>() |
| 57 | + private reconnectAttempts = 0 |
| 58 | + private reconnectTimer: ReturnType<typeof setTimeout> | undefined |
| 59 | + private reconnectResolve: (() => void) | undefined |
| 60 | + private reconnecting = false |
| 61 | + private stopped = false |
| 62 | + |
| 63 | + private constructor(client: AMQPBaseClient, options?: AMQPSessionOptions) { |
| 64 | + this.client = client |
| 65 | + this.options = { |
| 66 | + reconnectInterval: options?.reconnectInterval ?? 1000, |
| 67 | + maxReconnectInterval: options?.maxReconnectInterval ?? 30000, |
| 68 | + backoffMultiplier: options?.backoffMultiplier ?? 2, |
| 69 | + maxRetries: options?.maxRetries ?? 0, |
| 70 | + } |
| 71 | + this.client.ondisconnect = () => { |
| 72 | + if (!this.stopped && !this.reconnecting) { |
| 73 | + void this.reconnectLoop() |
| 74 | + } |
| 75 | + } |
| 76 | + } |
| 77 | + |
| 78 | + /** |
| 79 | + * Connect to an AMQP broker and return a session with automatic reconnection. |
| 80 | + * |
| 81 | + * The transport is chosen from the URL scheme: |
| 82 | + * - `amqp://` / `amqps://` → TCP socket (`AMQPClient`) |
| 83 | + * - `ws://` / `wss://` → WebSocket (`AMQPWebSocketClient`) |
| 84 | + */ |
| 85 | + static async connect(url: string, options?: AMQPSessionOptions): Promise<AMQPSession> { |
| 86 | + const u = new URL(url) |
| 87 | + let client: AMQPBaseClient |
| 88 | + if (u.protocol === "ws:" || u.protocol === "wss:") { |
| 89 | + const { AMQPWebSocketClient } = await import("./amqp-websocket-client.js") |
| 90 | + const vhost = decodeURIComponent(u.pathname.slice(1)) || "/" |
| 91 | + const username = decodeURIComponent(u.username) || "guest" |
| 92 | + const password = decodeURIComponent(u.password) || "guest" |
| 93 | + const wsUrl = `${u.protocol}//${u.host}${u.pathname}${u.search}` |
| 94 | + client = new AMQPWebSocketClient({ url: wsUrl, vhost, username, password, logger: options?.logger ?? null }) |
| 95 | + } else { |
| 96 | + const { AMQPClient } = await import("./amqp-socket-client.js") |
| 97 | + client = new AMQPClient(url, options?.tlsOptions, options?.logger) |
| 98 | + } |
| 99 | + await client.connect() |
| 100 | + return new AMQPSession(client, options) |
| 101 | + } |
| 102 | + |
| 103 | + /** |
| 104 | + * Subscribe to a queue with automatic consumer recovery on reconnection. |
| 105 | + * Messages will be delivered asynchronously to the callback. |
| 106 | + * @param queue - queue name or {@link AMQPQueue} object |
| 107 | + * @param consumeParams - consume parameters (noAck, exclusive, tag, args) |
| 108 | + * @param callback - called for each delivered message |
| 109 | + * @param [options] - queue declaration and prefetch settings for recovery |
| 110 | + */ |
| 111 | + async subscribe( |
| 112 | + queue: string | AMQPQueue, |
| 113 | + consumeParams: ConsumeParams, |
| 114 | + callback: (msg: AMQPMessage) => void | Promise<void>, |
| 115 | + options?: SubscribeOptions, |
| 116 | + ): Promise<AMQPSubscription> |
| 117 | + /** |
| 118 | + * Subscribe to a queue with automatic consumer recovery on reconnection. |
| 119 | + * Messages will be delivered through an async-iterable subscription that |
| 120 | + * continues yielding across reconnections. |
| 121 | + * @param queue - queue name or {@link AMQPQueue} object |
| 122 | + * @param consumeParams - consume parameters (noAck, exclusive, tag, args) |
| 123 | + * @param [options] - queue declaration and prefetch settings for recovery |
| 124 | + */ |
| 125 | + async subscribe( |
| 126 | + queue: string | AMQPQueue, |
| 127 | + consumeParams: ConsumeParams, |
| 128 | + callback?: undefined, |
| 129 | + options?: SubscribeOptions, |
| 130 | + ): Promise<AMQPGeneratorSubscription> |
| 131 | + async subscribe( |
| 132 | + queue: string | AMQPQueue, |
| 133 | + consumeParams: ConsumeParams, |
| 134 | + callback?: (msg: AMQPMessage) => void | Promise<void>, |
| 135 | + options?: SubscribeOptions, |
| 136 | + ): Promise<AMQPSubscription | AMQPGeneratorSubscription> { |
| 137 | + const def: ConsumerDefinition = { |
| 138 | + queueName: typeof queue === "string" ? queue : queue.name, |
| 139 | + consumeParams, |
| 140 | + ...(callback !== undefined && { callback }), |
| 141 | + options: options ?? {}, |
| 142 | + } |
| 143 | + |
| 144 | + const consumer = await this.bindConsumer(def) |
| 145 | + const sub = callback ? new AMQPSubscription(consumer, def) : new AMQPGeneratorSubscription(consumer, def) |
| 146 | + |
| 147 | + this.consumers.add(sub) |
| 148 | + sub.onCancel = () => { |
| 149 | + this.consumers.delete(sub) |
| 150 | + } |
| 151 | + |
| 152 | + return sub |
| 153 | + } |
| 154 | + |
| 155 | + /** |
| 156 | + * Stop the session: cancel reconnection, clear consumer tracking, |
| 157 | + * and close the underlying connection. |
| 158 | + */ |
| 159 | + async stop(reason?: string): Promise<void> { |
| 160 | + this.stopped = true |
| 161 | + this.cancelWait() |
| 162 | + const subs = [...this.consumers] |
| 163 | + this.consumers.clear() |
| 164 | + for (const sub of subs) { |
| 165 | + sub.cancel().catch(() => {}) |
| 166 | + } |
| 167 | + delete this.client.ondisconnect |
| 168 | + if (!this.client.closed) { |
| 169 | + await this.client.close(reason) |
| 170 | + } |
| 171 | + } |
| 172 | + |
| 173 | + private async reconnectLoop(): Promise<void> { |
| 174 | + if (this.reconnecting) return |
| 175 | + this.reconnecting = true |
| 176 | + |
| 177 | + while (!this.stopped) { |
| 178 | + this.reconnectAttempts++ |
| 179 | + |
| 180 | + // Give up after maxRetries |
| 181 | + if (this.options.maxRetries > 0 && this.reconnectAttempts > this.options.maxRetries) { |
| 182 | + this.stopped = true |
| 183 | + this.onfailed?.(new Error(`Max reconnection attempts (${this.options.maxRetries}) reached`)) |
| 184 | + continue |
| 185 | + } |
| 186 | + |
| 187 | + // Wait with exponential backoff |
| 188 | + const delay = Math.min( |
| 189 | + this.options.reconnectInterval * Math.pow(this.options.backoffMultiplier, this.reconnectAttempts - 1), |
| 190 | + this.options.maxReconnectInterval, |
| 191 | + ) |
| 192 | + this.client.logger?.debug(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`) |
| 193 | + await this.waitBeforeRetry(delay) |
| 194 | + if (this.stopped) continue // stop() was called during the wait |
| 195 | + |
| 196 | + // Attempt to connect — retry on failure |
| 197 | + try { |
| 198 | + await this.client.connect() |
| 199 | + } catch (err) { |
| 200 | + const error = err instanceof Error ? err : new Error(String(err)) |
| 201 | + this.client.logger?.warn("AMQP-Client reconnect error:", error.message) |
| 202 | + continue |
| 203 | + } |
| 204 | + |
| 205 | + // Re-establish consumers on the fresh connection |
| 206 | + this.reconnectAttempts = 0 |
| 207 | + await this.recoverConsumers() |
| 208 | + if (this.stopped || this.client.closed) continue // stop() called, or connection dropped during recovery |
| 209 | + |
| 210 | + this.onconnect?.() |
| 211 | + break |
| 212 | + } |
| 213 | + |
| 214 | + this.reconnecting = false |
| 215 | + } |
| 216 | + |
| 217 | + private waitBeforeRetry(ms: number): Promise<void> { |
| 218 | + return new Promise<void>((resolve) => { |
| 219 | + this.reconnectResolve = resolve |
| 220 | + this.reconnectTimer = setTimeout(resolve, ms) |
| 221 | + }) |
| 222 | + } |
| 223 | + |
| 224 | + private cancelWait(): void { |
| 225 | + clearTimeout(this.reconnectTimer) |
| 226 | + this.reconnectTimer = undefined |
| 227 | + this.reconnectResolve?.() |
| 228 | + this.reconnectResolve = undefined |
| 229 | + } |
| 230 | + |
| 231 | + private async bindConsumer(def: ConsumerDefinition) { |
| 232 | + const ch = await this.client.channel() |
| 233 | + if (def.options.prefetch !== undefined) { |
| 234 | + await ch.basicQos(def.options.prefetch) |
| 235 | + } |
| 236 | + const q = def.options.queue |
| 237 | + ? await ch.queue(def.queueName, def.options.queue, def.options.queueArgs || {}) |
| 238 | + : await ch.queue(def.queueName, { passive: true }) |
| 239 | + return def.callback ? await q.subscribe(def.consumeParams, def.callback) : await q.subscribe(def.consumeParams) |
| 240 | + } |
| 241 | + |
| 242 | + private async recoverConsumers(): Promise<void> { |
| 243 | + if (this.client.closed) return |
| 244 | + |
| 245 | + for (const sub of this.consumers) { |
| 246 | + try { |
| 247 | + const consumer = await this.bindConsumer(sub.def) |
| 248 | + sub.setConsumer(consumer) |
| 249 | + this.client.logger?.debug(`Recovered consumer for queue: ${sub.def.queueName}`) |
| 250 | + } catch (err) { |
| 251 | + const error = err instanceof Error ? err : new Error(String(err)) |
| 252 | + this.client.logger?.warn(`Failed to recover consumer for queue ${sub.def.queueName}:`, error.message) |
| 253 | + } |
| 254 | + } |
| 255 | + } |
| 256 | +} |
0 commit comments