11import { AMQPChannel } from './amqp-channel.js'
22import { AMQPError } from './amqp-error.js'
3+ import { AMQPFrame } from './amqp-frame.js'
34import { AMQPMessage } from './amqp-message.js'
45import { AMQPView } from './amqp-view.js'
56import type { Logger } from './types.js'
@@ -87,40 +88,39 @@ export abstract class AMQPBaseClient {
8788 close ( reason = "" , code = 200 ) : Promise < void > {
8889 if ( this . closed ) return this . rejectClosed ( )
8990 this . closed = true
90- let j = 0
91- const frame = new AMQPView ( new ArrayBuffer ( 512 ) )
92- frame . setUint8 ( j , 1 ) ; j += 1 // type: method
93- frame . setUint16 ( j , 0 ) ; j += 2 // channel: 0
94- frame . setUint32 ( j , 0 ) ; j += 4 // frameSize
95- frame . setUint16 ( j , 10 ) ; j += 2 // class: connection
96- frame . setUint16 ( j , 50 ) ; j += 2 // method: close
97- frame . setUint16 ( j , code ) ; j += 2 // reply code
98- j += frame . setShortString ( j , reason ) // reply reason
99- frame . setUint16 ( j , 0 ) ; j += 2 // failing-class-id
100- frame . setUint16 ( j , 0 ) ; j += 2 // failing-method-id
101- frame . setUint8 ( j , 206 ) ; j += 1 // frame end byte
102- frame . setUint32 ( 3 , j - 8 ) // update frameSize
91+ const frame = new AMQPFrame ( {
92+ bufferSize : 512 ,
93+ type : 1 ,
94+ channel : 0 ,
95+ classId : 10 ,
96+ method : 50 ,
97+ } )
98+ frame . writeUint16 ( code ) // reply code
99+ frame . writeShortString ( reason ) // reply reason
100+ frame . writeUint16 ( 0 ) // failing-class-id
101+ frame . writeUint16 ( 0 ) // failing-method-id
102+ frame . finalize ( )
103103 return new Promise ( ( resolve , reject ) => {
104- this . send ( new Uint8Array ( frame . buffer , 0 , j ) )
104+ this . send ( frame . toUint8Array ( ) )
105105 . then ( ( ) => this . closePromise = [ resolve , reject ] )
106106 . catch ( reject )
107107 } )
108108 }
109109
110110 updateSecret ( newSecret : string , reason : string ) {
111- let j = 0
112- const frame = new AMQPView ( new ArrayBuffer ( 8192 ) )
113- frame . setUint8 ( j , 1 ) ; j += 1 // type: method
114- frame . setUint16 ( j , 0 ) ; j += 2 // channel: 0
115- frame . setUint32 ( j , 0 ) ; j += 4 // frameSize
116- frame . setUint16 ( j , 10 ) ; j += 2 // class: connection
117- frame . setUint16 ( j , 70 ) ; j += 2 // method: update-secret
118- j += frame . setLongString ( j , newSecret ) // new secret
119- j += frame . setShortString ( j , reason ) // reason
120- frame . setUint8 ( j , 206 ) ; j += 1 // frame end byte
121- frame . setUint32 ( 3 , j - 8 ) // update frameSize
111+ const frame = new AMQPFrame ( {
112+ bufferSize : 8192 ,
113+ type : 1 ,
114+ channel : 0 ,
115+ classId : 10 ,
116+ method : 70 ,
117+ } )
118+
119+ frame . writeLongString ( newSecret ) // new secret
120+ frame . writeShortString ( reason ) // reason
121+ frame . finalize ( )
122122 return new Promise ( ( resolve , reject ) => {
123- this . send ( new Uint8Array ( frame . buffer , 0 , j ) )
123+ this . send ( frame . toUint8Array ( ) )
124124 . then ( ( ) => this . onUpdateSecretOk = resolve )
125125 . catch ( reject )
126126 } )
@@ -161,7 +161,6 @@ export abstract class AMQPBaseClient {
161161 protected parseFrames ( view : AMQPView ) : void {
162162 // Can possibly be multiple AMQP frames in a single WS frame
163163 for ( let i = 0 ; i < view . byteLength ; ) {
164- let j = 0 // position in outgoing frame
165164 const type = view . getUint8 ( i ) ; i += 1
166165 const channelId = view . getUint16 ( i ) ; i += 2
167166 const frameSize = view . getUint32 ( i ) ; i += 4
@@ -191,12 +190,14 @@ export abstract class AMQPBaseClient {
191190 // ignore start frame, just reply startok
192191 i += frameSize - 4
193192
194- const startOk = new AMQPView ( new ArrayBuffer ( 8192 ) )
195- startOk . setUint8 ( j , 1 ) ; j += 1 // type: method
196- startOk . setUint16 ( j , 0 ) ; j += 2 // channel: 0
197- startOk . setUint32 ( j , 0 ) ; j += 4 // frameSize: to be updated
198- startOk . setUint16 ( j , 10 ) ; j += 2 // class: connection
199- startOk . setUint16 ( j , 11 ) ; j += 2 // method: startok
193+ const startOk = new AMQPFrame ( {
194+ bufferSize : 8192 ,
195+ type : 1 ,
196+ channel : 0 ,
197+ classId : 10 ,
198+ method : 11 ,
199+ } )
200+
200201 const clientProps = {
201202 connection_name : this . name || undefined ,
202203 product : "amqp-client.js" ,
@@ -213,14 +214,13 @@ export abstract class AMQPBaseClient {
213214 "publisher_confirms" : true ,
214215 }
215216 }
216- j += startOk . setTable ( j , clientProps ) // client properties
217- j += startOk . setShortString ( j , "PLAIN" ) // mechanism
217+ startOk . writeTable ( clientProps ) // client properties
218+ startOk . writeShortString ( "PLAIN" ) // mechanism
218219 const response = `\u0000${ this . username } \u0000${ this . password } `
219- j += startOk . setLongString ( j , response ) // response
220- j += startOk . setShortString ( j , "" ) // locale
221- startOk . setUint8 ( j , 206 ) ; j += 1 // frame end byte
222- startOk . setUint32 ( 3 , j - 8 ) // update frameSize
223- this . send ( new Uint8Array ( startOk . buffer , 0 , j ) ) . catch ( this . rejectConnect )
220+ startOk . writeLongString ( response ) // response
221+ startOk . writeShortString ( "" ) // locale
222+ startOk . finalize ( )
223+ this . send ( startOk . toUint8Array ( ) ) . catch ( this . rejectConnect )
224224 break
225225 }
226226 case 30 : { // tune
@@ -231,31 +231,32 @@ export abstract class AMQPBaseClient {
231231 this . frameMax = this . frameMax === 0 ? frameMax : Math . min ( this . frameMax , frameMax )
232232 this . heartbeat = this . heartbeat === 0 ? 0 : Math . min ( this . heartbeat , heartbeat )
233233
234- const tuneOk = new AMQPView ( new ArrayBuffer ( 20 ) )
235- tuneOk . setUint8 ( j , 1 ) ; j += 1 // type: method
236- tuneOk . setUint16 ( j , 0 ) ; j += 2 // channel: 0
237- tuneOk . setUint32 ( j , 12 ) ; j += 4 // frameSize: 12
238- tuneOk . setUint16 ( j , 10 ) ; j += 2 // class: connection
239- tuneOk . setUint16 ( j , 31 ) ; j += 2 // method: tuneok
240- tuneOk . setUint16 ( j , this . channelMax ) ; j += 2 // channel max
241- tuneOk . setUint32 ( j , this . frameMax ) ; j += 4 // frame max
242- tuneOk . setUint16 ( j , this . heartbeat ) ; j += 2 // heartbeat
243- tuneOk . setUint8 ( j , 206 ) ; j += 1 // frame end byte
244- this . send ( new Uint8Array ( tuneOk . buffer , 0 , j ) ) . catch ( this . rejectConnect )
234+ const tuneOk = new AMQPFrame ( {
235+ bufferSize : 20 ,
236+ type : 1 ,
237+ channel : 0 ,
238+ frameSize : 12 ,
239+ classId : 10 ,
240+ method : 31 ,
241+ } )
242+ tuneOk . writeUint16 ( this . channelMax ) // channel max
243+ tuneOk . writeUint32 ( this . frameMax ) // frame max
244+ tuneOk . writeUint16 ( this . heartbeat ) // heartbeat
245+ tuneOk . finalize ( )
246+ this . send ( tuneOk . toUint8Array ( ) ) . catch ( this . rejectConnect )
245247
246- j = 0
247- const open = new AMQPView ( new ArrayBuffer ( 512 ) )
248- open . setUint8 ( j , 1 ) ; j += 1 // type: method
249- open . setUint16 ( j , 0 ) ; j += 2 // channel: 0
250- open . setUint32 ( j , 0 ) ; j += 4 // frameSize: to be updated
251- open . setUint16 ( j , 10 ) ; j += 2 // class: connection
252- open . setUint16 ( j , 40 ) ; j += 2 // method: open
253- j += open . setShortString ( j , this . vhost ) // vhost
254- open . setUint8 ( j , 0 ) ; j += 1 // reserved1
255- open . setUint8 ( j , 0 ) ; j += 1 // reserved2
256- open . setUint8 ( j , 206 ) ; j += 1 // frame end byte
257- open . setUint32 ( 3 , j - 8 ) // update frameSize
258- this . send ( new Uint8Array ( open . buffer , 0 , j ) ) . catch ( this . rejectConnect )
248+ const open = new AMQPFrame ( {
249+ bufferSize : 512 ,
250+ type : 1 ,
251+ channel : 0 ,
252+ classId : 10 ,
253+ method : 40 ,
254+ } )
255+ open . writeShortString ( this . vhost ) // vhost
256+ open . writeUint8 ( 0 ) // reserved1
257+ open . writeUint8 ( 0 ) // reserved2
258+ open . finalize ( )
259+ this . send ( open . toUint8Array ( ) ) . catch ( this . rejectConnect )
259260
260261 break
261262 }
@@ -282,14 +283,16 @@ export abstract class AMQPBaseClient {
282283 this . channels . forEach ( ( ch ) => ch . setClosed ( err ) )
283284 this . channels = [ new AMQPChannel ( this , 0 ) ]
284285
285- const closeOk = new AMQPView ( new ArrayBuffer ( 12 ) )
286- closeOk . setUint8 ( j , 1 ) ; j += 1 // type: method
287- closeOk . setUint16 ( j , 0 ) ; j += 2 // channel: 0
288- closeOk . setUint32 ( j , 4 ) ; j += 4 // frameSize
289- closeOk . setUint16 ( j , 10 ) ; j += 2 // class: connection
290- closeOk . setUint16 ( j , 51 ) ; j += 2 // method: closeok
291- closeOk . setUint8 ( j , 206 ) ; j += 1 // frame end byte
292- this . send ( new Uint8Array ( closeOk . buffer , 0 , j ) )
286+ const closeOk = new AMQPFrame ( {
287+ bufferSize : 12 ,
288+ type : 1 ,
289+ channel : 0 ,
290+ frameSize : 4 ,
291+ classId : 10 ,
292+ method : 51 ,
293+ } )
294+ closeOk . finalize ( )
295+ this . send ( closeOk . toUint8Array ( ) )
293296 . catch ( err => this . logger ?. warn ( "Error while sending Connection#CloseOk" , err ) )
294297 this . onerror ( err )
295298 this . rejectConnect ( err )
@@ -355,14 +358,16 @@ export abstract class AMQPBaseClient {
355358 channel . setClosed ( err )
356359 delete this . channels [ channelId ]
357360
358- const closeOk = new AMQPView ( new ArrayBuffer ( 12 ) )
359- closeOk . setUint8 ( j , 1 ) ; j += 1 // type: method
360- closeOk . setUint16 ( j , channelId ) ; j += 2 // channel
361- closeOk . setUint32 ( j , 4 ) ; j += 4 // frameSize
362- closeOk . setUint16 ( j , 20 ) ; j += 2 // class: channel
363- closeOk . setUint16 ( j , 41 ) ; j += 2 // method: closeok
364- closeOk . setUint8 ( j , 206 ) ; j += 1 // frame end byte
365- this . send ( new Uint8Array ( closeOk . buffer , 0 , j ) )
361+ const closeOk = new AMQPFrame ( {
362+ bufferSize : 12 ,
363+ type : 1 ,
364+ channel : channelId ,
365+ frameSize : 4 ,
366+ classId : 20 ,
367+ method : 41 ,
368+ } )
369+ closeOk . finalize ( )
370+ this . send ( closeOk . toUint8Array ( ) )
366371 . catch ( err => this . logger ?. error ( "Error while sending Channel#closeOk" , err ) )
367372 break
368373 }
@@ -447,16 +452,17 @@ export abstract class AMQPBaseClient {
447452 channel . consumers . delete ( consumerTag )
448453 }
449454 if ( ! noWait ) {
450- const frame = new AMQPView ( new ArrayBuffer ( 512 ) )
451- frame . setUint8 ( j , 1 ) ; j += 1 // type: method
452- frame . setUint16 ( j , channel . id ) ; j += 2 // channel
453- frame . setUint32 ( j , 0 ) ; j += 4 // frameSize
454- frame . setUint16 ( j , 60 ) ; j += 2 // class: basic
455- frame . setUint16 ( j , 31 ) ; j += 2 // method: cancelOk
456- j += frame . setShortString ( j , consumerTag ) // tag
457- frame . setUint8 ( j , 206 ) ; j += 1 // frame end byte
458- frame . setUint32 ( 3 , j - 8 ) // update frameSize
459- this . send ( new Uint8Array ( frame . buffer , 0 , j ) )
455+ const frame = new AMQPFrame ( {
456+ bufferSize : 512 ,
457+ type : 1 ,
458+ channel : channel . id ,
459+ classId : 60 ,
460+ method : 31 ,
461+ } )
462+
463+ frame . writeShortString ( consumerTag ) // tag
464+ frame . finalize ( )
465+ this . send ( frame . toUint8Array ( ) )
460466 }
461467 break
462468 }
0 commit comments