-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathamqp-rpc-server.ts
More file actions
66 lines (61 loc) · 2.07 KB
/
amqp-rpc-server.ts
File metadata and controls
66 lines (61 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import type { AMQPMessage } from "./amqp-message.js"
import type { Body } from "./amqp-publisher.js"
import type { AMQPSession } from "./amqp-session.js"
import type { AMQPSubscription } from "./amqp-subscription.js"
/**
* Callback invoked for each incoming RPC request.
* Return the response body to send back to the caller.
*/
export type RPCHandler = (msg: AMQPMessage) => Body | Promise<Body>
/**
* An RPC server that consumes messages from a queue and replies to each caller.
*
* Uses the session's queue and subscribe machinery, so the consumer is
* automatically recovered after a reconnection.
*
* @example
* ```ts
* const session = await AMQPSession.connect("amqp://localhost")
* const server = await session.rpcServer("my_rpc_queue", async (msg) => {
* return `processed:${msg.bodyString()}`
* })
* // later…
* await session.stop()
* ```
*/
export class AMQPRPCServer {
private readonly session: AMQPSession
private subscription: AMQPSubscription | null = null
/** @internal Use {@link AMQPSession.rpcServer} instead. */
constructor(session: AMQPSession) {
this.session = session
}
/** @internal Called by {@link AMQPSession.rpcServer}. */
async start(queue: string, handler: RPCHandler, prefetch = 1): Promise<this> {
if (this.subscription) throw new Error("RPC server already started")
const q = await this.session.queue(queue)
this.subscription = await q.subscribe({ prefetch, noAck: false, requeueOnNack: false }, async (msg) => {
const { replyTo, correlationId } = msg.properties
if (!replyTo) {
await msg.nack(false)
return
}
const result = await handler(msg)
await msg.channel.basicPublish("", replyTo, result, {
...(correlationId !== undefined && { correlationId }),
})
})
return this
}
/**
* Cancel the consumer. The underlying queue remains declared.
*/
async close(): Promise<void> {
const sub = this.subscription
if (!sub) return
this.subscription = null
const ch = sub.channel
await sub.cancel()
if (!ch.closed) await ch.close()
}
}