
Fix parallel queue binding by implementing proper RPC callback queue#154

Merged
baelter merged 1 commit into main from copilot/fix-queue-binding-parallel-issue
Sep 11, 2025
Copilot AI commented Sep 9, 2025

Fixes the issue where parallel queue binding operations would hang indefinitely when using Promise.all(). The problem occurred when multiple q.bind() calls were made simultaneously, such as:

await Promise.all([
  q.bind('test-exchange', 'foo:*'),
  q.bind('test-exchange', 'bar:*'),
])

Root Cause

The issue was in the sendRpc method in AMQPChannel. It used a single set of resolve/reject callbacks (this.resolveRPC and this.rejectRPC) that got overwritten when multiple RPC requests were made concurrently. When RPC responses arrived from the server, only the last request's callbacks were available, leaving all previous requests hanging.
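
To make the failure mode concrete, here is a minimal sketch of the overwrite problem. `SingleSlotChannel`, `resolveSlot`, `onReply`, and `demoOverwrite` are hypothetical names for illustration only; the real `AMQPChannel` is more involved, so treat this as a simplified model rather than the actual pre-fix code.

```typescript
// Hypothetical, simplified model of the pre-fix behaviour; not the library's code.
class SingleSlotChannel {
  // One shared slot: every new request overwrites the previous resolver.
  private resolveSlot: ((value: string) => void) | undefined

  sendRpc(): Promise<string> {
    return new Promise<string>((resolve) => {
      this.resolveSlot = resolve
    })
  }

  // Called once per server reply.
  onReply(value: string): void {
    this.resolveSlot?.(value)
  }
}

// Reports which of two concurrent requests have settled after two replies arrive.
async function demoOverwrite(): Promise<{ first: boolean; second: boolean }> {
  const ch = new SingleSlotChannel()
  const settled = { first: false, second: false }
  ch.sendRpc().then(() => { settled.first = true })
  ch.sendRpc().then(() => { settled.second = true })
  ch.onReply("bind-ok #1") // resolves the second request: its resolver is in the slot
  ch.onReply("bind-ok #2") // no-op: that promise is already resolved
  await Promise.resolve() // give the queued .then handlers a chance to run
  await Promise.resolve()
  return settled // first stays false because its resolver was overwritten
}
```

In this model the first request never settles, which is the same shape as `Promise.all()` hanging on two parallel `q.bind()` calls.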

Solution

Implemented a proper FIFO callback queue system:

  • Added rpcCallbacks queue: Stores resolve/reject callback pairs for each RPC request in order
  • Modified sendRpc: Pushes callback pairs to the queue instead of overwriting single callbacks
  • Updated resolveRPC/rejectRPC: Process callbacks from the queue in FIFO order
  • Enhanced error handling: Properly reject all pending callbacks on channel close or send failures
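
The bullets above can be sketched roughly as follows. The method names `sendRpc`, `resolveRPC`, `rejectRPC` and the `rpcCallbacks` field come from this PR; `QueuedChannel`, `close`, `pendingCount`, and `demoFifo` are hypothetical, and the bodies are an illustrative assumption rather than the merged code (no frames are actually sent here).

```typescript
// Hypothetical sketch of the FIFO callback queue; not the merged implementation.
type RpcCallbacks = [(value: unknown) => void, (err: Error) => void]

class QueuedChannel {
  private rpcCallbacks: RpcCallbacks[] = []

  // Each request gets its own resolve/reject pair, appended in send order.
  sendRpc(): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      this.rpcCallbacks.push([resolve, reject])
    })
  }

  // A reply settles the oldest pending request; an empty queue is a no-op.
  resolveRPC(value: unknown): void {
    const callbacks = this.rpcCallbacks.shift()
    if (callbacks) {
      const [resolve] = callbacks
      resolve(value)
    }
  }

  rejectRPC(err: Error): void {
    const callbacks = this.rpcCallbacks.shift()
    if (callbacks) {
      const [, reject] = callbacks
      reject(err)
    }
  }

  // On channel close, reject everything that is still waiting.
  close(err: Error): void {
    for (const [, reject] of this.rpcCallbacks.splice(0)) reject(err)
  }

  pendingCount(): number {
    return this.rpcCallbacks.length
  }
}

// Two concurrent requests, answered in order, both settle.
async function demoFifo(): Promise<unknown[]> {
  const ch = new QueuedChannel()
  const both = Promise.all([ch.sendRpc(), ch.sendRpc()])
  ch.resolveRPC("foo:* bound")
  ch.resolveRPC("bar:* bound")
  return both
}
```

This relies on replies arriving in the same order the requests were sent on the channel, which is the assumption the FIFO design makes.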

Testing

Added comprehensive test coverage for parallel queue binding and verified the fix handles edge cases correctly:

  • FIFO ordering of concurrent RPC requests
  • Channel closure rejection of pending callbacks
  • Empty queue handling

The fix is minimal (42 lines changed), maintains backward compatibility, and resolves the hanging behavior while preserving all existing functionality.

Fixes #94.



Copilot AI changed the title from "[WIP] No longer possible to bind queues in parallel" to "Fix parallel queue binding by implementing proper RPC callback queue" Sep 9, 2025
Copilot AI requested a review from baelter September 9, 2025 11:41
…queue

Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
@baelter baelter force-pushed the copilot/fix-queue-binding-parallel-issue branch from 7d07f99 to 1e7b3cb Compare September 9, 2025 12:14
@baelter baelter marked this pull request as ready for review September 9, 2025 12:16
@baelter baelter merged commit 19c6017 into main Sep 11, 2025
6 checks passed
@baelter baelter deleted the copilot/fix-queue-binding-parallel-issue branch September 11, 2025 06:42
Comment thread src/amqp-channel.ts
Comment on lines +760 to +762
  } else {
    reject(err)
  }

I think that if we need this else branch, then we have a race condition somewhere, since we push the callbacks to the queue and then pop them just above?

Comment thread src/amqp-channel.ts
Comment on lines +756 to +762
  // Remove the callbacks from the queue if send fails
  const callbacks = this.rpcCallbacks.pop()
  if (callbacks) {
    callbacks[1](err) // call reject
  } else {
    reject(err)
  }

We should probably reuse this.rejectRPC, as it's the same code.
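
One thing worth checking before swapping it in: going by the snippets quoted in this review, the send-failure path uses `pop()` (newest entry) while `rejectRPC` uses `shift()` (oldest entry). With more than one request pending, those select different callbacks. A tiny sketch, with string labels standing in for the callback pairs (`removeOnSendFailure` is a hypothetical helper, not part of the codebase):

```typescript
// Hypothetical helper: labels stand in for the [resolve, reject] pairs in rpcCallbacks.
function removeOnSendFailure(
  pending: string[],
  strategy: "pop" | "shift",
): string | undefined {
  return strategy === "pop" ? pending.pop() : pending.shift()
}

// Two requests pending; the send for the second one has just failed.
const viaPop = removeOnSendFailure(["bind foo:*", "bind bar:*"], "pop")
const viaShift = removeOnSendFailure(["bind foo:*", "bind bar:*"], "shift")
// viaPop is "bind bar:*" (the request that just failed to send)
// viaShift is "bind foo:*" (an older request that may still get a reply)
```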

Comment thread src/amqp-channel.ts
Comment on lines +850 to +855
  resolveRPC(value?: unknown) : unknown | void {
    const callbacks = this.rpcCallbacks.shift()
    if (callbacks) {
      callbacks[0](value) // call resolve
    }
    return value

I find array destructuring much easier to read than accessing items through indices. Another positive side effect is that you won't need the comment to explain that it's a resolve.

  resolveRPC(value?: unknown): unknown | void {
    const callbacks = this.rpcCallbacks.shift()
    if (callbacks) {
      const [resolve, _reject] = callbacks
      resolve(value)
    }
    return value
  }

I couldn't find any place where we are using the return value, should we remove that from the function?

Comment thread src/amqp-channel.ts
Comment on lines +862 to +867
  rejectRPC(err?: Error) : Error | void {
    const callbacks = this.rpcCallbacks.shift()
    if (callbacks) {
      callbacks[1](err) // call reject
    }
    return err

I couldn't find any place where this method is called 🤔
