Skip to content

Commit 0a2ae91

Browse files
committed
Accept AMQPExchange handle in AMQPQueue bind/unbind
AMQPExchange.bind/unbind already accept `string | AMQPExchange`. Mirror that on AMQPQueue so users don't have to reach for `exchange.name` after working with the exchange handle. await queue.bind(exchange, "user-id") // now works
1 parent e17645f commit 0a2ae91

2 files changed

Lines changed: 20 additions & 4 deletions

File tree

src/amqp-queue.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { AMQPConsumer, AMQPGeneratorConsumer } from "./amqp-consumer.js"
55
import { AMQPSubscription, AMQPGeneratorSubscription } from "./amqp-subscription.js"
66
import type { ConsumerDefinition } from "./amqp-subscription.js"
77
import type { AMQPSession } from "./amqp-session.js"
8+
import type { AMQPExchange } from "./amqp-exchange.js"
89
import type { ResolveBody } from "./amqp-publisher.js"
910
import { serializeAndEncode, decodeMessage } from "./amqp-codec-registry.js"
1011
import type { ParserMap, CoderMap } from "./amqp-codec-registry.js"
@@ -153,9 +154,14 @@ export class AMQPQueue<
153154
* Bind this queue to an exchange.
154155
* @returns `this` for chaining
155156
*/
156-
async bind(exchange: string, routingKey = "", args: Record<string, unknown> = {}): Promise<AMQPQueue<P, C, KP, KC>> {
157+
async bind(
158+
exchange: string | AMQPExchange<P, C, KP, KC>,
159+
routingKey = "",
160+
args: Record<string, unknown> = {},
161+
): Promise<AMQPQueue<P, C, KP, KC>> {
162+
const exchangeName = typeof exchange === "string" ? exchange : exchange.name
157163
const ch = await this.session.getOpsChannel()
158-
await ch.queueBind(this.name, exchange, routingKey, args)
164+
await ch.queueBind(this.name, exchangeName, routingKey, args)
159165
return this
160166
}
161167

@@ -164,12 +170,13 @@ export class AMQPQueue<
164170
* @returns `this` for chaining
165171
*/
166172
async unbind(
167-
exchange: string,
173+
exchange: string | AMQPExchange<P, C, KP, KC>,
168174
routingKey = "",
169175
args: Record<string, unknown> = {},
170176
): Promise<AMQPQueue<P, C, KP, KC>> {
177+
const exchangeName = typeof exchange === "string" ? exchange : exchange.name
171178
const ch = await this.session.getOpsChannel()
172-
await ch.queueUnbind(this.name, exchange, routingKey, args)
179+
await ch.queueUnbind(this.name, exchangeName, routingKey, args)
173180
return this
174181
}
175182

test/amqp-session.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,15 @@ test("AMQPQueue.bind() and unbind()", () =>
430430
await expect(q.unbind("amq.topic", "test.key")).resolves.toBeInstanceOf(AMQPQueue)
431431
}))
432432

433+
test("AMQPQueue.bind() and unbind() accept an AMQPExchange handle", () =>
434+
withSession(async (session) => {
435+
const x = await session.topicExchange("amq.topic")
436+
const q = await session.queue("test-q-bind-handle-" + Math.random(), { durable: false, autoDelete: true })
437+
438+
await expect(q.bind(x, "test.key")).resolves.toBeInstanceOf(AMQPQueue)
439+
await expect(q.unbind(x, "test.key")).resolves.toBeInstanceOf(AMQPQueue)
440+
}))
441+
433442
test("AMQPQueue.purge() empties the queue", () =>
434443
withSession(async (session) => {
435444
const q = await session.queue("test-q-purge-" + Math.random(), { durable: false, autoDelete: true })

0 commit comments

Comments
 (0)