Skip to content

Commit 4f2e18d

Browse files
baelterclaude
andauthored
Add AMQPRPCClient for RPC-style messaging with direct reply-to (#191)
* Add RPC client and server with session integration Add AMQPRPCClient (direct reply-to) and AMQPRPCServer (queue-based) for request-response messaging. Both integrate with AMQPSession: - session.rpcClient() creates a reusable client tracked for reconnect recovery - session.rpcServer(queue, handler) creates a server using session queue subscribe - session.rpcCall(queue, body) does a one-shot RPC call RPC clients get automatic recovery on reconnect. RPC servers recover through the existing queue/subscription machinery. session.stop() cleans up everything. * Address review feedback: simplify RPC handler, remove oneshot rpcCall - Simplify RPCHandler signature from (body, msg) to just (msg) - Remove session.rpcCall() one-shot convenience method - Add comment explaining unmatched correlationId behavior in RPC client - Flatten nested if-statements in RPC client consumer callback - Update README, CHANGELOG, and all tests https://claude.ai/code/session_016fEPinum7XFHdDBqX54iAn * Restore rpcCall as the recommended high-level RPC API rpcCall is the simplest entry point for most users. rpcClient is the optimization for high-throughput scenarios that need to avoid per-call channel overhead. Updated docs and comments to reflect this. https://claude.ai/code/session_016fEPinum7XFHdDBqX54iAn --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 2da3fae commit 4f2e18d

9 files changed

Lines changed: 433 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- `AMQPRPCClient` — reusable RPC client using direct reply-to for request-response patterns ([#191](https://github.com/cloudamqp/amqp-client.js/pull/191))
13+
- `start()` to begin listening for responses on the direct reply-to pseudo-queue
14+
- `call(queue, body, options?)` to publish an RPC request and await its response
15+
- Configurable per-call `timeout`; automatic correlation ID tracking
16+
- `close()` to reject pending calls and clean up the channel
17+
- Automatically recovered by `AMQPSession` on reconnect when created via `session.rpcClient()`
18+
- `AMQPRPCServer` — RPC server that consumes from a queue and replies to each caller ([#191](https://github.com/cloudamqp/amqp-client.js/pull/191))
19+
- Uses session-level queue subscribe for automatic consumer recovery on reconnect
20+
- Handler receives the full `AMQPMessage` and returns the response body
21+
- Session-level RPC convenience methods ([#191](https://github.com/cloudamqp/amqp-client.js/pull/191))
22+
- `session.rpcCall(queue, body, options?)` — simple one-shot RPC call (recommended for most use cases)
23+
- `session.rpcClient()` — create a reusable `AMQPRPCClient` for high-throughput scenarios
24+
- `session.rpcServer(queue, handler, prefetch?)` — create and start an `AMQPRPCServer`
1225
- `AMQPSession` — high-level client with automatic reconnection and consumer recovery ([#185](https://github.com/cloudamqp/amqp-client.js/pull/185), [#186](https://github.com/cloudamqp/amqp-client.js/pull/186))
1326
- `AMQPSession.connect(url, options?)` factory: picks TCP or WebSocket transport from the URL scheme (`amqp://` / `amqps://` → TCP; `ws://` / `wss://` → WebSocket)
1427
- Exponential backoff with configurable `reconnectInterval`, `maxReconnectInterval`, `backoffMultiplier`, and `maxRetries`

README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,35 @@ const sub = await q.subscribe({ prefetch: 10 }, async (msg) => {
9090
// await sub.cancel() // stops consuming and removes from auto-recovery
9191
```
9292
93+
#### RPC (Remote Procedure Call)
94+
95+
The session provides built-in RPC support using the direct reply-to feature:
96+
97+
```javascript
98+
import { AMQPSession } from "@cloudamqp/amqp-client"
99+
100+
const session = await AMQPSession.connect("amqp://localhost")
101+
102+
// Start an RPC server that listens on a queue
103+
const server = await session.rpcServer("my_rpc_queue", async (msg) => {
104+
return `processed:${msg.bodyString()}`
105+
})
106+
107+
// Simple RPC call — creates a temporary client per call
108+
const reply = await session.rpcCall("my_rpc_queue", "hello", { timeout: 5000 })
109+
console.log(reply.bodyToString()) // "processed:hello"
110+
111+
// For high-throughput scenarios, reuse a client to avoid per-call channel overhead
112+
const rpc = await session.rpcClient()
113+
const r1 = await rpc.call("my_rpc_queue", "a")
114+
const r2 = await rpc.call("my_rpc_queue", "b")
115+
await rpc.close()
116+
117+
await session.stop() // closes all RPC clients, servers, and consumers
118+
```
119+
120+
RPC servers and reusable clients created via `session.rpcClient()` are automatically recovered after a reconnection.
121+
93122
### Low-level API
94123
95124
For full control over channels and resources, use the transport clients directly:

package-lock.json

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/amqp-rpc-client.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import type { AMQPChannel } from "./amqp-channel.js"
2+
import type { AMQPMessage } from "./amqp-message.js"
3+
import type { AMQPProperties } from "./amqp-properties.js"
4+
import type { Body } from "./amqp-publisher.js"
5+
import type { AMQPSession } from "./amqp-session.js"
6+
7+
const DIRECT_REPLY_TO = "amq.rabbitmq.reply-to"
8+
9+
/**
10+
* Reusable RPC client using the direct reply-to feature.
11+
*
12+
* @example
13+
* ```ts
14+
* const session = await AMQPSession.connect("amqp://localhost")
15+
* const rpc = await session.rpcClient() // tracked for reconnect recovery
16+
* const reply = await rpc.call("my_queue", "request body")
17+
* console.log(reply.bodyString())
18+
* await rpc.close()
19+
* ```
20+
*/
21+
export class AMQPRPCClient {
22+
private readonly session: AMQPSession
23+
private ch: AMQPChannel | null = null
24+
private correlationId = 0
25+
private readonly pending = new Map<
26+
string,
27+
{
28+
resolve: (msg: AMQPMessage) => void
29+
reject: (err: Error) => void
30+
timer: ReturnType<typeof setTimeout> | undefined
31+
}
32+
>()
33+
private closed = false
34+
35+
/** @internal Use {@link AMQPSession.rpcClient} instead. */
36+
constructor(session: AMQPSession) {
37+
this.session = session
38+
}
39+
40+
/** @internal Called by {@link AMQPSession.rpcClient}. */
41+
async start(): Promise<this> {
42+
if (this.closed) throw new Error("RPC client is closed")
43+
if (this.ch && !this.ch.closed) return this
44+
const ch = await this.session.openChannel()
45+
try {
46+
// Direct reply-to is scoped per-channel by RabbitMQ, so only replies
47+
// for this client arrive here. Messages with an unknown correlationId
48+
// (e.g. late replies after a timeout) are intentionally dropped — with
49+
// noAck: true they are acknowledged on delivery and cannot be requeued.
50+
await ch.basicConsume(DIRECT_REPLY_TO, { noAck: true }, (msg) => {
51+
const id = msg.properties.correlationId
52+
if (id === undefined) return
53+
const entry = this.pending.get(id)
54+
if (!entry) return
55+
this.pending.delete(id)
56+
if (entry.timer) clearTimeout(entry.timer)
57+
entry.resolve(msg)
58+
})
59+
this.ch = ch
60+
return this
61+
} catch (err) {
62+
ch.close().catch(() => {})
63+
throw err
64+
}
65+
}
66+
67+
/**
68+
* Perform an RPC call: publish a message and wait for the response.
69+
*
70+
* @param queue - The queue name (routing key) of the RPC server
71+
* @param body - The request body
72+
* @param options - Optional properties and timeout
73+
* @param options.timeout - Timeout in milliseconds. Rejects with an error if
74+
* no response is received within this time.
75+
* @returns The reply {@link AMQPMessage}
76+
*/
77+
call(
78+
queue: string,
79+
body: Body,
80+
{ timeout, ...properties }: AMQPProperties & { timeout?: number } = {},
81+
): Promise<AMQPMessage> {
82+
if (this.closed) throw new Error("RPC client is closed")
83+
if (!this.ch || this.ch.closed) throw new Error("RPC client not started, call start() first")
84+
const ch = this.ch
85+
const correlationId = (++this.correlationId).toString(36)
86+
87+
return new Promise<AMQPMessage>((resolve, reject) => {
88+
let timer: ReturnType<typeof setTimeout> | undefined
89+
if (timeout !== undefined && timeout > 0) {
90+
timer = setTimeout(() => {
91+
this.pending.delete(correlationId)
92+
reject(new Error(`No response received in ${timeout}ms`))
93+
}, timeout)
94+
}
95+
96+
this.pending.set(correlationId, { resolve, reject, timer })
97+
98+
ch.basicPublish("", queue, body, {
99+
...properties,
100+
replyTo: DIRECT_REPLY_TO,
101+
correlationId,
102+
}).catch((err) => {
103+
const entry = this.pending.get(correlationId)
104+
if (!entry) return
105+
this.pending.delete(correlationId)
106+
if (entry.timer) clearTimeout(entry.timer)
107+
entry.reject(err)
108+
})
109+
})
110+
}
111+
112+
/**
113+
* Re-establish the channel and consumer after a reconnection.
114+
* All pending calls are rejected since the old channel is gone.
115+
* @internal Called by the session's reconnect loop.
116+
*/
117+
async recover(): Promise<void> {
118+
if (this.closed) return
119+
this.rejectAllPending(new Error("RPC client reconnecting"))
120+
this.ch = null
121+
await this.start()
122+
}
123+
124+
/**
125+
* Close the dedicated channel, reject any pending calls, and remove
126+
* this client from the session's reconnect recovery.
127+
*/
128+
async close(): Promise<void> {
129+
if (this.closed) return
130+
this.closed = true
131+
this.session.untrackRPCClient(this)
132+
this.rejectAllPending(new Error("RPC client closed"))
133+
const ch = this.ch
134+
this.ch = null
135+
if (ch && !ch.closed) {
136+
await ch.close()
137+
}
138+
}
139+
140+
private rejectAllPending(err: Error): void {
141+
for (const [id, entry] of this.pending) {
142+
if (entry.timer) clearTimeout(entry.timer)
143+
entry.reject(err)
144+
this.pending.delete(id)
145+
}
146+
}
147+
}

src/amqp-rpc-server.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import type { AMQPMessage } from "./amqp-message.js"
2+
import type { Body } from "./amqp-publisher.js"
3+
import type { AMQPSession } from "./amqp-session.js"
4+
import type { AMQPSubscription } from "./amqp-subscription.js"
5+
6+
/**
7+
* Callback invoked for each incoming RPC request.
8+
* Return the response body to send back to the caller.
9+
*/
10+
export type RPCHandler = (msg: AMQPMessage) => Body | Promise<Body>
11+
12+
/**
13+
* An RPC server that consumes messages from a queue and replies to each caller.
14+
*
15+
* Uses the session's queue and subscribe machinery, so the consumer is
16+
* automatically recovered after a reconnection.
17+
*
18+
* @example
19+
* ```ts
20+
* const session = await AMQPSession.connect("amqp://localhost")
21+
* const server = await session.rpcServer("my_rpc_queue", async (msg) => {
22+
* return `processed:${msg.bodyString()}`
23+
* })
24+
* // later…
25+
* await session.stop()
26+
* ```
27+
*/
28+
export class AMQPRPCServer {
29+
private readonly session: AMQPSession
30+
private subscription: AMQPSubscription | null = null
31+
32+
/** @internal Use {@link AMQPSession.rpcServer} instead. */
33+
constructor(session: AMQPSession) {
34+
this.session = session
35+
}
36+
37+
/** @internal Called by {@link AMQPSession.rpcServer}. */
38+
async start(queue: string, handler: RPCHandler, prefetch = 1): Promise<this> {
39+
if (this.subscription) throw new Error("RPC server already started")
40+
const q = await this.session.queue(queue)
41+
this.subscription = await q.subscribe({ prefetch, noAck: false, requeueOnNack: false }, async (msg) => {
42+
const { replyTo, correlationId } = msg.properties
43+
if (!replyTo) {
44+
await msg.nack(false)
45+
return
46+
}
47+
const result = await handler(msg)
48+
await msg.channel.basicPublish("", replyTo, result, {
49+
...(correlationId !== undefined && { correlationId }),
50+
})
51+
})
52+
return this
53+
}
54+
55+
/**
56+
* Cancel the consumer. The underlying queue remains declared.
57+
*/
58+
async close(): Promise<void> {
59+
const sub = this.subscription
60+
if (!sub) return
61+
this.subscription = null
62+
const ch = sub.channel
63+
await sub.cancel()
64+
if (!ch.closed) await ch.close()
65+
}
66+
}

0 commit comments

Comments
 (0)