Skip to content

Commit f2ba1df

Browse files
committed
Support negotiating channel max
1 parent 500e6b2 commit f2ba1df

3 files changed

Lines changed: 15 additions & 4 deletions

File tree

src/amqp-base-client.ts

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ export abstract class AMQPBaseClient {
3636
* @param name - name of the connection, set in client properties
3737
* @param platform - used in client properties
3838
*/
39-
constructor(vhost: string, username: string, password: string, name?: string, platform?: string, frameMax = 4096, heartbeat = 0) {
39+
constructor(vhost: string, username: string, password: string, name?: string, platform?: string, frameMax = 4096, heartbeat = 0, channelMax = 0) {
4040
this.vhost = vhost
4141
this.username = username
4242
this.password = ""
@@ -52,6 +52,8 @@ export abstract class AMQPBaseClient {
5252
this.frameMax = frameMax
5353
if (heartbeat < 0) throw new Error("heartbeat must be positive")
5454
this.heartbeat = heartbeat
55+
if (channelMax && channelMax < 0) throw new Error("channelMax must be positive")
56+
this.channelMax = channelMax
5557
}
5658

5759
/**
@@ -222,7 +224,7 @@ export abstract class AMQPBaseClient {
222224
const channelMax = view.getUint16(i); i += 2
223225
const frameMax = view.getUint32(i); i += 4
224226
const heartbeat = view.getUint16(i); i += 2
225-
this.channelMax = channelMax
227+
this.channelMax = this.channelMax === 0 ? channelMax : Math.min(this.channelMax, channelMax)
226228
this.frameMax = this.frameMax === 0 ? frameMax : Math.min(this.frameMax, frameMax)
227229
this.heartbeat = this.heartbeat === 0 ? 0 : Math.min(this.heartbeat, heartbeat)
228230

src/amqp-socket-client.ts

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -31,8 +31,9 @@ export class AMQPClient extends AMQPBaseClient {
3131
const name = u.searchParams.get("name") || ""
3232
const frameMax = parseInt(u.searchParams.get("frameMax") || "4096")
3333
const heartbeat = parseInt(u.searchParams.get("heartbeat") || "0")
34+
const channelMax = parseInt(u.searchParams.get("channelMax") || "0")
3435
const platform = `${process.release.name} ${process.version} ${process.platform} ${process.arch}`
35-
super(vhost, username, password, name, platform, frameMax, heartbeat)
36+
super(vhost, username, password, name, platform, frameMax, heartbeat, channelMax)
3637
this.tls = u.protocol === "amqps:"
3738
this.tlsOptions = tlsOptions
3839
this.host = u.hostname || "localhost"

test/test.ts

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,10 +3,11 @@ import { AMQPClient } from '../src/amqp-socket-client.js';
33
import { AMQPMessage } from '../src/amqp-message.js';
44
import type { AMQPError } from "../src/amqp-error.js";
55

6-
function getNewClient(init?: {frameMax?: number, heartbeat?: number}): AMQPClient {
6+
function getNewClient(init?: { frameMax?: number, heartbeat?: number, channelMax?: number }): AMQPClient {
77
const url = new URL("amqp://127.0.0.1")
88
if (init?.frameMax != null) url.searchParams.append("frameMax", init.frameMax.toString())
99
if (init?.heartbeat != null) url.searchParams.append("heartbeat", init.heartbeat.toString())
10+
if (init?.channelMax != null) url.searchParams.append("channelMax", init.channelMax.toString())
1011

1112
return new AMQPClient(url.toString())
1213
}
@@ -668,6 +669,13 @@ test("raises when channelMax is reached", async () => {
668669
await expect(ch1.basicQos(10)).resolves.toBeUndefined()
669670
}, 10_000)
670671

672+
test("client can negotiate channelMax", async () => {
673+
const amqp = getNewClient({ channelMax: 1 })
674+
const conn = await amqp.connect()
675+
await conn.channel()
676+
await expect(conn.channel()).rejects.toThrow('Max number of channels reached');
677+
}, 10_000)
678+
671679
test('can update-secret', async () => {
672680
const amqp = getNewClient()
673681
const conn = await amqp.connect()

0 commit comments

Comments
 (0)