Fix parallel queue binding by implementing proper RPC callback queue#154
Merged
Fix parallel queue binding by implementing proper RPC callback queue#154
Conversation
Copilot
AI
changed the title
[WIP] No longer possible to bind queues in parallel
Fix parallel queue binding by implementing proper RPC callback queue
Sep 9, 2025
…queue Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
7d07f99 to
1e7b3cb
Compare
baelter
approved these changes
Sep 11, 2025
Comment on lines
+760
to
+762
| } else { | ||
| reject(err) | ||
| } |
Contributor
There was a problem hiding this comment.
I think that if we need this else branch, then we have a race condition somewhere since we are pushing the callbacks to the queue and then pop them above?
Comment on lines
+756
to
+762
| // Remove the callbacks from the queue if send fails | ||
| const callbacks = this.rpcCallbacks.pop() | ||
| if (callbacks) { | ||
| callbacks[1](err) // call reject | ||
| } else { | ||
| reject(err) | ||
| } |
Contributor
There was a problem hiding this comment.
We should probably reuse the this.rejectRPC as it's the same code.
Comment on lines
+850
to
+855
| resolveRPC(value?: unknown) : unknown | void { | ||
| const callbacks = this.rpcCallbacks.shift() | ||
| if (callbacks) { | ||
| callbacks[0](value) // call resolve | ||
| } | ||
| return value |
Contributor
There was a problem hiding this comment.
I find using object destructuring way easier to read than accessing items through indicies. Another positive sideeffect is that you wont need the comment to explain that it's a resolve.
resolveRPC(value?: unknown): unknown | void {
const callbacks = this.rpcCallbacks.shift()
if (callbacks) {
const [resolve, _reject] = callbacks
resolve(value)
}
return value
}I couldn't find any place where we are using the return value, should we remove that from the function?
Comment on lines
+862
to
+867
| rejectRPC(err?: Error) : Error | void { | ||
| const callbacks = this.rpcCallbacks.shift() | ||
| if (callbacks) { | ||
| callbacks[1](err) // call reject | ||
| } | ||
| return err |
Contributor
There was a problem hiding this comment.
I couldn't find any place where this method is called 🤔
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes the issue where parallel queue binding operations would hang indefinitely when using
Promise.all(). The problem occurred when multipleq.bind()calls were made simultaneously, such as:Root Cause
The issue was in the
sendRpcmethod inAMQPChannel. It used a single set of resolve/reject callbacks (this.resolveRPCandthis.rejectRPC) that got overwritten when multiple RPC requests were made concurrently. When RPC responses arrived from the server, only the last request's callbacks were available, leaving all previous requests hanging.Solution
Implemented a proper FIFO callback queue system:
rpcCallbacksqueue: Stores resolve/reject callback pairs for each RPC request in ordersendRpc: Pushes callback pairs to the queue instead of overwriting single callbacksresolveRPC/rejectRPC: Process callbacks from the queue in FIFO orderTesting
Added comprehensive test coverage for parallel queue binding and verified the fix handles edge cases correctly:
The fix is minimal (42 lines changed), maintains backward compatibility, and resolves the hanging behavior while preserving all existing functionality.
Fixes #94.
✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.