-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathamqp-rpc-client.ts
More file actions
147 lines (136 loc) · 4.71 KB
/
amqp-rpc-client.ts
File metadata and controls
147 lines (136 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import type { AMQPChannel } from "./amqp-channel.js"
import type { AMQPMessage } from "./amqp-message.js"
import type { AMQPProperties } from "./amqp-properties.js"
import type { Body } from "./amqp-publisher.js"
import type { AMQPSession } from "./amqp-session.js"
const DIRECT_REPLY_TO = "amq.rabbitmq.reply-to"
/**
* Reusable RPC client using the direct reply-to feature.
*
* @example
* ```ts
* const session = await AMQPSession.connect("amqp://localhost")
* const rpc = await session.rpcClient() // tracked for reconnect recovery
* const reply = await rpc.call("my_queue", "request body")
* console.log(reply.bodyString())
* await rpc.close()
* ```
*/
export class AMQPRPCClient {
private readonly session: AMQPSession
private ch: AMQPChannel | null = null
private correlationId = 0
private readonly pending = new Map<
string,
{
resolve: (msg: AMQPMessage) => void
reject: (err: Error) => void
timer: ReturnType<typeof setTimeout> | undefined
}
>()
private closed = false
/** @internal Use {@link AMQPSession.rpcClient} instead. */
constructor(session: AMQPSession) {
this.session = session
}
/** @internal Called by {@link AMQPSession.rpcClient}. */
async start(): Promise<this> {
if (this.closed) throw new Error("RPC client is closed")
if (this.ch && !this.ch.closed) return this
const ch = await this.session.openChannel()
try {
// Direct reply-to is scoped per-channel by RabbitMQ, so only replies
// for this client arrive here. Messages with an unknown correlationId
// (e.g. late replies after a timeout) are intentionally dropped — with
// noAck: true they are acknowledged on delivery and cannot be requeued.
await ch.basicConsume(DIRECT_REPLY_TO, { noAck: true }, (msg) => {
const id = msg.properties.correlationId
if (id === undefined) return
const entry = this.pending.get(id)
if (!entry) return
this.pending.delete(id)
if (entry.timer) clearTimeout(entry.timer)
entry.resolve(msg)
})
this.ch = ch
return this
} catch (err) {
ch.close().catch(() => {})
throw err
}
}
/**
* Perform an RPC call: publish a message and wait for the response.
*
* @param queue - The queue name (routing key) of the RPC server
* @param body - The request body
* @param options - Optional properties and timeout
* @param options.timeout - Timeout in milliseconds. Rejects with an error if
* no response is received within this time.
* @returns The reply {@link AMQPMessage}
*/
call(
queue: string,
body: Body,
{ timeout, ...properties }: AMQPProperties & { timeout?: number } = {},
): Promise<AMQPMessage> {
if (this.closed) throw new Error("RPC client is closed")
if (!this.ch || this.ch.closed) throw new Error("RPC client not started, call start() first")
const ch = this.ch
const correlationId = (++this.correlationId).toString(36)
return new Promise<AMQPMessage>((resolve, reject) => {
let timer: ReturnType<typeof setTimeout> | undefined
if (timeout !== undefined && timeout > 0) {
timer = setTimeout(() => {
this.pending.delete(correlationId)
reject(new Error(`No response received in ${timeout}ms`))
}, timeout)
}
this.pending.set(correlationId, { resolve, reject, timer })
ch.basicPublish("", queue, body, {
...properties,
replyTo: DIRECT_REPLY_TO,
correlationId,
}).catch((err) => {
const entry = this.pending.get(correlationId)
if (!entry) return
this.pending.delete(correlationId)
if (entry.timer) clearTimeout(entry.timer)
entry.reject(err)
})
})
}
/**
* Re-establish the channel and consumer after a reconnection.
* All pending calls are rejected since the old channel is gone.
* @internal Called by the session's reconnect loop.
*/
async recover(): Promise<void> {
if (this.closed) return
this.rejectAllPending(new Error("RPC client reconnecting"))
this.ch = null
await this.start()
}
/**
* Close the dedicated channel, reject any pending calls, and remove
* this client from the session's reconnect recovery.
*/
async close(): Promise<void> {
if (this.closed) return
this.closed = true
this.session.untrackRPCClient(this)
this.rejectAllPending(new Error("RPC client closed"))
const ch = this.ch
this.ch = null
if (ch && !ch.closed) {
await ch.close()
}
}
private rejectAllPending(err: Error): void {
for (const [id, entry] of this.pending) {
if (entry.timer) clearTimeout(entry.timer)
entry.reject(err)
this.pending.delete(id)
}
}
}