Skip to content

Commit 3ad05ad

Browse files
committed
Use a pool of buffers when publishing
Before, when a single buffer was shared, multiple simultaneous calls to AMQPChannel#basicPublish could corrupt the buffer. Fixes #36. The downside of this fix is that if many microtasks run simultaneously, many buffers will be created but never cleaned up, so there's a risk of a memory leak. The upside, compared to allocating a new buffer for each call to basicPublish, is that this way is a lot faster (about 50%).
1 parent fbb4d49 commit 3ad05ad

3 files changed

Lines changed: 59 additions & 7 deletions

File tree

src/amqp-base-client.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ export abstract class AMQPBaseClient {
2424
frameMax: number
2525
heartbeat: number
2626
onerror: (error: AMQPError) => void
27+
/** Used for string -> arraybuffer when publishing */
28+
readonly textEncoder = new TextEncoder()
29+
// Buffer pool for publishes, let multiple microtasks publish at the same time but save on allocations
30+
readonly bufferPool: AMQPView[] = []
2731

2832
/**
2933
* @param name - name of the connection, set in client properties

src/amqp-channel.ts

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@ export class AMQPChannel {
1616
readonly promises: [(value?: any) => void, (err?: Error) => void][] = []
1717
private readonly unconfirmedPublishes: [number, (confirmId: number) => void, (err?: Error) => void][] = []
1818
closed = false
19-
/** Used for string -> arraybuffer when publishing */
20-
private static textEncoder = new TextEncoder()
21-
/** Frame buffer, reuse when publishes to avoid repated allocations */
22-
private readonly buffer: AMQPView
2319
confirmId = 0
2420
delivery?: AMQPMessage
2521
getMessage?: AMQPMessage
@@ -31,7 +27,6 @@ export class AMQPChannel {
3127
constructor(connection: AMQPBaseClient, id: number) {
3228
this.connection = connection
3329
this.id = id
34-
this.buffer = new AMQPView(new ArrayBuffer(connection.frameMax))
3530
}
3631

3732
/**
@@ -288,13 +283,14 @@ export class AMQPChannel {
288283
} else if (data === null) {
289284
body = new Uint8Array(0)
290285
} else if (typeof data === "string") {
291-
body = AMQPChannel.textEncoder.encode(data)
286+
body = this.connection.textEncoder.encode(data)
292287
} else {
293288
throw new TypeError(`Invalid type ${typeof data} for parameter data`)
294289
}
295290

296291
let j = 0
297-
const buffer = this.buffer
292+
// get a buffer from the pool or create a new, it will later be returned to the pool for reuse
293+
const buffer = this.connection.bufferPool.pop() || new AMQPView(new ArrayBuffer(this.connection.frameMax))
298294
buffer.setUint8(j, 1); j += 1 // type: method
299295
buffer.setUint16(j, this.id); j += 2 // channel
300296
j += 4 // frame size, update later
@@ -346,6 +342,7 @@ export class AMQPChannel {
346342
bodyPos += frameSize
347343
j = 0
348344
}
345+
this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse
349346
// if publish confirm is enabled, put a promise on a queue if the sends were ok
350347
// the promise on the queue will be fulfilled by the read loop when an ack/nack
351348
// comes from the server

test/test.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,3 +613,54 @@ test("onerror is not called when conn is closed by client", async t => {
613613
await new Promise((resolv) => setTimeout(resolv, 10))
614614
t.pass()
615615
})
616+
617+
test("will throw on too large headers", async t => {
618+
const amqp = new AMQPClient("amqp://127.0.0.1?frameMax=4096")
619+
const conn = await amqp.connect()
620+
const ch = await conn.channel()
621+
await t.throwsAsync(() => ch.basicPublish("", "x".repeat(255), null, {"headers": {a: Array(4000).fill(1)}}),
622+
{ instanceOf: RangeError })
623+
await t.throwsAsync(() => ch.basicPublish("", "", null, {"headers": {a: "x".repeat(5000)}}),
624+
{ instanceOf: RangeError })
625+
})
626+
627+
test("will split body over multiple frames", async t => {
628+
const amqp = new AMQPClient("amqp://127.0.0.1?frameMax=4096")
629+
const conn = await amqp.connect()
630+
const ch = await conn.channel()
631+
const q = await ch.queue("")
632+
await ch.confirmSelect()
633+
await q.publish("x".repeat(5000))
634+
const msg = await q.get()
635+
if (msg)
636+
if (msg.body)
637+
t.is(msg.body.length, 5000)
638+
else
639+
t.fail("no body")
640+
else
641+
t.fail("no msg")
642+
})
643+
644+
test("can republish in consume block without race condition", async t => {
645+
const amqp = new AMQPClient("amqp://127.0.0.1")
646+
const conn = await amqp.connect()
647+
const ch = await conn.channel()
648+
await ch.prefetch(0)
649+
const q = await ch.queue("")
650+
await ch.confirmSelect()
651+
await q.publish("x".repeat(500))
652+
const consumer = await q.subscribe({noAck: false}, async (msg) => {
653+
if (msg.deliveryTag < 10000) {
654+
await Promise.all([
655+
q.publish(msg.body),
656+
q.publish(msg.body),
657+
msg.ack()
658+
])
659+
} else if (msg.deliveryTag === 10000) {
660+
await consumer.cancel()
661+
}
662+
})
663+
await t.notThrowsAsync(() => consumer.wait())
664+
await t.notThrowsAsync(() => conn.close())
665+
console.log(conn.bufferPool.length)
666+
})

0 commit comments

Comments
 (0)