@@ -14,6 +14,7 @@ export class AMQPChannel {
1414 readonly id : number
1515 readonly consumers = new Map < string , AMQPConsumer > ( )
1616 private rpcQueue : Promise < unknown > = Promise . resolve ( true )
17+ private readonly rpcCallbacks : [ ( value ?: unknown ) => void , ( err ?: Error ) => void ] [ ] = [ ]
1718 private readonly unconfirmedPublishes : [ number , ( confirmId : number ) => void , ( err ?: Error ) => void ] [ ] = [ ]
1819 closed = false
1920 confirmId = 0
@@ -747,11 +748,20 @@ export class AMQPChannel {
747748 private sendRpc ( frame : AMQPView , frameSize : number ) : Promise < any > {
748749 return new Promise ( ( resolve , reject ) => {
749750 this . rpcQueue = this . rpcQueue . then ( ( ) => {
750- this . resolveRPC = resolve
751- this . rejectRPC = reject
752- this . connection . send ( new Uint8Array ( frame . buffer , 0 , frameSize ) )
753- . catch ( reject )
754- } )
751+ // Add the callbacks to the queue before sending
752+ this . rpcCallbacks . push ( [ resolve , reject ] )
753+
754+ return this . connection . send ( new Uint8Array ( frame . buffer , 0 , frameSize ) )
755+ . catch ( ( err ) => {
756+ // Remove the callbacks from the queue if send fails
757+ const callbacks = this . rpcCallbacks . pop ( )
758+ if ( callbacks ) {
759+ callbacks [ 1 ] ( err ) // call reject
760+ } else {
761+ reject ( err )
762+ }
763+ } )
764+ } ) . catch ( reject )
755765 } )
756766 }
757767
@@ -770,7 +780,11 @@ export class AMQPChannel {
770780 this . closed = true
771781 this . consumers . forEach ( ( consumer ) => consumer . setClosed ( err ) )
772782 this . consumers . clear ( )
773- this . rejectRPC ( err )
783+
784+ // Reject all pending RPC callbacks
785+ this . rpcCallbacks . forEach ( ( [ , reject ] ) => reject ( err ) )
786+ this . rpcCallbacks . length = 0
787+
774788 // Reject and clear all unconfirmed publishes
775789 this . unconfirmedPublishes . forEach ( ( [ , , reject ] ) => reject ( err ) )
776790 this . unconfirmedPublishes . length = 0
@@ -833,13 +847,25 @@ export class AMQPChannel {
833847 * Resolvs next RPC command
834848 * @ignore
835849 */
836- resolveRPC ( value ?: unknown ) : unknown | void { return value }
850+ resolveRPC ( value ?: unknown ) : unknown | void {
851+ const callbacks = this . rpcCallbacks . shift ( )
852+ if ( callbacks ) {
853+ callbacks [ 0 ] ( value ) // call resolve
854+ }
855+ return value
856+ }
837857
838858 /**
839859 * Reject next RPC command
840860 * @ignore
841861 */
842- rejectRPC ( err ?: Error ) : Error | void { return err }
862+ rejectRPC ( err ?: Error ) : Error | void {
863+ const callbacks = this . rpcCallbacks . shift ( )
864+ if ( callbacks ) {
865+ callbacks [ 1 ] ( err ) // call reject
866+ }
867+ return err
868+ }
843869
844870 /**
845871 * Deliver a message to a consumer
0 commit comments