Skip to content

Commit ea7bcca

Browse files
Return publish frame buffer to pool after send (#142)
Fixes #137 --------- Co-authored-by: Anton Dalgren <antondalgren@users.noreply.github.com>
1 parent a4c2eb1 commit ea7bcca

2 files changed

Lines changed: 26 additions & 3 deletions

File tree

src/amqp-channel.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,15 +363,15 @@ export class AMQPChannel {
363363
}
364364
const sendFrames = this.connection.send(bufferView.subarray(0, j))
365365

366-
this.connection.bufferPool.push(buffer) // return buffer to buffer pool for later reuse
366+
// return buffer to buffer pool for later reuse
367367
// if publish confirm is enabled, put a promise on a queue if the sends were ok
368368
// the promise on the queue will be fullfilled by the read loop when an ack/nack
369369
// comes from the server
370370
if (this.confirmId) {
371371
const wait4Confirm = new Promise<number>((resolve, reject) => this.unconfirmedPublishes.push([this.confirmId++, resolve, reject]))
372-
return sendFrames.then(() => wait4Confirm)
372+
return sendFrames.then(() => wait4Confirm).finally(() => this.connection.bufferPool.push(buffer))
373373
} else {
374-
return sendFrames.then(() => 0)
374+
return sendFrames.then(() => 0).finally(() => this.connection.bufferPool.push(buffer))
375375
}
376376
}
377377

test/tls.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { expect, test, beforeEach } from "vitest";
22
import { AMQPClient } from '../src/amqp-socket-client.js';
3+
import { randomBytes } from 'crypto';
34

45
beforeEach(() => {
56
expect.hasAssertions()
@@ -11,3 +12,25 @@ test('can connect with TLS', () => {
1112
.then(conn => conn.channel())
1213
.then(ch => expect(ch.connection.channels.length).toBe(2)) // 2 because channel 0 is counted
1314
})
15+
16+
test('can batch send message', async () => {
17+
const messages = [
18+
randomBytes(500),
19+
randomBytes(500),
20+
randomBytes(500),
21+
randomBytes(500)
22+
]
23+
const amqp = new AMQPClient("amqps://localhost?insecure=1")
24+
const connection = await amqp.connect()
25+
const channel = await connection.channel()
26+
const queue = await channel.queue("bug137")
27+
await channel.confirmSelect();
28+
const sendMsgs = messages.map(message => queue.publish(JSON.stringify(message), {
29+
contentType: 'application/json',
30+
deliveryMode: 2,
31+
}))
32+
33+
expect(connection['bufferPool'].length).toBe(0)
34+
await Promise.all(sendMsgs)
35+
expect(connection['bufferPool'].length).toBe(4)
36+
})

0 commit comments

Comments
 (0)