Skip to content

Commit d464ef2

Browse files
authored
Chain RPC commands, so that responses can't be intermingled (#58)
This forces methods on a channel to be serialized: a new request won't be sent until the response to the previous one has been received. Previously, responses could have come back out of order. Fixes #57
1 parent 224f3ca commit d464ef2

4 files changed

Lines changed: 55 additions & 83 deletions

File tree

src/amqp-base-client.ts

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,7 @@ export abstract class AMQPBaseClient {
7070

7171
const channel = new AMQPChannel(this, id)
7272
this.channels[id] = channel
73-
74-
let j = 0
75-
const channelOpen = new AMQPView(new ArrayBuffer(13))
76-
channelOpen.setUint8(j, 1); j += 1 // type: method
77-
channelOpen.setUint16(j, id); j += 2 // channel id
78-
channelOpen.setUint32(j, 5); j += 4 // frameSize
79-
channelOpen.setUint16(j, 20); j += 2 // class: channel
80-
channelOpen.setUint16(j, 10); j += 2 // method: open
81-
channelOpen.setUint8(j, 0); j += 1 // reserved1
82-
channelOpen.setUint8(j, 206); j += 1 // frame end byte
83-
return new Promise((resolve, reject) => {
84-
this.send(new Uint8Array(channelOpen.buffer, 0, 13))
85-
.then(() => channel.promises.push([resolve, reject]))
86-
.catch(reject)
87-
})
73+
return channel.open()
8874
}
8975

9076
/**
@@ -315,12 +301,12 @@ export abstract class AMQPBaseClient {
315301
switch (methodId) {
316302
case 11: { // openok
317303
i += 4 // reserved1 (long string)
318-
channel.resolvePromise(channel)
304+
channel.resolveRPC(channel)
319305
break
320306
}
321307
case 21: { // flowOk
322308
const active = view.getUint8(i) !== 0; i += 1
323-
channel.resolvePromise(active)
309+
channel.resolveRPC(active)
324310
break
325311
}
326312
case 40: { // close
@@ -349,7 +335,7 @@ export abstract class AMQPBaseClient {
349335
case 41: { // closeOk
350336
channel.setClosed()
351337
delete this.channels[channelId]
352-
channel.resolvePromise()
338+
channel.resolveRPC()
353339
break
354340
}
355341
default:
@@ -364,7 +350,7 @@ export abstract class AMQPBaseClient {
364350
case 21: // deleteOk
365351
case 31: // bindOk
366352
case 51: { // unbindOk
367-
channel.resolvePromise()
353+
channel.resolveRPC()
368354
break
369355
}
370356
default:
@@ -379,25 +365,25 @@ export abstract class AMQPBaseClient {
379365
const [name, strLen] = view.getShortString(i); i += strLen
380366
const messageCount = view.getUint32(i); i += 4
381367
const consumerCount = view.getUint32(i); i += 4
382-
channel.resolvePromise({ name, messageCount, consumerCount })
368+
channel.resolveRPC({ name, messageCount, consumerCount })
383369
break
384370
}
385371
case 21: { // bindOk
386-
channel.resolvePromise()
372+
channel.resolveRPC()
387373
break
388374
}
389375
case 31: { // purgeOk
390376
const messageCount = view.getUint32(i); i += 4
391-
channel.resolvePromise({ messageCount })
377+
channel.resolveRPC({ messageCount })
392378
break
393379
}
394380
case 41: { // deleteOk
395381
const messageCount = view.getUint32(i); i += 4
396-
channel.resolvePromise({ messageCount })
382+
channel.resolveRPC({ messageCount })
397383
break
398384
}
399385
case 51: { // unbindOk
400-
channel.resolvePromise()
386+
channel.resolveRPC()
401387
break
402388
}
403389
default:
@@ -409,12 +395,12 @@ export abstract class AMQPBaseClient {
409395
case 60: { // basic
410396
switch (methodId) {
411397
case 11: { // qosOk
412-
channel.resolvePromise()
398+
channel.resolveRPC()
413399
break
414400
}
415401
case 21: { // consumeOk
416402
const [consumerTag, len] = view.getShortString(i); i += len
417-
channel.resolvePromise(consumerTag)
403+
channel.resolveRPC(consumerTag)
418404
break
419405
}
420406
case 30: { // cancel
@@ -442,7 +428,7 @@ export abstract class AMQPBaseClient {
442428
}
443429
case 31: { // cancelOk
444430
const [consumerTag, len] = view.getShortString(i); i += len
445-
channel.resolvePromise(consumerTag)
431+
channel.resolveRPC(consumerTag)
446432
break
447433
}
448434
case 50: { // return
@@ -490,7 +476,7 @@ export abstract class AMQPBaseClient {
490476
}
491477
case 72: { // getEmpty
492478
const [ , len]= view.getShortString(i); i += len // reserved1
493-
channel.resolvePromise(null)
479+
channel.resolveRPC(null)
494480
break
495481
}
496482
case 80: { // confirm ack
@@ -500,7 +486,7 @@ export abstract class AMQPBaseClient {
500486
break
501487
}
502488
case 111: { // recoverOk
503-
channel.resolvePromise()
489+
channel.resolveRPC()
504490
break
505491
}
506492
case 120: { // confirm nack
@@ -519,7 +505,7 @@ export abstract class AMQPBaseClient {
519505
switch (methodId) {
520506
case 11: { // selectOk
521507
channel.confirmId = 1
522-
channel.resolvePromise()
508+
channel.resolveRPC()
523509
break
524510
}
525511
default:
@@ -533,7 +519,7 @@ export abstract class AMQPBaseClient {
533519
case 11: // selectOk
534520
case 21: // commitOk
535521
case 31: { // rollbackOk
536-
channel.resolvePromise()
522+
channel.resolveRPC()
537523
break
538524
}
539525
default:

src/amqp-channel.ts

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ export class AMQPChannel {
1313
readonly connection: AMQPBaseClient
1414
readonly id: number
1515
readonly consumers = new Map<string, AMQPConsumer>()
16-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
17-
readonly promises: [(value?: any) => void, (err?: Error) => void][] = []
16+
private rpcQueue: Promise<unknown> = Promise.resolve(true)
1817
private readonly unconfirmedPublishes: [number, (confirmId: number) => void, (err?: Error) => void][] = []
1918
closed = false
2019
confirmId = 0
@@ -32,6 +31,19 @@ export class AMQPChannel {
3231
this.onerror = (reason: string) => console.error(`channel ${this.id} closed: ${reason}`)
3332
}
3433

34+
open(): Promise<AMQPChannel> {
35+
let j = 0
36+
const channelOpen = new AMQPView(new ArrayBuffer(13))
37+
channelOpen.setUint8(j, 1); j += 1 // type: method
38+
channelOpen.setUint16(j, this.id); j += 2 // channel id
39+
channelOpen.setUint32(j, 5); j += 4 // frameSize
40+
channelOpen.setUint16(j, 20); j += 2 // class: channel
41+
channelOpen.setUint16(j, 10); j += 2 // method: open
42+
channelOpen.setUint8(j, 0); j += 1 // reserved1
43+
channelOpen.setUint8(j, 206); j += 1 // frame end byte
44+
return this.sendRpc(channelOpen, j)
45+
}
46+
3547
/**
3648
* Declare a queue and return an AMQPQueue instance.
3749
*/
@@ -716,34 +728,6 @@ export class AMQPChannel {
716728
return this.sendRpc(frame, j)
717729
}
718730

719-
/**
720-
* Resolves the next RPC promise
721-
* @ignore
722-
*/
723-
resolvePromise(value?: unknown): boolean {
724-
const promise = this.promises.shift()
725-
if (promise) {
726-
const [resolve, ] = promise
727-
resolve(value)
728-
return true
729-
}
730-
return false
731-
}
732-
733-
/**
734-
* Rejects the next RPC promise
735-
* @return true if a promise was rejected, otherwise false
736-
*/
737-
private rejectPromise(err?: Error): boolean {
738-
const promise = this.promises.shift()
739-
if (promise) {
740-
const [, reject] = promise
741-
reject(err)
742-
return true
743-
}
744-
return false
745-
}
746-
747731
/**
748732
* Send a RPC request, will resolve a RPC promise when RPC response arrives
749733
* @param frame with data
@@ -752,9 +736,14 @@ export class AMQPChannel {
752736
// eslint-disable-next-line @typescript-eslint/no-explicit-any
753737
private sendRpc(frame: AMQPView, frameSize: number): Promise<any> {
754738
return new Promise((resolve, reject) => {
755-
this.connection.send(new Uint8Array(frame.buffer, 0, frameSize))
756-
.then(() => this.promises.push([resolve, reject]))
757-
.catch(reject)
739+
this.rpcQueue = this.rpcQueue.then(() => {
740+
this.connection.send(new Uint8Array(frame.buffer, 0, frameSize))
741+
.then(() => {
742+
this.resolveRPC = resolve
743+
this.rejectRPC = reject
744+
})
745+
.catch(reject)
746+
})
758747
})
759748
}
760749

@@ -773,8 +762,7 @@ export class AMQPChannel {
773762
this.closed = true
774763
this.consumers.forEach((consumer) => consumer.setClosed(err))
775764
this.consumers.clear()
776-
// Empty and reject all RPC promises
777-
while(this.rejectPromise(err)) { 1 }
765+
this.rejectRPC(err)
778766
// Reject and clear all unconfirmed publishes
779767
this.unconfirmedPublishes.forEach(([, , reject]) => reject(err))
780768
this.unconfirmedPublishes.length = 0
@@ -826,13 +814,25 @@ export class AMQPChannel {
826814
this.deliver(message)
827815
} else if (this.getMessage) {
828816
delete this.getMessage
829-
this.resolvePromise(message)
817+
this.resolveRPC(message)
830818
} else {
831819
delete this.returned
832820
this.onReturn(message)
833821
}
834822
}
835823

824+
/**
825+
* Resolves next RPC command
826+
* @ignore
827+
*/
828+
resolveRPC(value?: unknown) : void { value }
829+
830+
/**
831+
* Rejects next RPC command
832+
* @ignore
833+
*/
834+
rejectRPC(err?: Error) : void { err }
835+
836836
/**
837837
* Deliver a message to a consumer
838838
* @ignore

test-browser/websocket.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -462,13 +462,6 @@ test('set basic flow on channel', async () => {
462462
await expect(ch.basicFlow(true)).resolves.toBeDefined()
463463
})
464464

465-
test('can resolve promise on channel', async () => {
466-
const amqp = getNewClient()
467-
const conn = await amqp.connect()
468-
const ch = await conn.channel()
469-
expect(ch.resolvePromise()).toBeFalsy()
470-
})
471-
472465
test('confirming unknown deliveryTag', async () => {
473466
const amqp = getNewClient()
474467
const conn = await amqp.connect()

test/test.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -466,13 +466,6 @@ test('set basic flow on channel', async () => {
466466
await expect(ch.basicFlow(true)).resolves.toBeDefined()
467467
})
468468

469-
test('can resolve promise on channel', async () => {
470-
const amqp = getNewClient()
471-
const conn = await amqp.connect()
472-
const ch = await conn.channel()
473-
expect(ch.resolvePromise()).toBeFalsy()
474-
})
475-
476469
test('confirming unknown deliveryTag', async () => {
477470
const amqp = getNewClient()
478471
const conn = await amqp.connect()
@@ -593,9 +586,9 @@ test("has an onerror callback", async () => {
593586
const conn = await amqp.connect()
594587
const ch = await conn.channel()
595588
let errMessage: string | null = null
596-
conn.onerror = vi.fn((err) => errMessage = err.message)
589+
ch.onerror = vi.fn((reason) => errMessage = reason)
597590
await expect(ch.exchangeDeclare("none", "none")).rejects.toThrow()
598-
expect(conn.onerror).toBeCalled()
591+
expect(ch.onerror).toBeCalled()
599592
expect(errMessage).toMatch(/invalid exchange type/)
600593
})
601594

0 commit comments

Comments
 (0)