Skip to content

feat!: add AMQPSession with reconnection and high-level queue/exchange API#186

Merged
baelter merged 1 commit intomainfrom
feat/session-publish-exchange-queue
Mar 4, 2026
Merged

feat!: add AMQPSession with reconnection and high-level queue/exchange API#186
baelter merged 1 commit intomainfrom
feat/session-publish-exchange-queue

Conversation

@baelter
Copy link
Copy Markdown
Member

@baelter baelter commented Feb 25, 2026

Summary

Introduces `AMQPSession` as the recommended high-level entry point for applications that need automatic reconnection and consumer recovery. Separates the API into two clear layers:

  • Low-level (`AMQPClient` / `AMQPWebSocketClient`): direct channel access — `queueDeclare`, `basicPublish`, `basicConsume`, etc.
  • High-level (`AMQPSession`): `queue()` / `exchange()` handles that are reconnect-safe; subscriptions survive disconnection automatically

New

  • `AMQPSession.connect(url, options?)` — picks TCP or WebSocket transport from the URL scheme; reconnects with configurable exponential backoff (`reconnectInterval`, `maxReconnectInterval`, `backoffMultiplier`, `maxRetries`)
  • `session.queue(name, params?, args?)` → `AMQPQueue` with `publish`, `subscribe`, `get`, `bind`, `unbind`, `purge`, `delete`
  • `session.exchange(name, type, params?, args?)` → `AMQPExchange` with `publish`, `bind`, `unbind`, `delete`
  • Shorthand exchange factories: `directExchange()`, `fanoutExchange()`, `topicExchange()`, `headersExchange()`
  • `AMQPSubscription` / `AMQPGeneratorSubscription` — stable consumer handles that survive reconnection; the generator variant bridges the async iterator without any caller changes
  • `QueueSubscribeParams = ConsumeParams & { prefetch? }` — single options object for subscribe; `prefetch` sets channel QoS before each consume (including after reconnect)
  • `QueuePublishOptions` / `ExchangePublishOptions` — publish options extend `AMQPProperties` with `confirm?`; `ExchangePublishOptions` adds `routingKey?`
  • `session.onconnect` / `session.onfailed` lifecycle hooks
  • `session.closed` — `true` when the underlying connection is closed
  • `session.stop()` — cancels reconnection, clears all subscriptions, closes the connection

Breaking changes

  • `publish()` takes a single options object: `publish(body, { routingKey, confirm, contentType, … })`. `publishAndForget()` is removed — use `publish(body, { confirm: false })` instead.
  • `AMQPChannel.queue()` removed — use `ch.queueDeclare()` + channel methods, or `session.queue()` for the high-level API
  • `AMQPQueue` is now session-only — no longer returned by channel methods; constructor takes `(session, name)`
  • `AMQPQueue` no longer re-exported from `AMQPClient` / `AMQPWebSocketClient`

Architecture

`AMQPSession` is a thin connection/reconnection manager. It exposes only `queue()`, `exchange()`, and the exchange shorthand factories as public API — everything else is either private or `@internal`.

Shared publish logic lives in module-level functions (`publishConfirmed` / `publishNoConfirm` in `amqp-publisher.ts`). `AMQPQueue` and `AMQPExchange` each hold their own `session` reference and call those functions directly — no inheritance.

`AMQPQueue` owns its subscriptions. It opens a dedicated channel per consumer, tracks subscriptions in a private `Set`, and has `@internal` `recover()` and `cancelAll()` methods that the session calls on reconnect and stop respectively. The session tracks queues (not individual subscriptions) and delegates recovery to each queue.

Other fixes

  • WebSocket connections now use the `vhost` session option instead of the URL path
  • Concurrent `getOpsChannel` / `getConfirmChannel` callers no longer open duplicate channels

Test plan

  • All session tests pass
  • All main tests pass
  • TypeScript compiles cleanly (`tsc --noEmit`)

@baelter baelter force-pushed the feat/session-publish-exchange-queue branch 2 times, most recently from 415dc68 to e504ec0 Compare February 25, 2026 10:49
@baelter baelter requested review from Copilot February 25, 2026 15:38

This comment was marked as outdated.

@baelter baelter marked this pull request as ready for review February 25, 2026 17:22

This comment was marked as outdated.

This comment was marked as outdated.

This comment was marked as outdated.

Comment thread test/amqp-session.ts Outdated
Comment thread test/amqp-session.ts Outdated
Comment thread test/amqp-session.ts Outdated
Comment thread src/amqp-session.ts Outdated
Comment thread test/amqp-session.ts Outdated
@baelter baelter force-pushed the feat/session-publish-exchange-queue branch from 549e2a2 to d30970e Compare February 27, 2026 14:04
@baelter baelter changed the title feat: publish, exchange, and queue convenience methods on AMQPSession feat!: add AMQPSession with reconnection and high-level queue/exchange API Feb 27, 2026
@baelter baelter requested a review from Copilot February 27, 2026 14:20

This comment was marked as outdated.

@baelter baelter force-pushed the feat/session-publish-exchange-queue branch from 3c969ec to 4b4f5b2 Compare February 27, 2026 14:41
@antondalgren

This comment was marked as outdated.

@baelter

This comment was marked as outdated.

Comment thread src/amqp-session.ts Outdated
…e API

Introduces AMQPSession as the recommended high-level entry point for
applications that need automatic reconnection and consumer recovery.

New classes and types:
- AMQPSession.connect(url, options?) — picks TCP or WebSocket transport
  from the URL scheme; reconnects with configurable exponential backoff
- AMQPQueue — reconnect-safe queue handle: publish(), subscribe(), get(),
  bind(), unbind(), purge(), delete()
- AMQPExchange — reconnect-safe exchange handle: publish(), bind(),
  unbind(), delete()
- AMQPSubscription / AMQPGeneratorSubscription — stable consumer handles
  that survive reconnection; generator variant is AsyncIterable<AMQPMessage>
- QueueSubscribeParams — ConsumeParams + prefetch? for per-consumer QoS
- QueuePublishOptions / ExchangePublishOptions — AMQPProperties extended
  with confirm? (and routingKey? for exchanges); replaces separate
  positional args and publishAndForget()
- ondisconnect hook on AMQPBaseClient (TCP and WebSocket)

Publish API: queue.publish(body, { confirm, contentType, … }) and
exchange.publish(body, { routingKey, confirm, contentType, … }).
Shared publish logic lives in module-level functions (publishConfirmed /
publishNoConfirm) — no inheritance.

Exchange shortcuts on session: directExchange(), fanoutExchange(),
topicExchange(), headersExchange().

Breaking changes (v3 → v4):
- AMQPChannel.queue() removed; use ch.queueDeclare() or session.queue()
- AMQPQueue is now session-only; no longer returned by channel methods
- AMQPQueue no longer re-exported from AMQPClient / AMQPWebSocketClient
@baelter baelter force-pushed the feat/session-publish-exchange-queue branch from 1df91ef to 3d0913f Compare March 4, 2026 09:51
@baelter baelter merged commit f03013a into main Mar 4, 2026
6 checks passed
@baelter baelter deleted the feat/session-publish-exchange-queue branch March 4, 2026 09:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants