-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathamqp-exchange.ts
More file actions
85 lines (78 loc) · 2.83 KB
/
amqp-exchange.ts
File metadata and controls
85 lines (78 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import type { AMQPProperties } from "./amqp-properties.js"
import type { AMQPSession } from "./amqp-session.js"
import { publishConfirmed, publishNoConfirm, type Body } from "./amqp-publisher.js"
/** Options for {@link AMQPExchange#publish}. */
export type ExchangePublishOptions = AMQPProperties & {
/** Routing key. Defaults to `""`. */
routingKey?: string
/** Wait for broker confirmation. Defaults to `true`. */
confirm?: boolean
}
/**
* Session-level exchange handle returned by {@link AMQPSession#exchange} and its
* convenience variants ({@link AMQPSession#directExchange}, etc.).
*
* All operations are reconnect-safe: they acquire a session channel on each
* call. `publish` waits for a broker confirm; pass `{ confirm: false }` to skip the wait.
*/
export class AMQPExchange {
readonly name: string
private readonly session: AMQPSession
/** @internal */
constructor(session: AMQPSession, name: string) {
this.session = session
this.name = name
}
/**
* Publish a message to this exchange.
* @param options - routing key, publish properties; set `confirm: false` to skip broker confirmation
* @returns `this` for chaining
*/
async publish(body: Body, options: ExchangePublishOptions = {}): Promise<AMQPExchange> {
const { confirm = true, routingKey = "", ...properties } = options
if (confirm) {
await publishConfirmed(this.session, this.name, routingKey, body, properties)
} else {
await publishNoConfirm(this.session, this.name, routingKey, body, properties)
}
return this
}
/**
* Bind this exchange to a source exchange.
* @param source - name or {@link AMQPExchange} to bind from
* @returns `this` for chaining
*/
async bind(
source: string | AMQPExchange,
routingKey = "",
args: Record<string, unknown> = {},
): Promise<AMQPExchange> {
const sourceName = typeof source === "string" ? source : source.name
const ch = await this.session.getOpsChannel()
await ch.exchangeBind(this.name, sourceName, routingKey, args)
return this
}
/**
* Remove a binding between this exchange and a source exchange.
* @param source - name or {@link AMQPExchange} to unbind from
* @returns `this` for chaining
*/
async unbind(
source: string | AMQPExchange,
routingKey = "",
args: Record<string, unknown> = {},
): Promise<AMQPExchange> {
const sourceName = typeof source === "string" ? source : source.name
const ch = await this.session.getOpsChannel()
await ch.exchangeUnbind(this.name, sourceName, routingKey, args)
return this
}
/**
* Delete this exchange.
* @param [params.ifUnused=false] - only delete if the exchange has no bindings
*/
async delete(params?: { ifUnused?: boolean }): Promise<void> {
const ch = await this.session.getOpsChannel()
await ch.exchangeDelete(this.name, params)
}
}