Skip to content

Commit b3c2d8d

Browse files
committed
Add AMQPFrame class for encapsulated frame construction
1 parent 5bdb39c commit b3c2d8d

3 files changed

Lines changed: 461 additions & 373 deletions

File tree

src/amqp-base-client.ts

Lines changed: 96 additions & 90 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import { AMQPChannel } from './amqp-channel.js'
22
import { AMQPError } from './amqp-error.js'
3+
import { AMQPFrame } from './amqp-frame.js'
34
import { AMQPMessage } from './amqp-message.js'
45
import { AMQPView } from './amqp-view.js'
56
import type { Logger } from './types.js'
@@ -87,40 +88,39 @@ export abstract class AMQPBaseClient {
8788
close(reason = "", code = 200): Promise<void> {
8889
if (this.closed) return this.rejectClosed()
8990
this.closed = true
90-
let j = 0
91-
const frame = new AMQPView(new ArrayBuffer(512))
92-
frame.setUint8(j, 1); j += 1 // type: method
93-
frame.setUint16(j, 0); j += 2 // channel: 0
94-
frame.setUint32(j, 0); j += 4 // frameSize
95-
frame.setUint16(j, 10); j += 2 // class: connection
96-
frame.setUint16(j, 50); j += 2 // method: close
97-
frame.setUint16(j, code); j += 2 // reply code
98-
j += frame.setShortString(j, reason) // reply reason
99-
frame.setUint16(j, 0); j += 2 // failing-class-id
100-
frame.setUint16(j, 0); j += 2 // failing-method-id
101-
frame.setUint8(j, 206); j += 1 // frame end byte
102-
frame.setUint32(3, j - 8) // update frameSize
91+
const frame = new AMQPFrame({
92+
bufferSize: 512,
93+
type: 1,
94+
channel: 0,
95+
classId: 10,
96+
method: 50,
97+
})
98+
frame.writeUint16(code) // reply code
99+
frame.writeShortString(reason) // reply reason
100+
frame.writeUint16(0) // failing-class-id
101+
frame.writeUint16(0) // failing-method-id
102+
frame.finalize()
103103
return new Promise((resolve, reject) => {
104-
this.send(new Uint8Array(frame.buffer, 0, j))
104+
this.send(frame.toUint8Array())
105105
.then(() => this.closePromise = [resolve, reject])
106106
.catch(reject)
107107
})
108108
}
109109

110110
updateSecret(newSecret: string, reason: string) {
111-
let j = 0
112-
const frame = new AMQPView(new ArrayBuffer(8192))
113-
frame.setUint8(j, 1); j += 1 // type: method
114-
frame.setUint16(j, 0); j += 2 // channel: 0
115-
frame.setUint32(j, 0); j += 4 // frameSize
116-
frame.setUint16(j, 10); j += 2 // class: connection
117-
frame.setUint16(j, 70); j += 2 // method: update-secret
118-
j += frame.setLongString(j, newSecret) // new secret
119-
j += frame.setShortString(j, reason) // reason
120-
frame.setUint8(j, 206); j += 1 // frame end byte
121-
frame.setUint32(3, j - 8) // update frameSize
111+
const frame = new AMQPFrame({
112+
bufferSize: 8192,
113+
type: 1,
114+
channel: 0,
115+
classId: 10,
116+
method: 70,
117+
})
118+
119+
frame.writeLongString(newSecret) // new secret
120+
frame.writeShortString(reason) // reason
121+
frame.finalize()
122122
return new Promise((resolve, reject) => {
123-
this.send(new Uint8Array(frame.buffer, 0, j))
123+
this.send(frame.toUint8Array())
124124
.then(() => this.onUpdateSecretOk = resolve)
125125
.catch(reject)
126126
})
@@ -161,7 +161,6 @@ export abstract class AMQPBaseClient {
161161
protected parseFrames(view: AMQPView): void {
162162
// Can possibly be multiple AMQP frames in a single WS frame
163163
for (let i = 0; i < view.byteLength;) {
164-
let j = 0 // position in outgoing frame
165164
const type = view.getUint8(i); i += 1
166165
const channelId = view.getUint16(i); i += 2
167166
const frameSize = view.getUint32(i); i += 4
@@ -191,12 +190,14 @@ export abstract class AMQPBaseClient {
191190
// ignore start frame, just reply startok
192191
i += frameSize - 4
193192

194-
const startOk = new AMQPView(new ArrayBuffer(8192))
195-
startOk.setUint8(j, 1); j += 1 // type: method
196-
startOk.setUint16(j, 0); j += 2 // channel: 0
197-
startOk.setUint32(j, 0); j += 4 // frameSize: to be updated
198-
startOk.setUint16(j, 10); j += 2 // class: connection
199-
startOk.setUint16(j, 11); j += 2 // method: startok
193+
const startOk = new AMQPFrame({
194+
bufferSize: 8192,
195+
type: 1,
196+
channel: 0,
197+
classId: 10,
198+
method: 11,
199+
})
200+
200201
const clientProps = {
201202
connection_name: this.name || undefined,
202203
product: "amqp-client.js",
@@ -213,14 +214,13 @@ export abstract class AMQPBaseClient {
213214
"publisher_confirms": true,
214215
}
215216
}
216-
j += startOk.setTable(j, clientProps) // client properties
217-
j += startOk.setShortString(j, "PLAIN") // mechanism
217+
startOk.writeTable(clientProps) // client properties
218+
startOk.writeShortString("PLAIN") // mechanism
218219
const response = `\u0000${this.username}\u0000${this.password}`
219-
j += startOk.setLongString(j, response) // response
220-
j += startOk.setShortString(j, "") // locale
221-
startOk.setUint8(j, 206); j += 1 // frame end byte
222-
startOk.setUint32(3, j - 8) // update frameSize
223-
this.send(new Uint8Array(startOk.buffer, 0, j)).catch(this.rejectConnect)
220+
startOk.writeLongString(response) // response
221+
startOk.writeShortString("") // locale
222+
startOk.finalize()
223+
this.send(startOk.toUint8Array()).catch(this.rejectConnect)
224224
break
225225
}
226226
case 30: { // tune
@@ -231,31 +231,32 @@ export abstract class AMQPBaseClient {
231231
this.frameMax = this.frameMax === 0 ? frameMax : Math.min(this.frameMax, frameMax)
232232
this.heartbeat = this.heartbeat === 0 ? 0 : Math.min(this.heartbeat, heartbeat)
233233

234-
const tuneOk = new AMQPView(new ArrayBuffer(20))
235-
tuneOk.setUint8(j, 1); j += 1 // type: method
236-
tuneOk.setUint16(j, 0); j += 2 // channel: 0
237-
tuneOk.setUint32(j, 12); j += 4 // frameSize: 12
238-
tuneOk.setUint16(j, 10); j += 2 // class: connection
239-
tuneOk.setUint16(j, 31); j += 2 // method: tuneok
240-
tuneOk.setUint16(j, this.channelMax); j += 2 // channel max
241-
tuneOk.setUint32(j, this.frameMax); j += 4 // frame max
242-
tuneOk.setUint16(j, this.heartbeat); j += 2 // heartbeat
243-
tuneOk.setUint8(j, 206); j += 1 // frame end byte
244-
this.send(new Uint8Array(tuneOk.buffer, 0, j)).catch(this.rejectConnect)
234+
const tuneOk = new AMQPFrame({
235+
bufferSize: 20,
236+
type: 1,
237+
channel: 0,
238+
frameSize: 12,
239+
classId: 10,
240+
method: 31,
241+
})
242+
tuneOk.writeUint16(this.channelMax) // channel max
243+
tuneOk.writeUint32(this.frameMax) // frame max
244+
tuneOk.writeUint16(this.heartbeat) // heartbeat
245+
tuneOk.finalize()
246+
this.send(tuneOk.toUint8Array()).catch(this.rejectConnect)
245247

246-
j = 0
247-
const open = new AMQPView(new ArrayBuffer(512))
248-
open.setUint8(j, 1); j += 1 // type: method
249-
open.setUint16(j, 0); j += 2 // channel: 0
250-
open.setUint32(j, 0); j += 4 // frameSize: to be updated
251-
open.setUint16(j, 10); j += 2 // class: connection
252-
open.setUint16(j, 40); j += 2 // method: open
253-
j += open.setShortString(j, this.vhost) // vhost
254-
open.setUint8(j, 0); j += 1 // reserved1
255-
open.setUint8(j, 0); j += 1 // reserved2
256-
open.setUint8(j, 206); j += 1 // frame end byte
257-
open.setUint32(3, j - 8) // update frameSize
258-
this.send(new Uint8Array(open.buffer, 0, j)).catch(this.rejectConnect)
248+
const open = new AMQPFrame({
249+
bufferSize: 512,
250+
type: 1,
251+
channel: 0,
252+
classId: 10,
253+
method: 40,
254+
})
255+
open.writeShortString(this.vhost) // vhost
256+
open.writeUint8(0) // reserved1
257+
open.writeUint8(0) // reserved2
258+
open.finalize()
259+
this.send(open.toUint8Array()).catch(this.rejectConnect)
259260

260261
break
261262
}
@@ -282,14 +283,16 @@ export abstract class AMQPBaseClient {
282283
this.channels.forEach((ch) => ch.setClosed(err))
283284
this.channels = [new AMQPChannel(this, 0)]
284285

285-
const closeOk = new AMQPView(new ArrayBuffer(12))
286-
closeOk.setUint8(j, 1); j += 1 // type: method
287-
closeOk.setUint16(j, 0); j += 2 // channel: 0
288-
closeOk.setUint32(j, 4); j += 4 // frameSize
289-
closeOk.setUint16(j, 10); j += 2 // class: connection
290-
closeOk.setUint16(j, 51); j += 2 // method: closeok
291-
closeOk.setUint8(j, 206); j += 1 // frame end byte
292-
this.send(new Uint8Array(closeOk.buffer, 0, j))
286+
const closeOk = new AMQPFrame({
287+
bufferSize: 12,
288+
type: 1,
289+
channel: 0,
290+
frameSize: 4,
291+
classId: 10,
292+
method: 51,
293+
})
294+
closeOk.finalize()
295+
this.send(closeOk.toUint8Array())
293296
.catch(err => this.logger?.warn("Error while sending Connection#CloseOk", err))
294297
this.onerror(err)
295298
this.rejectConnect(err)
@@ -355,14 +358,16 @@ export abstract class AMQPBaseClient {
355358
channel.setClosed(err)
356359
delete this.channels[channelId]
357360

358-
const closeOk = new AMQPView(new ArrayBuffer(12))
359-
closeOk.setUint8(j, 1); j += 1 // type: method
360-
closeOk.setUint16(j, channelId); j += 2 // channel
361-
closeOk.setUint32(j, 4); j += 4 // frameSize
362-
closeOk.setUint16(j, 20); j += 2 // class: channel
363-
closeOk.setUint16(j, 41); j += 2 // method: closeok
364-
closeOk.setUint8(j, 206); j += 1 // frame end byte
365-
this.send(new Uint8Array(closeOk.buffer, 0, j))
361+
const closeOk = new AMQPFrame({
362+
bufferSize: 12,
363+
type: 1,
364+
channel: channelId,
365+
frameSize: 4,
366+
classId: 20,
367+
method: 41,
368+
})
369+
closeOk.finalize()
370+
this.send(closeOk.toUint8Array())
366371
.catch(err => this.logger?.error("Error while sending Channel#closeOk", err))
367372
break
368373
}
@@ -447,16 +452,17 @@ export abstract class AMQPBaseClient {
447452
channel.consumers.delete(consumerTag)
448453
}
449454
if (!noWait) {
450-
const frame = new AMQPView(new ArrayBuffer(512))
451-
frame.setUint8(j, 1); j += 1 // type: method
452-
frame.setUint16(j, channel.id); j += 2 // channel
453-
frame.setUint32(j, 0); j += 4 // frameSize
454-
frame.setUint16(j, 60); j += 2 // class: basic
455-
frame.setUint16(j, 31); j += 2 // method: cancelOk
456-
j += frame.setShortString(j, consumerTag) // tag
457-
frame.setUint8(j, 206); j += 1 // frame end byte
458-
frame.setUint32(3, j - 8) // update frameSize
459-
this.send(new Uint8Array(frame.buffer, 0, j))
455+
const frame = new AMQPFrame({
456+
bufferSize: 512,
457+
type: 1,
458+
channel: channel.id,
459+
classId: 60,
460+
method: 31,
461+
})
462+
463+
frame.writeShortString(consumerTag) // tag
464+
frame.finalize()
465+
this.send(frame.toUint8Array())
460466
}
461467
break
462468
}

0 commit comments

Comments
 (0)