Skip to content

Commit 492c6c0

Browse files
committed
Default deliveryMode to 2 (persistent) in publish
AMQPQueue.publish and AMQPExchange.publish now default `deliveryMode: 2` when not set, matching the Ruby client. The high-level API already defaults `confirm: true` — persistent-by-default is the consistent stance for an API that optimizes for "don't lose data." Explicit `deliveryMode: 1` is honored. Low-level AMQPChannel.basicPublish stays as-is. RPC client/server publish paths are not touched: requests/replies via amq.rabbitmq.reply-to are inherently transient.
1 parent 0a2ae91 commit 492c6c0

3 files changed

Lines changed: 56 additions & 0 deletions

File tree

src/amqp-exchange.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ export class AMQPExchange<
4343
* by the matching parser's `serialize` method. Without parsers, `body` must
4444
* be a string, Buffer, Uint8Array, or null.
4545
*
46+
* Defaults: `confirm: true`, `deliveryMode: 2` (persistent). Pass
47+
* `deliveryMode: 1` to send a transient message.
48+
*
4649
* @param options - routing key, publish properties; set `confirm: false` to skip broker confirmation
4750
* @returns `this` for chaining
4851
*/
@@ -61,6 +64,7 @@ export class AMQPExchange<
6164
properties,
6265
defaults,
6366
)
67+
if (encoded.properties.deliveryMode === undefined) encoded.properties.deliveryMode = 2
6468
const ch = confirm ? await this.session.getConfirmChannel() : await this.session.getOpsChannel()
6569
await ch.basicPublish(this.name, routingKey, encoded.body, encoded.properties)
6670
return this

src/amqp-queue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ export class AMQPQueue<
6363
* by the matching parser's `serialize` method. Without parsers, `body` must
6464
* be a string, Buffer, Uint8Array, or null.
6565
*
66+
* Defaults: `confirm: true`, `deliveryMode: 2` (persistent). Pass
67+
* `deliveryMode: 1` to send a transient message.
68+
*
6669
* @param options - publish properties; set `confirm: false` to skip broker confirmation
6770
* @returns `this` for chaining
6871
*/
@@ -81,6 +84,7 @@ export class AMQPQueue<
8184
properties,
8285
defaults,
8386
)
87+
if (encoded.properties.deliveryMode === undefined) encoded.properties.deliveryMode = 2
8488
const ch = confirm ? await this.session.getConfirmChannel() : await this.session.getOpsChannel()
8589
await ch.basicPublish("", this.name, encoded.body, encoded.properties)
8690
return this

test/amqp-session.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,54 @@ test("AMQPQueue (session-backed).publish() and get() round-trip", () =>
251251
expect(msg?.bodyString()).toBe("round-trip")
252252
}))
253253

254+
test("AMQPQueue.publish() defaults deliveryMode to 2 (persistent)", () =>
255+
withSession(async (session) => {
256+
const q = await session.queue("test-sq-persist-" + Math.random(), { durable: false, autoDelete: true })
257+
await q.publish("default")
258+
259+
const msg = await q.get({ noAck: true })
260+
expect(msg?.properties.deliveryMode).toBe(2)
261+
}))
262+
263+
test("AMQPQueue.publish() honors explicit deliveryMode: 1 (transient)", () =>
264+
withSession(async (session) => {
265+
const q = await session.queue("test-sq-transient-" + Math.random(), { durable: false, autoDelete: true })
266+
await q.publish("transient", { deliveryMode: 1 })
267+
268+
const msg = await q.get({ noAck: true })
269+
expect(msg?.properties.deliveryMode).toBe(1)
270+
}))
271+
272+
test("AMQPExchange.publish() defaults deliveryMode to 2 (persistent)", () =>
273+
withSession(async (session) => {
274+
const xName = "test-x-persist-" + Math.random()
275+
const qName = "test-xq-persist-" + Math.random()
276+
const x = await session.fanoutExchange(xName, { durable: false, autoDelete: true })
277+
const q = await session.queue(qName, { durable: false, autoDelete: true })
278+
await q.bind(x)
279+
280+
await x.publish("default")
281+
await new Promise((resolve) => setTimeout(resolve, 50))
282+
283+
const msg = await q.get({ noAck: true })
284+
expect(msg?.properties.deliveryMode).toBe(2)
285+
}))
286+
287+
test("AMQPExchange.publish() honors explicit deliveryMode: 1 (transient)", () =>
288+
withSession(async (session) => {
289+
const xName = "test-x-transient-" + Math.random()
290+
const qName = "test-xq-transient-" + Math.random()
291+
const x = await session.fanoutExchange(xName, { durable: false, autoDelete: true })
292+
const q = await session.queue(qName, { durable: false, autoDelete: true })
293+
await q.bind(x)
294+
295+
await x.publish("transient", { deliveryMode: 1 })
296+
await new Promise((resolve) => setTimeout(resolve, 50))
297+
298+
const msg = await q.get({ noAck: true })
299+
expect(msg?.properties.deliveryMode).toBe(1)
300+
}))
301+
254302
test("AMQPQueue (session-backed).subscribe() recovers after reconnect", () =>
255303
withSession(
256304
async (session) => {

0 commit comments

Comments
 (0)