Skip to content

Commit 19c6017

Browse files
Copilot and baelter committed
Fix parallel queue binding issue by implementing proper RPC callback queue
Co-authored-by: baelter <1399369+baelter@users.noreply.github.com>
1 parent 1df71d4 commit 19c6017

2 files changed

Lines changed: 57 additions & 8 deletions

File tree

src/amqp-channel.ts

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export class AMQPChannel {
1414
readonly id: number
1515
readonly consumers = new Map<string, AMQPConsumer>()
1616
private rpcQueue: Promise<unknown> = Promise.resolve(true)
17+
private readonly rpcCallbacks: [(value?: unknown) => void, (err?: Error) => void][] = []
1718
private readonly unconfirmedPublishes: [number, (confirmId: number) => void, (err?: Error) => void][] = []
1819
closed = false
1920
confirmId = 0
@@ -747,11 +748,20 @@ export class AMQPChannel {
747748
private sendRpc(frame: AMQPView, frameSize: number): Promise<any> {
748749
return new Promise((resolve, reject) => {
749750
this.rpcQueue = this.rpcQueue.then(() => {
750-
this.resolveRPC = resolve
751-
this.rejectRPC = reject
752-
this.connection.send(new Uint8Array(frame.buffer, 0, frameSize))
753-
.catch(reject)
754-
})
751+
// Add the callbacks to the queue before sending
752+
this.rpcCallbacks.push([resolve, reject])
753+
754+
return this.connection.send(new Uint8Array(frame.buffer, 0, frameSize))
755+
.catch((err) => {
756+
// Remove the callbacks from the queue if send fails
757+
const callbacks = this.rpcCallbacks.pop()
758+
if (callbacks) {
759+
callbacks[1](err) // call reject
760+
} else {
761+
reject(err)
762+
}
763+
})
764+
}).catch(reject)
755765
})
756766
}
757767

@@ -770,7 +780,11 @@ export class AMQPChannel {
770780
this.closed = true
771781
this.consumers.forEach((consumer) => consumer.setClosed(err))
772782
this.consumers.clear()
773-
this.rejectRPC(err)
783+
784+
// Reject all pending RPC callbacks
785+
this.rpcCallbacks.forEach(([, reject]) => reject(err))
786+
this.rpcCallbacks.length = 0
787+
774788
// Reject and clear all unconfirmed publishes
775789
this.unconfirmedPublishes.forEach(([, , reject]) => reject(err))
776790
this.unconfirmedPublishes.length = 0
@@ -833,13 +847,25 @@ export class AMQPChannel {
833847
* Resolves next RPC command
834848
* @ignore
835849
*/
836-
resolveRPC(value?: unknown) : unknown | void { return value }
850+
resolveRPC(value?: unknown) : unknown | void {
851+
const callbacks = this.rpcCallbacks.shift()
852+
if (callbacks) {
853+
callbacks[0](value) // call resolve
854+
}
855+
return value
856+
}
837857

838858
/**
839859
* Reject next RPC command
840860
* @ignore
841861
*/
842-
rejectRPC(err?: Error) : Error | void { return err }
862+
rejectRPC(err?: Error) : Error | void {
863+
const callbacks = this.rpcCallbacks.shift()
864+
if (callbacks) {
865+
callbacks[1](err) // call reject
866+
}
867+
return err
868+
}
843869

844870
/**
845871
* Deliver a message to a consumer

test/test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,3 +729,26 @@ test('should handle heartbeat timeout correctly', async () => {
729729
expect(error.message).toEqual('Heartbeat timeout')
730730
expect(conn.closed).toBe(true)
731731
})
732+
733+
test('can bind queues in parallel', async () => {
734+
const amqp = getNewClient()
735+
const conn = await amqp.connect()
736+
const ch = await conn.channel()
737+
const q = await ch.queue('test-queue')
738+
await ch.exchangeDeclare('test-exchange', 'fanout')
739+
740+
// This should not hang - parallel binds should work
741+
await Promise.all([
742+
q.bind('test-exchange', 'foo:*'),
743+
q.bind('test-exchange', 'bar:*'),
744+
])
745+
746+
// Test with more binds to stress test the RPC queue
747+
await Promise.all([
748+
q.bind('test-exchange', 'baz:*'),
749+
q.bind('test-exchange', 'qux:*'),
750+
q.bind('test-exchange', 'quux:*'),
751+
])
752+
753+
await expect(Promise.resolve('success')).resolves.toBe('success')
754+
})

0 commit comments

Comments
 (0)