Skip to content

Commit 4665289

Browse files
authored
Correctly handle split frame headers for WebSocket handleMessage (#68)
This resolves the failing tests in the browser around "can handle frames split over socket reads" and "can republish in consume block without race condition". It appears the math from the socket implementation for handling partial headers in handleMessage wasn't fully copied over to the WebSocket implementation.
1 parent 08c1aa6 commit 4665289

3 files changed

Lines changed: 6 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
- Pass the correct array buffer to dataview when reading framesize (related to [#55](https://github.com/cloudamqp/amqp-client.js/issues/55))
1414
- Raise `AMQPError` when `channelMax` is reached (related to [#43](https://github.com/cloudamqp/amqp-client.js/issues/43))
1515
- Add `Channel#onerror` callback (related to [#40](https://github.com/cloudamqp/amqp-client.js/issues/40))
16+
- Correctly handle frame headers split across reads in the WebSocket client. (related to [#55](https://github.com/cloudamqp/amqp-client.js/issues/55))
1617

1718
### Changed
1819

src/amqp-websocket-client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
9797
if (this.frameSize === 0) {
9898
// first 7 bytes of a frame was split over two reads, this reads the second part
9999
if (this.framePos !== 0) {
100-
const len = buf.byteLength - bufPos
101-
this.frameBuffer.set(new Uint8Array(buf, bufPos), this.framePos)
100+
const len = 7 - this.framePos
101+
this.frameBuffer.set(new Uint8Array(buf, bufPos, len), this.framePos)
102102
this.frameSize = new DataView(this.frameBuffer.buffer).getInt32(bufPos + 3) + 8
103103
this.framePos += len
104104
bufPos += len

test-browser/websocket.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -538,8 +538,7 @@ test("can't set too small frameMax", () => {
538538
expect(() => getNewClient({ frameMax: 16 })).toThrow()
539539
})
540540

541-
// TODO: throws unhandled exception, stopping the rest of the test
542-
test.skip("can handle frames split over socket reads", async () => {
541+
test("can handle frames split over socket reads", async () => {
543542
const amqp = getNewClient({ frameMax: 4*1024 })
544543
const conn = await amqp.connect()
545544
const ch = await conn.channel()
@@ -553,7 +552,7 @@ test.skip("can handle frames split over socket reads", async () => {
553552
const consumer = await q.subscribe({ noAck: true }, () => { if (++i === msgs) consumer.cancel() })
554553
await consumer.wait(20_000)
555554
expect(i).toEqual(msgs)
556-
}, 40_000)
555+
}, 60_000)
557556

558557
test("have to connect socket before opening channels", async () => {
559558
const amqp = getNewClient()
@@ -635,8 +634,7 @@ test("will split body over multiple frames", async () => {
635634
assert.fail("no msg")
636635
})
637636

638-
// TODO: fails intermittently, throws unhandled exception, stopping the rest of the test
639-
test.skip("can republish in consume block without race condition", async () => {
637+
test("can republish in consume block without race condition", async () => {
640638
const amqp = getNewClient()
641639
const conn = await amqp.connect()
642640
const ch = await conn.channel()

0 commit comments

Comments
 (0)