Skip to content

Add AMQPRPCClient for RPC-style messaging with direct reply-to#191

Merged
baelter merged 3 commits intomainfrom
claude/add-rpc-api-U74PF
Mar 9, 2026
Merged

Add AMQPRPCClient for RPC-style messaging with direct reply-to#191
baelter merged 3 commits intomainfrom
claude/add-rpc-api-U74PF

Conversation

@baelter
Copy link
Copy Markdown
Member

@baelter baelter commented Mar 5, 2026

Summary

  • Add AMQPRPCClient and AMQPRPCServer for request-response messaging over AMQP
  • Add session-level convenience methods: rpcClient(), rpcServer(), rpcCall()
  • RPC clients are tracked for automatic reconnect recovery
  • RPC servers recover through existing queue/subscription machinery
  • session.stop() cleans up all RPC clients, servers, and consumers

RPC Client (src/amqp-rpc-client.ts)

  • Uses direct reply-to pseudo-queue for efficient request-response
  • Tracks pending calls by correlation ID with optional timeout
  • Rejects all pending calls on close or reconnect
  • Recovered automatically by the session on reconnect

RPC Server (src/amqp-rpc-server.ts)

  • Subscribes to a named queue using session-level queue().subscribe() for auto-recovery
  • Invokes a user-provided handler for each request and publishes the reply
  • Validates replyTo before processing; nacks messages without it

Session Integration (src/amqp-session.ts)

  • rpcClient() — create and start an RPC client (tracked for reconnect recovery)
  • rpcServer(queue, handler, prefetch?) — create and start an RPC server
  • rpcCall(queue, body, options?) — one-shot RPC call (creates and disposes a client per call)

Test plan

  • Basic RPC call and reply round-trip
  • Multiple sequential RPC calls
  • One-shot rpcCall() round-trip
  • Timeout handling
  • Pending call rejection on client close
  • Browser tests (WebSocket round-trip and one-shot)

This comment was marked as resolved.

This comment was marked as resolved.

This comment was marked as resolved.

This comment was marked as resolved.

This comment was marked as resolved.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 10 changed files in this pull request and generated 3 comments.

Comment thread src/amqp-rpc-client.ts Outdated
Comment thread src/amqp-session.ts Outdated
Comment thread src/amqp-rpc-client.ts Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 10 changed files in this pull request and generated 4 comments.

Comment thread src/amqp-rpc-client.ts
Comment thread test/test.ts Outdated
Comment thread test/test.ts Outdated
Comment thread test/test.ts Outdated
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 10 changed files in this pull request and generated no new comments.

@baelter baelter force-pushed the claude/add-rpc-api-U74PF branch 2 times, most recently from b4d00f7 to f63a1e3 Compare March 6, 2026 13:39
@baelter baelter marked this pull request as ready for review March 6, 2026 13:44
@baelter baelter requested a review from ThomasSarlin as a code owner March 6, 2026 13:44
Add AMQPRPCClient (direct reply-to) and AMQPRPCServer (queue-based)
for request-response messaging. Both integrate with AMQPSession:

- session.rpcClient() creates a reusable client tracked for reconnect recovery
- session.rpcServer(queue, handler) creates a server using session queue subscribe
- session.rpcCall(queue, body) does a one-shot RPC call

RPC clients get automatic recovery on reconnect. RPC servers recover
through the existing queue/subscription machinery. session.stop()
cleans up everything.
@baelter baelter force-pushed the claude/add-rpc-api-U74PF branch from f63a1e3 to 8ebb094 Compare March 6, 2026 14:43
Comment thread src/amqp-rpc-client.ts
if (this.ch && !this.ch.closed) return this
const ch = await this.session.openChannel()
try {
await ch.basicConsume(DIRECT_REPLY_TO, { noAck: true }, (msg) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if the consumer receives a message with a correlationId that doesn’t match any entry in this.pending? As far as I understand, with noAck: true, such messages would just be dropped and never re-delivered—so any message not intended for this particular client is effectively ignored and lost.

Is this the intended behavior?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's by design. DIRECT_REPLY_TO is not a real queue, the broker routes messages directly to the correct consumer.

Comment thread src/amqp-rpc-server.ts Outdated
Comment thread src/amqp-rpc-server.ts Outdated
Comment thread src/amqp-session.ts Outdated
* @param options.timeout - Timeout in milliseconds
* @returns The reply {@link AMQPMessage}
*/
async rpcCall(queue: string, body: Body, options?: AMQPProperties & { timeout?: number }): Promise<AMQPMessage> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I like this oneshot part, at least that it resides in the Session. But I'm not going to fight against it's existence either :D

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is the highest level of abstraction that is the most convenient way of making rpc calls. The reusable client is there for performance reasons. So if anything I think we should hide that api, and direct everyone to use this method.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved the docs on this.

Comment thread src/amqp-session.ts
claude added 2 commits March 6, 2026 18:19
- Simplify RPCHandler signature from (body, msg) to just (msg)
- Remove session.rpcCall() one-shot convenience method
- Add comment explaining unmatched correlationId behavior in RPC client
- Flatten nested if-statements in RPC client consumer callback
- Update README, CHANGELOG, and all tests

https://claude.ai/code/session_016fEPinum7XFHdDBqX54iAn
rpcCall is the simplest entry point for most users. rpcClient is the
optimization for high-throughput scenarios that need to avoid per-call
channel overhead. Updated docs and comments to reflect this.

https://claude.ai/code/session_016fEPinum7XFHdDBqX54iAn
@baelter baelter requested a review from antondalgren March 9, 2026 08:52
@baelter baelter merged commit 4f2e18d into main Mar 9, 2026
6 checks passed
@baelter baelter deleted the claude/add-rpc-api-U74PF branch March 9, 2026 12:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants