Skip to content

Commit 39cdbba

Browse files
committed
feat: add ondisconnect hook and reconnect-safe connect() to transport clients
1 parent 3782a00 commit 39cdbba

5 files changed

Lines changed: 58 additions & 16 deletions

File tree

src/amqp-base-client.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { AMQPView } from "./amqp-view.js"
66
import type { Logger } from "./types.js"
77

88
export const VERSION = "3.4.1"
9-
export const MIN_FRAME_SIZE = 4096 // 8192
109

1110
/**
1211
* Base class for AMQPClients.
@@ -34,6 +33,12 @@ export abstract class AMQPBaseClient {
3433
// Buffer pool for publishes: lets multiple microtasks publish at the same time while saving on allocations
3534
readonly bufferPool: AMQPView[] = []
3635

36+
/**
37+
* Callback when connection is lost
38+
* @param error - The error that caused the disconnection, if any
39+
*/
40+
ondisconnect?: (error?: Error) => void
41+
3742
/**
3843
* @param name - name of the connection, set in client properties
3944
* @param platform - used in client properties
@@ -45,7 +50,7 @@ export abstract class AMQPBaseClient {
4550
password: string,
4651
name?: string,
4752
platform?: string,
48-
frameMax = MIN_FRAME_SIZE,
53+
frameMax = 8192,
4954
heartbeat = 0,
5055
channelMax = 0,
5156
logger?: Logger | null,
@@ -61,8 +66,10 @@ export abstract class AMQPBaseClient {
6166
if (platform) this.platform = platform
6267
this.logger = logger || undefined
6368
this.channels = [new AMQPChannel(this, 0)]
64-
this.onerror = (error: AMQPError) => this.logger?.error("amqp-client connection closed", error.message)
65-
if (frameMax < MIN_FRAME_SIZE) throw new Error(`frameMax must be ${MIN_FRAME_SIZE} or larger`)
69+
this.onerror = (error: AMQPError) => {
70+
this.logger?.error("amqp-client connection closed", error.message)
71+
}
72+
if (frameMax < 8192) throw new Error("frameMax must be 8192 or larger")
6673
this.frameMax = frameMax
6774
if (heartbeat < 0) throw new Error("heartbeat must be positive")
6875
this.heartbeat = heartbeat
@@ -93,12 +100,13 @@ export abstract class AMQPBaseClient {
93100
}
94101

95102
/**
96-
* Gracefully close the AMQP connection
103+
* Gracefully close the AMQP connection.
97104
* @param [reason] might be logged by the server
98105
*/
99106
close(reason = "", code = 200): Promise<void> {
100107
if (this.closed) return this.rejectClosed()
101108
this.closed = true
109+
102110
const frame = new AMQPFrame.Writer({
103111
bufferSize: 512,
104112
type: AMQPFrame.Type.METHOD,
@@ -322,6 +330,7 @@ export abstract class AMQPBaseClient {
322330
this.logger?.warn("Error while sending Connection#CloseOk", err),
323331
)
324332
this.onerror(err)
333+
this.ondisconnect?.(err)
325334
this.rejectConnect(err)
326335
this.onUpdateSecretOk?.()
327336
break

src/amqp-consumer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export class AMQPConsumer {
4949
* Note that any unacked messages are still unacked as they belong to the channel and not the consumer.
5050
*/
5151
cancel() {
52+
if (this.channel.closed) return Promise.resolve(this.channel)
5253
return this.channel.basicCancel(this.tag)
5354
}
5455

src/amqp-socket-client.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { AMQPBaseClient, MIN_FRAME_SIZE } from "./amqp-base-client.js"
1+
import { AMQPBaseClient } from "./amqp-base-client.js"
2+
import { AMQPChannel } from "./amqp-channel.js"
23
import { AMQPError } from "./amqp-error.js"
34
import type { AMQPTlsOptions } from "./amqp-tls-options.js"
45
import type { Logger } from "./types.js"
@@ -32,7 +33,7 @@ export class AMQPClient extends AMQPBaseClient {
3233
const username = decodeURIComponent(u.username) || "guest"
3334
const password = decodeURIComponent(u.password) || "guest"
3435
const name = u.searchParams.get("name") || ""
35-
const frameMax = parseInt(u.searchParams.get("frameMax") || MIN_FRAME_SIZE.toString())
36+
const frameMax = parseInt(u.searchParams.get("frameMax") || "8192")
3637
const heartbeat = parseInt(u.searchParams.get("heartbeat") || "0")
3738
const channelMax = parseInt(u.searchParams.get("channelMax") || "0")
3839
const platform = `${process.release.name} ${process.version} ${process.platform} ${process.arch}`
@@ -51,10 +52,18 @@ export class AMQPClient extends AMQPBaseClient {
5152
}
5253

5354
override connect(): Promise<AMQPBaseClient> {
55+
this.framePos = 0
56+
this.frameSize = 0
57+
this.channels = [new AMQPChannel(this, 0)]
58+
5459
let rejectConnection: (reason: Error) => void
60+
let socketError: AMQPError | undefined
5561
const socket = this.connectSocket()
5662
socket.on("connect", () => {
57-
socket.on("error", (err) => this.onerror(new AMQPError(err.message, this)))
63+
socket.on("error", (err) => {
64+
socketError = new AMQPError(err.message, this)
65+
this.onerror(socketError)
66+
})
5867
socket.on("end", () => {
5968
if (rejectConnection) {
6069
rejectConnection(new AMQPError("Connection ended", this))
@@ -63,7 +72,20 @@ export class AMQPClient extends AMQPBaseClient {
6372
socket.on("close", (hadError: boolean) => {
6473
const clientClosed = this.closed
6574
this.closed = true
66-
if (!hadError && !clientClosed) this.onerror(new AMQPError("Socket closed", this))
75+
const closeError = hadError ? socketError : clientClosed ? undefined : new AMQPError("Socket closed", this)
76+
if (!clientClosed) {
77+
const err = closeError ?? new AMQPError("Socket closed", this)
78+
this.channels.forEach((ch) => ch?.setClosed(err))
79+
this.channels = [new AMQPChannel(this, 0)]
80+
}
81+
if (closeError && !hadError) {
82+
this.onerror(closeError)
83+
}
84+
if (!clientClosed) {
85+
this.ondisconnect?.(closeError)
86+
}
87+
this.socket = undefined
88+
socketError = undefined
6789
})
6890
})
6991
Object.defineProperty(this, "socket", {

src/amqp-websocket-client.ts

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { AMQPBaseClient, VERSION, MIN_FRAME_SIZE } from "./amqp-base-client.js"
1+
import { AMQPBaseClient, VERSION } from "./amqp-base-client.js"
22
import { AMQPView } from "./amqp-view.js"
33
import { AMQPError } from "./amqp-error.js"
44
import { AMQPChannel } from "./amqp-channel.js"
@@ -19,7 +19,7 @@ interface AMQPWebSocketInit {
1919
}
2020

2121
/**
22-
* WebSocket client for AMQP 0-9-1 servers
22+
* WebSocket client for AMQP 0-9-1 servers.
2323
*/
2424
export class AMQPWebSocketClient extends AMQPBaseClient {
2525
readonly url: string
@@ -49,7 +49,7 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
4949
username = "guest",
5050
password = "guest",
5151
name?: string,
52-
frameMax = MIN_FRAME_SIZE,
52+
frameMax = 8192,
5353
heartbeat = 0,
5454
logger?: Logger | null,
5555
) {
@@ -72,6 +72,10 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
7272
* Establish an AMQP connection over WebSocket
7373
*/
7474
override connect(): Promise<AMQPBaseClient> {
75+
this.framePos = 0
76+
this.frameSize = 0
77+
this.channels = [new AMQPChannel(this, 0)]
78+
7579
const socket = new WebSocket(this.url)
7680
this.socket = socket
7781
socket.binaryType = "arraybuffer"
@@ -85,25 +89,29 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
8589
if (!this.closed) {
8690
const err = new AMQPError(ev.toString(), this)
8791
this.closed = true
88-
// Close all channels and their consumers when there's an error
8992
this.channels.forEach((ch) => ch?.setClosed(err))
9093
this.channels = [new AMQPChannel(this, 0)]
9194
this.onerror(err)
95+
this.ondisconnect?.(err)
96+
this.socket = undefined
9297
}
9398
})
9499
socket.addEventListener("close", (ev: CloseEvent) => {
95100
const clientClosed = this.closed
96101
this.closed = true
97102
if (!(ev.wasClean && clientClosed)) {
98103
const err = new AMQPError(`connection not cleanly closed (${ev.code})`, this)
99-
// Close all channels and their consumers when connection is lost
100104
this.channels.forEach((ch) => ch?.setClosed(err))
101105
this.channels = [new AMQPChannel(this, 0)]
102106
this.onerror(err)
107+
if (!clientClosed) {
108+
this.ondisconnect?.(err)
109+
}
103110
} else {
104111
this.channels.forEach((ch) => ch?.setClosed())
105112
this.channels = [new AMQPChannel(this, 0)]
106113
}
114+
this.socket = undefined
107115
})
108116
socket.send(new Uint8Array([65, 77, 81, 80, 0, 0, 9, 1]))
109117
})
@@ -194,3 +202,5 @@ export class AMQPWebSocketClient extends AMQPBaseClient {
194202
}
195203

196204
export { AMQPBaseClient, AMQPChannel, AMQPConsumer, AMQPError, AMQPMessage, AMQPQueue, AMQPView, VERSION }
205+
export { AMQPSession } from "./amqp-session.js"
206+
export type { AMQPSessionOptions } from "./amqp-session.js"

test-browser/websocket.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ test("onerror is not called when conn is closed by client", async () => {
705705
})
706706

707707
test("will throw on too large headers", async () => {
708-
const amqp = getNewClient()
708+
const amqp = getNewClient({ frameMax: 8192 })
709709
const conn = await amqp.connect()
710710
const ch = await conn.channel()
711711
await expect(
@@ -717,7 +717,7 @@ test("will throw on too large headers", async () => {
717717
})
718718

719719
test("will split body over multiple frames", async () => {
720-
const amqp = getNewClient()
720+
const amqp = getNewClient({ frameMax: 8192 })
721721
const conn = await amqp.connect()
722722
const ch = await conn.channel()
723723
const q = await ch.queue("")

0 commit comments

Comments
 (0)