Skip to content

Commit 38f1e1e

Browse files
committed
Increased code coverage
1 parent 20926a6 commit 38f1e1e

2 files changed

Lines changed: 41 additions & 1 deletion

File tree

src/amqp-socket-client.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -126,7 +126,7 @@ export default class AMQPClient extends AMQPBaseClient {
126126
if (this.socket)
127127
this.socket.write(bytes, undefined, (err) => err ? reject(err) : resolve())
128128
else
129-
reject("Socket not connected")
129+
reject(new AMQPError("Socket not connected", this))
130130
})
131131
}
132132

test/test.ts

Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -147,6 +147,18 @@ test('can cancel a consumer', t => {
147147
.then((channel) => t.is(channel.consumers.size, 0))
148148
})
149149

150+
test('will clear consumer wait timeout on cancel', async t => {
151+
const amqp = new AMQPClient("amqp://127.0.0.1")
152+
const conn = await amqp.connect()
153+
const ch = await conn.channel()
154+
const q = await ch.queue("")
155+
const consumer = await q.subscribe({noAck: false}, () => "")
156+
const wait = consumer.wait(1000);
157+
consumer.cancel()
158+
const ok = await wait
159+
t.is(ok, undefined)
160+
})
161+
150162
test('can close a channel', async t => {
151163
const amqp = new AMQPClient("amqp://127.0.0.1")
152164
const conn = await amqp.connect()
@@ -534,3 +546,31 @@ test("can set frameMax", async t => {
534546
test("can't set too small frameMax", t => {
535547
t.throws(() => new AMQPClient("amqp://127.0.0.1?frameMax=" + 16))
536548
})
549+
550+
test("can handle frames split over socket reads", async t => {
551+
const amqp = new AMQPClient("amqp://127.0.0.1?frameMax=" + 4*1024)
552+
const conn = await amqp.connect()
553+
const ch = await conn.channel()
554+
const q = await ch.queue("")
555+
const body = "a".repeat(5)
556+
const msgs = 100000
557+
for (let i = 0; i < msgs; i++) {
558+
await q.publish(body)
559+
}
560+
let i = 0
561+
const consumer = await q.subscribe({ noAck: true }, () => { if (++i === msgs) consumer.cancel() })
562+
await consumer.wait(5000)
563+
t.is(i, msgs)
564+
})
565+
566+
test("have to connect socket before opening channels", async t => {
567+
const amqp = new AMQPClient("amqp://127.0.0.1")
568+
await t.throwsAsync(() => amqp.channel(), { message: /not connected/ })
569+
})
570+
571+
test("will raise if socket is closed on send", async t => {
572+
const amqp = new AMQPClient("amqp://127.0.0.1")
573+
const conn = await amqp.connect()
574+
if (amqp.socket) amqp.socket.destroy()
575+
await t.throwsAsync(() => conn.channel())
576+
})

0 commit comments

Comments
 (0)