Skip to content

Commit 6a4b0f0

Browse files
committed
onerror not called when gracefully closed client
1 parent 9246fe3 commit 6a4b0f0

3 files changed

Lines changed: 28 additions & 6 deletions

File tree

src/amqp-base-client.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export abstract class AMQPBaseClient {
1818
channels: AMQPChannel[]
1919
protected connectPromise?: [(conn: AMQPBaseClient) => void, (err: Error) => void]
2020
protected closePromise?: [(value?: void) => void, (err: Error) => void]
21-
closed = false
21+
closed = true
2222
blocked?: string
2323
channelMax = 0
2424
frameMax: number
@@ -40,7 +40,6 @@ export abstract class AMQPBaseClient {
4040
if (name) this.name = name // connection name
4141
if (platform) this.platform = platform
4242
this.channels = [new AMQPChannel(this, 0)]
43-
this.closed = true
4443
this.onerror = (error: AMQPError) => console.error("amqp-client connection closed", error.message)
4544
if (frameMax < 4096) throw new Error("frameMax must be 4096 or larger")
4645
this.frameMax = frameMax

src/amqp-socket-client.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ export class AMQPClient extends AMQPBaseClient {
4949
})
5050
return new Promise((resolve, reject) => {
5151
socket.on('error', (err) => reject(new AMQPError(err.message, this)))
52-
socket.on('connect', () => {
53-
socket.on('error', (err) => this.onerror(new AMQPError(err.message, this)))
54-
socket.on('close', (hadError: boolean) => { if (!hadError) this.onerror(new AMQPError("Socket closed", this)) })
55-
})
5652
this.connectPromise = [resolve, reject]
5753
})
5854
}
@@ -67,6 +63,12 @@ export class AMQPClient extends AMQPBaseClient {
6763
const sendStart = () => this.send(new Uint8Array([65, 77, 81, 80, 0, 0, 9, 1]))
6864
const conn = this.tls ? tls.connect(options, sendStart) : net.connect(options, sendStart)
6965
conn.on('data', this.onRead.bind(this))
66+
conn.on('connect', () => {
67+
conn.on('error', (err) => this.onerror(new AMQPError(err.message, this)))
68+
conn.on('close', (hadError: boolean) => {
69+
if (!hadError && !this.closed) this.onerror(new AMQPError("Socket closed", this))
70+
})
71+
})
7072
return conn
7173
}
7274

test/test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,3 +592,24 @@ test("can handle heartbeats", async t => {
592592
await wait
593593
t.is(conn.closed, false)
594594
})
595+
596+
test("has an onerror callback", async t => {
597+
const amqp = new AMQPClient("amqp://127.0.0.1")
598+
const conn = await amqp.connect()
599+
const ch = await conn.channel()
600+
conn.onerror = (err) => {
601+
t.regex(err.message, /invalid exchange type/)
602+
}
603+
await t.throwsAsync(ch.exchangeDeclare("none", "none"))
604+
})
605+
606+
test("onerror is not called when conn is closed by client", async t => {
607+
const amqp = new AMQPClient("amqp://127.0.0.1")
608+
const conn = await amqp.connect()
609+
conn.onerror = () => {
610+
t.fail("onerror should not be called when gracefully closed")
611+
}
612+
await conn.close()
613+
await new Promise((resolv) => setTimeout(resolv, 10))
614+
t.pass()
615+
})

0 commit comments

Comments
 (0)