Add AMQPRPCClient for RPC-style messaging with direct reply-to#191
Add AMQPRPCClient (direct reply-to) and AMQPRPCServer (queue-based) for request-response messaging. Both integrate with AMQPSession:
- session.rpcClient() creates a reusable client tracked for reconnect recovery
- session.rpcServer(queue, handler) creates a server using session queue subscribe
- session.rpcCall(queue, body) does a one-shot RPC call

RPC clients get automatic recovery on reconnect. RPC servers recover through the existing queue/subscription machinery. session.stop() cleans up everything.
if (this.ch && !this.ch.closed) return this
const ch = await this.session.openChannel()
try {
  await ch.basicConsume(DIRECT_REPLY_TO, { noAck: true }, (msg) => {
What happens if the consumer receives a message whose correlationId doesn't match any entry in this.pending? As far as I understand, with noAck: true such messages are simply dropped and never redelivered, so any message not intended for this particular client is effectively ignored and lost.
Is this the intended behavior?
Yes, it's by design. DIRECT_REPLY_TO is not a real queue; the broker routes each reply directly to the correct consumer.
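The matching discussed in this thread can be pictured with a small, self-contained sketch. It is not the code from this PR: RpcReply, PendingCall, and ReplyCorrelator are hypothetical names, and the sketch only assumes what the comments above describe, namely a pending map keyed by correlationId and a noAck consumer that silently drops replies it cannot match.

```typescript
// Hypothetical minimal reply shape; not the library's AMQPMessage type.
interface RpcReply {
  correlationId?: string
  body: string
}

interface PendingCall {
  resolve: (reply: RpcReply) => void
  reject: (err: Error) => void
}

class ReplyCorrelator {
  private readonly pending = new Map<string, PendingCall>()
  private nextId = 0

  // Register an outgoing call; returns its correlationId and a promise for the reply.
  register(): { correlationId: string; reply: Promise<RpcReply> } {
    const correlationId = String(++this.nextId)
    const reply = new Promise<RpcReply>((resolve, reject) => {
      this.pending.set(correlationId, { resolve, reject })
    })
    return { correlationId, reply }
  }

  // Consumer callback. Returns true when the reply matched a pending call,
  // false when it was dropped (with noAck there is nothing to ack or requeue).
  dispatch(msg: RpcReply): boolean {
    const id = msg.correlationId
    if (id === undefined) return false
    const call = this.pending.get(id)
    if (!call) return false
    this.pending.delete(id)
    call.resolve(msg)
    return true
  }

  // Number of calls still waiting for a reply.
  get size(): number {
    return this.pending.size
  }
}
```

In this sketch a reply with an unknown correlationId leaves the pending map untouched, which mirrors the "dropped by design" behavior described above.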
 * @param options.timeout - Timeout in milliseconds
 * @returns The reply {@link AMQPMessage}
 */
async rpcCall(queue: string, body: Body, options?: AMQPProperties & { timeout?: number }): Promise<AMQPMessage> {
I'm not sure I like this one-shot part, or at least not that it resides in the Session. But I'm not going to fight against its existence either :D
Actually, this is the highest level of abstraction and the most convenient way of making RPC calls. The reusable client is there for performance reasons. So if anything, I think we should hide that API and direct everyone to use this method.
Improved the docs on this.
- Simplify RPCHandler signature from (body, msg) to just (msg)
- Remove session.rpcCall() one-shot convenience method
- Add comment explaining unmatched correlationId behavior in RPC client
- Flatten nested if-statements in RPC client consumer callback
- Update README, CHANGELOG, and all tests

https://claude.ai/code/session_016fEPinum7XFHdDBqX54iAn
rpcCall is the simplest entry point for most users. rpcClient is the optimization for high-throughput scenarios that need to avoid per-call channel overhead. Updated docs and comments to reflect this. https://claude.ai/code/session_016fEPinum7XFHdDBqX54iAn
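To illustrate the trade-off between the one-shot call and the reusable client, here is a rough sketch against an in-memory fake. RpcClientLike, FakeClient, oneShotCall, and demo are hypothetical names invented for this example; the real AMQPRPCClient and session.rpcCall() may differ in shape and behavior.

```typescript
// Hypothetical stand-in for an RPC client; the real AMQPRPCClient may differ.
interface RpcClientLike {
  call(queue: string, body: string): Promise<string>
  close(): Promise<void>
}

// One-shot pattern: create a client, make a single call, always dispose of it.
async function oneShotCall(
  createClient: () => Promise<RpcClientLike>,
  queue: string,
  body: string,
): Promise<string> {
  const client = await createClient()
  try {
    return await client.call(queue, body)
  } finally {
    await client.close()
  }
}

// In-memory fake, used only to make the sketch runnable without a broker.
class FakeClient implements RpcClientLike {
  static opened = 0
  static closed = 0
  constructor() {
    FakeClient.opened++
  }
  async call(queue: string, body: string): Promise<string> {
    return `${queue}:${body.toUpperCase()}`
  }
  async close(): Promise<void> {
    FakeClient.closed++
  }
}

async function demo(): Promise<string[]> {
  // One-shot: each call pays for opening and closing its own client.
  const a = await oneShotCall(async () => new FakeClient(), "rpc", "ping")
  const b = await oneShotCall(async () => new FakeClient(), "rpc", "pong")
  // Reusable: many calls can share one client, avoiding per-call setup.
  const shared = new FakeClient()
  const c = await shared.call("rpc", "again")
  await shared.close()
  return [a, b, c]
}
```

The try/finally in oneShotCall is what keeps the convenience path from leaking clients when a call fails or times out; the reusable path trades that simplicity for lower per-call overhead.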
Summary
- AMQPRPCClient and AMQPRPCServer for request-response messaging over AMQP
- rpcClient(), rpcServer(), rpcCall()
- session.stop() cleans up all RPC clients, servers, and consumers

RPC Client (src/amqp-rpc-client.ts)

RPC Server (src/amqp-rpc-server.ts)
- queue().subscribe() for auto-recovery
- replyTo before processing; nacks messages without it

Session Integration (src/amqp-session.ts)
- rpcClient(): create and start an RPC client (tracked for reconnect recovery)
- rpcServer(queue, handler, prefetch?): create and start an RPC server
- rpcCall(queue, body, options?): one-shot RPC call (creates and disposes a client per call)

Test plan
- rpcCall() round-trip
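The server-side rule mentioned in the summary (a request without replyTo is nacked rather than processed) could look roughly like the sketch below. RpcRequest, Outcome, and handleRequest are hypothetical names for illustration only; the handler takes just the message, in line with the simplified (msg) signature described earlier, but this is not the PR's actual implementation.

```typescript
// Hypothetical minimal request shape; not the library's AMQPMessage type.
interface RpcRequest {
  body: string
  replyTo?: string
  correlationId?: string
}

// What the server decides to do with a request in this sketch.
type Outcome =
  | { kind: "nack" }
  | { kind: "reply"; routingKey: string; correlationId: string | undefined; body: string }

// Validate replyTo first; only call the handler when a reply can be routed.
function handleRequest(req: RpcRequest, handler: (msg: RpcRequest) => string): Outcome {
  if (!req.replyTo) return { kind: "nack" }
  return {
    kind: "reply",
    routingKey: req.replyTo,          // reply goes to the address the caller supplied
    correlationId: req.correlationId, // echoed so the client can match the reply
    body: handler(req),
  }
}
```

Checking replyTo before invoking the handler means no work is done for a request whose result could never be delivered.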