All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
- `AMQPRPCClient`: reusable RPC client using direct reply-to for request-response patterns (#191)
  - `start()` to begin listening for responses on the direct reply-to pseudo-queue
  - `call(queue, body, options?)` to publish an RPC request and await its response
  - Configurable per-call `timeout`; automatic correlation ID tracking
  - `close()` to reject pending calls and clean up the channel
  - Automatically recovered by `AMQPSession` on reconnect when created via `session.rpcClient()`
- `AMQPRPCServer`: RPC server that consumes from a queue and replies to each caller (#191)
  - Uses session-level queue subscribe for automatic consumer recovery on reconnect
  - Handler receives the full `AMQPMessage` and returns the response body
- Session-level RPC convenience methods (#191)
  - `session.rpcCall(queue, body, options?)`: simple one-shot RPC call (recommended for most use cases)
  - `session.rpcClient()`: create a reusable `AMQPRPCClient` for high-throughput scenarios
  - `session.rpcServer(queue, handler, prefetch?)`: create and start an `AMQPRPCServer`
- `AMQPSession`: high-level client with automatic reconnection and consumer recovery (#185, #186)
  - `AMQPSession.connect(url, options?)` factory: picks TCP or WebSocket transport from the URL scheme (`amqp://` / `amqps://` → TCP; `ws://` / `wss://` → WebSocket)
  - Exponential backoff with configurable `reconnectInterval`, `maxReconnectInterval`, `backoffMultiplier`, and `maxRetries`
  - `session.queue(name, params?, args?)`: declare and return an `AMQPQueue` handle
  - `session.exchange(name, type, params?, args?)`: declare and return an `AMQPExchange` handle
  - Shorthand exchange factories: `directExchange()`, `fanoutExchange()`, `topicExchange()`, `headersExchange()`
  - `session.onconnect` / `session.onfailed` lifecycle hooks
  - `session.closed`: `true` when the underlying connection is closed
  - `session.stop()`: cancels reconnection, clears all subscriptions, and closes the connection
- `AMQPQueue`: reconnect-safe queue handle returned by `session.queue()`, with `publish()`, `subscribe()`, `get()`, `bind()`, `unbind()`, `purge()`, `delete()` (#186)
  - `subscribe(callback)` / `subscribe(params, callback)`: auto-acks after the callback returns; nacks and requeues on throw; call `msg.ack()` / `msg.nack()` inside the callback to override; pass `{ noAck: true }` to skip acking entirely; `requeueOnNack` controls requeue behaviour on error (#189)
  - `subscribe()` / `subscribe(params)`: async-iterator form; auto-acks the previous message when the loop advances; the last message (after `break`) is left unacked; call `msg.ack()` / `msg.nack()` before advancing to override; pass `{ noAck: true }` to skip acking (#189)
  - Subscriptions survive reconnection automatically; the async-iterator form continues yielding without any caller changes
- `AMQPExchange`: reconnect-safe exchange handle returned by `session.exchange()`, with `publish()`, `bind()`, `unbind()`, `delete()` (#186)
- `AMQPSubscription`: stable consumer handle across reconnections; exposes `channel`, `consumerTag`, and `cancel()`
- `AMQPGeneratorSubscription`: extends `AMQPSubscription` with `AsyncIterable<AMQPMessage>` support
- `QueueSubscribeParams`: exported type combining `ConsumeParams` with `prefetch?` and `requeueOnNack?` (default `true`) (#189)
- `QueuePublishOptions` / `ExchangePublishOptions`: exported types for publish options; both extend `AMQPProperties` with a `confirm?` flag; `ExchangePublishOptions` adds `routingKey?`
- `ondisconnect` hook on `AMQPBaseClient` (TCP and WebSocket): fires when the connection drops
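
The reconnection options listed for `AMQPSession` can be pictured as a delay schedule. The following is only a sketch of one common exponential backoff formula, not the library's actual implementation: the `backoffDelays` helper, the `BackoffOptions` type, and the example values are hypothetical, and only the four option names come from the entry above.

```typescript
// Hypothetical helper: the option names mirror the changelog entry, but the
// formula and the values used below are assumptions for illustration only.
interface BackoffOptions {
  reconnectInterval: number // delay before the first retry, in ms
  maxReconnectInterval: number // upper bound for any single delay, in ms
  backoffMultiplier: number // growth factor applied after each attempt
  maxRetries: number // number of attempts before giving up
}

// Returns the delay (ms) to wait before each reconnection attempt.
function backoffDelays(opts: BackoffOptions): number[] {
  const delays: number[] = []
  for (let attempt = 0; attempt < opts.maxRetries; attempt++) {
    const raw = opts.reconnectInterval * Math.pow(opts.backoffMultiplier, attempt)
    delays.push(Math.min(raw, opts.maxReconnectInterval))
  }
  return delays
}

const delays = backoffDelays({
  reconnectInterval: 1000,
  maxReconnectInterval: 10000,
  backoffMultiplier: 2,
  maxRetries: 6,
})
console.log(delays) // [1000, 2000, 4000, 8000, 10000, 10000]
```

With these assumed values the delay doubles until it reaches the cap and then stays there for the remaining attempts.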
- Breaking: `AMQPChannel.queue()` removed (#186). Use `ch.queueDeclare()` with low-level channel methods, or `session.queue()` for the high-level API. See the migration guide below.
- Breaking: `AMQPQueue` is now a session-only class: it is no longer returned by channel methods and no longer accepts a channel in its constructor. (#186)
- Breaking: `AMQPQueue` is no longer re-exported from `AMQPClient` or `AMQPWebSocketClient`. Import from the main package entry point instead. (#186)
The v3 AMQPQueue was tied to a single channel. In v4, AMQPQueue is a session-level handle that is reconnect-safe.
If you were using ch.queue():
```diff
-const ch = await conn.channel()
-const q = await ch.queue("my-queue")
-await q.publish("hello")
-const consumer = await q.subscribe({ noAck: false }, (msg) => msg.ack())
-const msg = await q.get()
-await q.bind("amq.topic", "routing.key")
-await q.delete()
+// Low-level (no reconnection)
+const ch = await conn.channel()
+const { name } = await ch.queueDeclare("my-queue")
+await ch.basicPublish("", name, "hello")
+const consumer = await ch.basicConsume(name, { noAck: false }, (msg) => msg.ack())
+const msg = await ch.basicGet(name)
+await ch.queueBind(name, "amq.topic", "routing.key")
+await ch.queueDelete(name)
+// High-level (automatic reconnection)
+const session = await AMQPSession.connect("amqp://localhost")
+const q = await session.queue("my-queue")
+await q.publish("hello")
+const sub = await q.subscribe({ noAck: false }, (msg) => msg.ack())
+const msg = await q.get()
+await q.bind("amq.topic", "routing.key")
+await q.delete()
```

- Improve 'republish in consume block' test reliability: add proper message acknowledgment and increase timeout for high-volume message processing
- Make npm scripts cross-platform compatible (#179)
- Update dependencies: glob 10.4.5→10.5.0, js-yaml 4.1.0→4.1.1 (#173)
- Export AMQPGeneratorConsumer for documentation generation (#172)
- Add AsyncGenerator support to subscribe() for improved DX (#169)
- Improve connection loss handling for WebSocket connections (#152)
- Fix parallel queue binding issues (#154)
- Properly handle heartbeat timeouts (#95)
- Fix TypeScript callback types to support async callbacks (#155)
- Return publish frame buffer to pool after send (#142)
- Logger configuration improvements - console is no longer used as default logger, explicit logger parameter support added (#149)
- Internal code improvements and optimizations (#140)
- Add missing exports to WebSocket client (#147)
- Fat fingers
- Increase min `frameMax` to 8192 (8KB) for compatibility with RabbitMQ 4.1 and large JWT tokens (#134)
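
As a rough illustration of the new minimum, the sketch below reads a `frameMax` query parameter from a connection URL (the URL form is the one shown elsewhere in this changelog) and refuses anything below 8192. The `resolveFrameMax` helper is hypothetical, and rejecting a smaller value rather than silently raising it is an assumption, not documented client behaviour.

```typescript
const MIN_FRAME_MAX = 8192 // minimum stated in the entry above

// Hypothetical helper: reads ?frameMax=N from the URL and enforces the minimum.
// Throwing on a too-small value is an assumption made for this sketch.
function resolveFrameMax(connectionUrl: string, fallback: number = MIN_FRAME_MAX): number {
  const raw = new URL(connectionUrl).searchParams.get("frameMax")
  const requested = raw === null ? fallback : Number.parseInt(raw, 10)
  if (Number.isNaN(requested)) throw new RangeError(`Invalid frameMax: ${raw}`)
  if (requested < MIN_FRAME_MAX) {
    throw new RangeError(`frameMax must be at least ${MIN_FRAME_MAX}, got ${requested}`)
  }
  return requested
}

console.log(resolveFrameMax("amqp://localhost/vhost?frameMax=16384")) // 16384
console.log(resolveFrameMax("amqp://localhost/vhost")) // 8192
```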
- Buffer all publish frames into a single huge buffer and send together
- Properly reject failed connection attempt
- TypeScript 5.7 fixes
- Web Worker compatibility
- Correct version number in `src/amqp-base-client.ts`
The version number was not updated in src/amqp-base-client.ts for this release.
- Support for clients to negotiate channel-max (#86)
- Raise when WebSocket is not cleanly closed (#80)
- Make logging configurable (#79)
- Support for connection.update-secret (#77)
- Channel max 0 should be treated as "unlimited" not 0 (#86)
- Close sockets not supporting amqp protocol (#78)
- Throws and rejects with `Error` as per best practice (#81)
- Clean ups (#88, #85)
- Package improvements for bundling and tree-shaking (#75)
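
The channel-max entries above (#86) rest on the AMQP 0-9-1 convention that a value of `0` means "no specified limit". A minimal sketch of that rule, assuming both peers settle on the lower of the two limits and that "unlimited" is bounded by the 16-bit channel number space; `negotiateChannelMax` is a hypothetical name, not the client's API.

```typescript
// Channel numbers are 16-bit in AMQP 0-9-1, so 65535 is the practical ceiling.
const CHANNEL_NUMBER_CEILING = 65535

// Hypothetical helper: treats 0 as "unlimited" instead of "no channels allowed".
function negotiateChannelMax(clientMax: number, serverMax: number): number {
  const client = clientMax === 0 ? CHANNEL_NUMBER_CEILING : clientMax
  const server = serverMax === 0 ? CHANNEL_NUMBER_CEILING : serverMax
  return Math.min(client, server)
}

console.log(negotiateChannelMax(0, 2047)) // 2047
console.log(negotiateChannelMax(100, 0)) // 100
console.log(negotiateChannelMax(0, 0)) // 65535
```

Taking a plain `Math.min` of the raw values would turn a `0` from either side into a limit of zero channels, which is the misreading the fix entry describes.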
- New overload for `AMQPWebSocketClient` constructor to allow setting optional parameters through an init object (#71)
- Call socket.destroy() when closing socket to fix intermittent condition where onerror is called when conn is closed by client (#72)
- Pass the correct array buffer to dataview when reading framesize (related to #55)
- Raise `AMQPError` when `channelMax` is reached (fixes #43)
- Add `Channel#onerror` callback (fixes #40)
- Correctly handle frame headers split across reads in the WebSocket client (#68, fixes #55)
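
The split-header fix can be illustrated with a small reassembly buffer. An AMQP 0-9-1 frame starts with a 7-byte header (1-byte type, 2-byte channel, 4-byte big-endian payload size) and ends with a single frame-end octet, so the frame length is unknown until all 7 header bytes have arrived. The `FrameAssembler` class below is a hypothetical sketch of that idea, not the client's actual parser.

```typescript
const HEADER_SIZE = 7 // type (1) + channel (2) + payload size (4)
const FRAME_END_SIZE = 1 // trailing 0xCE octet

// Hypothetical sketch: buffers partial reads and emits only complete frames.
class FrameAssembler {
  private pending = new Uint8Array(0)

  // Feed one chunk; returns every frame completed by it.
  push(chunk: Uint8Array): Uint8Array[] {
    // Append the new bytes to whatever was left over from earlier reads.
    const merged = new Uint8Array(this.pending.length + chunk.length)
    merged.set(this.pending, 0)
    merged.set(chunk, this.pending.length)

    const frames: Uint8Array[] = []
    let offset = 0
    // A frame can only be sized once its full 7-byte header has arrived.
    while (merged.length - offset >= HEADER_SIZE) {
      const view = new DataView(merged.buffer, merged.byteOffset + offset, HEADER_SIZE)
      const payloadSize = view.getUint32(3) // DataView reads big-endian by default
      const frameSize = HEADER_SIZE + payloadSize + FRAME_END_SIZE
      if (merged.length - offset < frameSize) break // wait for more data
      frames.push(merged.slice(offset, offset + frameSize))
      offset += frameSize
    }
    this.pending = merged.slice(offset) // keep the incomplete tail
    return frames
  }
}

// A heartbeat frame (type 8, channel 0, empty payload) delivered in two reads.
const heartbeat = Uint8Array.from([8, 0, 0, 0, 0, 0, 0, 0xce])
const assembler = new FrameAssembler()
const first = assembler.push(heartbeat.slice(0, 3)) // header still incomplete
const second = assembler.push(heartbeat.slice(3)) // rest of the frame arrives
console.log(first.length, second.length) // 0 1
```

Copying into a fresh buffer on every read keeps the sketch short; a production parser would more likely reuse a preallocated buffer.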
- Breaking change: Removed support for end-of-life versions of Node.js. A minimum of Node.js 16 is now required. (#70)
- Custom TlsOptions can be passed to the constructor, like: `new AMQPClient(url, { cert: "", key: "", ca: "" })`
- AMQPClient#onerror will be called whenever the connection is closed; override it to create reconnect logic.
- Export types for queue, exchange and consume parameters
- Only skip TLS certificate validation if the `insecure` query parameter is included in the URL
- Use a pool of buffers so that multiple microtasks can publish simultaneously
- Don't set an IP as SNI hostname, only proper hostnames
- Decode username/password in URL properly
- Allow publishing of `null` and let AMQPMessage#body be null when a body is missing
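
Two of the URL-related entries above (credential decoding and the `insecure` flag) can be sketched with the standard `URL` class. This is an illustration under assumptions, not the client's code: `parseAmqpUrl`, the `ParsedAmqpUrl` shape, and the example hostname are hypothetical.

```typescript
// Hypothetical result shape, for illustration only.
interface ParsedAmqpUrl {
  username: string
  password: string
  host: string
  tls: boolean
  insecure: boolean
}

// Hypothetical helper, not the client's API.
function parseAmqpUrl(connectionUrl: string): ParsedAmqpUrl {
  const u = new URL(connectionUrl)
  return {
    // URL keeps credentials percent-encoded, so decode them explicitly.
    username: decodeURIComponent(u.username),
    password: decodeURIComponent(u.password),
    host: u.hostname,
    tls: u.protocol === "amqps:",
    // Certificate validation is skipped only when the flag is present.
    insecure: u.searchParams.has("insecure"),
  }
}

const parsed = parseAmqpUrl("amqps://us%40er:p%2Fss@broker.example.com/vhost?insecure")
console.log(parsed.username, parsed.password, parsed.insecure) // us@er p/ss true
```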
- Heartbeat support
- Channel#closed is now a public property
- Frame errors because frame buffer was reused
- Frame errors because frame buffer was reused
- Don't depend on TextEncoder in AMQPMessage
- Explicit return types on all methods for faster typescript compilation
- No default exports, only named: `import { AMQPClient } from "@cloudamqp/amqp-client"`
- Much improved browser bundling support (webpack)
- Support basicCancel sent from server; AMQPConsumer#wait() will throw an Error if it happens.
- Support custom frameMax values, by URL: amqp://localhost/vhost?frameMax=8192
- Websocket client now supports parsing AMQP frames split over multiple WebSocket frames (could happen with high throughput via websocket-tcp-relay).
- 67% increased publish rate, by reusing frame buffer
- Use Buffer for string encoding/decoding for >100% performance boost
- Use 4096 frameMax for smaller and faster allocations (down from 16KB)
- Reraise RangeErrors with more debug information
- Rewrite in TypeScript
- Queue purged never resolved
- Logging when connection is blocked/unblocked by server
- tls/net.socket onread is buggy in nodejs 16, revert to 'data' event for parsing frames
- nodejs version expanded to 12
- 128KB read buffer
- Avoid copying frame when whole frame is included in one read
- Static textdecoder for faster string decoding in frames
- Error if a frame was split before the first 7 bytes between two reads
- tls connections require node 14 due to tls.connect({ onread })
- TypeScript definitions through JSDoc comments