Skip to content

Commit f874bc1

Browse files
Increase min frameMax to 8KB (#134)
For compatibility with RabbitMQ 4.1 and large JWT tokens: rabbitmq/rabbitmq-server#13541 Co-authored-by: Patrik Ragnarsson <patrik@starkast.net>
1 parent cc8448a commit f874bc1

6 files changed

Lines changed: 20 additions & 16 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
- Increase min `frameMax` to 8192 (8KB) for compatibility with RabbitMQ 4.1 and large JWT tokens ([#134](https://github.com/cloudamqp/amqp-client.js/pull/134))
11+
812
## [3.2.0] - 2025-03-07
913

1014
- Buffer all publish frames into a single huge buffer and send together

src/amqp-base-client.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export abstract class AMQPBaseClient {
3636
* @param name - name of the connection, set in client properties
3737
* @param platform - used in client properties
3838
*/
39-
constructor(vhost: string, username: string, password: string, name?: string, platform?: string, frameMax = 4096, heartbeat = 0, channelMax = 0) {
39+
constructor(vhost: string, username: string, password: string, name?: string, platform?: string, frameMax = 8192, heartbeat = 0, channelMax = 0) {
4040
this.vhost = vhost
4141
this.username = username
4242
this.password = ""
@@ -48,7 +48,7 @@ export abstract class AMQPBaseClient {
4848
if (platform) this.platform = platform
4949
this.channels = [new AMQPChannel(this, 0)]
5050
this.onerror = (error: AMQPError) => this.logger?.error("amqp-client connection closed", error.message)
51-
if (frameMax < 4096) throw new Error("frameMax must be 4096 or larger")
51+
if (frameMax < 8192) throw new Error("frameMax must be 8192 or larger")
5252
this.frameMax = frameMax
5353
if (heartbeat < 0) throw new Error("heartbeat must be positive")
5454
this.heartbeat = heartbeat
@@ -107,7 +107,7 @@ export abstract class AMQPBaseClient {
107107

108108
updateSecret(newSecret: string, reason: string) {
109109
let j = 0
110-
const frame = new AMQPView(new ArrayBuffer(4096))
110+
const frame = new AMQPView(new ArrayBuffer(8192))
111111
frame.setUint8(j, 1); j += 1 // type: method
112112
frame.setUint16(j, 0); j += 2 // channel: 0
113113
frame.setUint32(j, 0); j += 4 // frameSize
@@ -189,7 +189,7 @@ export abstract class AMQPBaseClient {
189189
// ignore start frame, just reply startok
190190
i += frameSize - 4
191191

192-
const startOk = new AMQPView(new ArrayBuffer(4096))
192+
const startOk = new AMQPView(new ArrayBuffer(8192))
193193
startOk.setUint8(j, 1); j += 1 // type: method
194194
startOk.setUint16(j, 0); j += 2 // channel: 0
195195
startOk.setUint32(j, 0); j += 4 // frameSize: to be updated

src/amqp-socket-client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export class AMQPClient extends AMQPBaseClient {
2929
const username = decodeURIComponent(u.username) || "guest"
3030
const password = decodeURIComponent(u.password) || "guest"
3131
const name = u.searchParams.get("name") || ""
32-
const frameMax = parseInt(u.searchParams.get("frameMax") || "4096")
32+
const frameMax = parseInt(u.searchParams.get("frameMax") || "8192")
3333
const heartbeat = parseInt(u.searchParams.get("heartbeat") || "0")
3434
const channelMax = parseInt(u.searchParams.get("channelMax") || "0")
3535
const platform = `${process.release.name} ${process.version} ${process.platform} ${process.arch}`

src/amqp-websocket-client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
2727
*/
2828
constructor(url: string, vhost?: string, username?: string, password?: string, name?: string, frameMax?: number, heartbeat?: number);
2929
constructor(init: AMQPWebSocketInit);
30-
constructor(url: string | AMQPWebSocketInit, vhost = "/", username = "guest", password = "guest", name?: string, frameMax = 4096, heartbeat = 0) {
30+
constructor(url: string | AMQPWebSocketInit, vhost = "/", username = "guest", password = "guest", name?: string, frameMax = 8192, heartbeat = 0) {
3131
if (typeof url === 'object') {
3232
vhost = url.vhost ?? vhost
3333
username = url.username ?? username

test-browser/websocket.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ test('will throw when headers are too long', async () => {
378378
const conn = await amqp.connect()
379379
const ch = await conn.channel()
380380
const q = await ch.queue()
381-
await expect(q.publish("a".repeat(8000), { headers: { long: "a".repeat(5000) } })).rejects.toThrow()
381+
await expect(q.publish("a".repeat(8000), { headers: { long: "a".repeat(9000) } })).rejects.toThrow()
382382
})
383383

384384
test('can purge a queue', async () => {
@@ -545,7 +545,7 @@ test("can't set too small frameMax", () => {
545545
})
546546

547547
test("can handle frames split over socket reads", async () => {
548-
const amqp = getNewClient({ frameMax: 4*1024 })
548+
const amqp = getNewClient({ frameMax: 8*1024 })
549549
const conn = await amqp.connect()
550550
const ch = await conn.channel()
551551
const q = await ch.queue("")
@@ -616,15 +616,15 @@ test("onerror is not called when conn is closed by client", async () => {
616616
})
617617

618618
test("will throw on too large headers", async () => {
619-
const amqp = getNewClient({ frameMax: 4096 })
619+
const amqp = getNewClient({ frameMax: 8192 })
620620
const conn = await amqp.connect()
621621
const ch = await conn.channel()
622622
await expect(ch.basicPublish("", "x".repeat(255), null, {"headers": {a: Array(4000).fill(1)}})).rejects.toThrow(RangeError)
623-
await expect(ch.basicPublish("", "", null, {"headers": {a: "x".repeat(5000)}})).rejects.toThrow(RangeError)
623+
await expect(ch.basicPublish("", "", null, {"headers": {a: "x".repeat(9000)}})).rejects.toThrow(RangeError)
624624
})
625625

626626
test("will split body over multiple frames", async () => {
627-
const amqp = getNewClient({ frameMax: 4096 })
627+
const amqp = getNewClient({ frameMax: 8192 })
628628
const conn = await amqp.connect()
629629
const ch = await conn.channel()
630630
const q = await ch.queue("")

test/test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ test('will throw when headers are too long', async () => {
381381
const conn = await amqp.connect()
382382
const ch = await conn.channel()
383383
const q = await ch.queue()
384-
await expect(q.publish("a".repeat(8000), { headers: { long: "a".repeat(5000) } })).rejects.toThrow()
384+
await expect(q.publish("a".repeat(8000), { headers: { long: "a".repeat(9000) } })).rejects.toThrow()
385385
})
386386

387387
test('can purge a queue', async () => {
@@ -548,7 +548,7 @@ test("can't set too small frameMax", () => {
548548
})
549549

550550
test("can handle frames split over socket reads", async () => {
551-
const amqp = getNewClient({ frameMax: 4 * 1024 })
551+
const amqp = getNewClient({ frameMax: 8 * 1024 })
552552
const conn = await amqp.connect()
553553
const ch = await conn.channel()
554554
const q = await ch.queue("")
@@ -619,15 +619,15 @@ test("onerror is not called when conn is closed by client", async () => {
619619
})
620620

621621
test("will throw on too large headers", async () => {
622-
const amqp = getNewClient({ frameMax: 4096 })
622+
const amqp = getNewClient({ frameMax: 8192 })
623623
const conn = await amqp.connect()
624624
const ch = await conn.channel()
625625
await expect(ch.basicPublish("", "x".repeat(255), null, { "headers": { a: Array(4000).fill(1) } })).rejects.toThrow(RangeError)
626-
await expect(ch.basicPublish("", "", null, { "headers": { a: "x".repeat(5000) } })).rejects.toThrow(RangeError)
626+
await expect(ch.basicPublish("", "", null, { "headers": { a: "x".repeat(9000) } })).rejects.toThrow(RangeError)
627627
})
628628

629629
test("will split body over multiple frames", async () => {
630-
const amqp = getNewClient({ frameMax: 4096 })
630+
const amqp = getNewClient({ frameMax: 8192 })
631631
const conn = await amqp.connect()
632632
const ch = await conn.channel()
633633
const q = await ch.queue("")

0 commit comments

Comments
 (0)