-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathamqp-subscription.ts
More file actions
122 lines (110 loc) · 3.51 KB
/
amqp-subscription.ts
File metadata and controls
122 lines (110 loc) · 3.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import { AMQPGeneratorConsumer } from "./amqp-consumer.js"
import type { AMQPChannel } from "./amqp-channel.js"
import type { AMQPConsumer } from "./amqp-consumer.js"
import type { AMQPMessage } from "./amqp-message.js"
import type { ConsumeParams } from "./amqp-channel.js"
/**
* Describes a single consumer: the queue it reads from and the options it was created with.
* NOTE(review): presumably retained so the consumer can be re-created after a reconnect — confirm against the session code.
* @internal
*/
export interface ConsumerDefinition {
/** Name of the queue being consumed. */
queueName: string
/** Consume options. `noAck` is read by the generator subscription to decide whether it acks yielded messages itself. */
consumeParams: ConsumeParams
/** Per-message handler. Optional; omitted when messages are consumed through the async iterator instead. */
callback?: (msg: AMQPMessage) => void | Promise<void>
/** NOTE(review): presumably the prefetch count applied before consuming — not used in this file; confirm. */
prefetch?: number
}
/**
 * Handle for a long-lived queue subscription, as returned by {@link AMQPQueue.subscribe}.
 *
 * The handle outlives individual connections: after a reconnect the consumer it
 * wraps is replaced in-place, so `channel` and `consumerTag` always describe the
 * current consumer. Call `cancel()` to unsubscribe and drop the subscription
 * from auto-recovery.
 */
export class AMQPSubscription {
  /** Consumer currently backing this subscription; replaced through `setConsumer()`. */
  protected consumer: AMQPConsumer
  /** Definition this subscription was created with. */
  readonly def: ConsumerDefinition
  /**
   * Hook invoked at the very start of `cancel()`, before the consumer itself is cancelled.
   * @internal
   */
  onCancel?: () => void

  /** @internal */
  constructor(consumer: AMQPConsumer, def: ConsumerDefinition) {
    this.consumer = consumer
    this.def = def
  }

  /** Channel of the current consumer; changes after a reconnect. */
  get channel(): AMQPChannel {
    const { channel } = this.consumer
    return channel
  }

  /** Tag of the current consumer; changes after a reconnect. */
  get consumerTag(): string {
    const { tag } = this.consumer
    return tag
  }

  /**
   * Unsubscribe and remove the subscription from session auto-recovery.
   * Safe to call on a closed channel.
   *
   * The `onCancel` hook runs first (synchronously), then the current consumer
   * is cancelled and awaited.
   */
  async cancel(): Promise<void> {
    if (this.onCancel != null) {
      this.onCancel()
    }
    await this.consumer.cancel()
  }

  /**
   * Replace the backing consumer, e.g. once a reconnect has produced a new one.
   * @internal
   */
  setConsumer(consumer: AMQPConsumer): void {
    this.consumer = consumer
  }
}
/**
 * A persistent queue subscription that yields messages via an async iterator.
 * Returned by {@link AMQPQueue.subscribe} when no callback is provided.
 *
 * Bridges across reconnections — the iterator continues yielding after each
 * reconnect without the caller needing to re-subscribe.
 *
 * Unless `consumeParams.noAck` is set, the iterator acks each message for the
 * caller: a yielded message is acked when the next one is about to be yielded.
 *
 * @example
 * ```ts
 * const sub = await session.subscribe("my-queue", { noAck: true })
 * for await (const msg of sub) {
 *   console.log(msg.bodyString())
 * }
 * ```
 */
export class AMQPGeneratorSubscription extends AMQPSubscription implements AsyncIterable<AMQPMessage> {
  /** Set by `cancel()`; makes the iterator finish instead of waiting for another consumer. */
  private stopped = false
  /**
   * Resolver of the promise the iterator is parked on while it waits for a
   * replacement consumer. Only present while the iterator is actually waiting.
   */
  private consumerReady?: () => void

  /**
   * Swap in a new underlying consumer and wake the iterator if it is waiting for one.
   * @internal
   */
  override setConsumer(consumer: AMQPConsumer): void {
    super.setConsumer(consumer)
    this.consumerReady?.()
    // `delete` rather than assigning `undefined`, which an optional property
    // rejects under `exactOptionalPropertyTypes`.
    delete this.consumerReady
  }

  /**
   * Stop the iterator, release it if it is parked waiting for a reconnect, then
   * cancel the subscription as the base class does.
   */
  override async cancel(): Promise<void> {
    this.stopped = true
    this.consumerReady?.()
    delete this.consumerReady
    await super.cancel()
  }

  /**
   * Yield messages from the current consumer and, when its stream ends or
   * fails, carry on with the consumer installed by the next `setConsumer()`.
   * Completes once `cancel()` has been called.
   *
   * @throws Error if the current consumer is not an `AMQPGeneratorConsumer`.
   */
  async *[Symbol.asyncIterator](): AsyncGenerator<AMQPMessage, void, undefined> {
    const autoAck = !this.def.consumeParams.noAck
    let prev: AMQPMessage | undefined
    while (!this.stopped) {
      const consumer = this.consumer
      if (!(consumer instanceof AMQPGeneratorConsumer)) {
        throw new Error("Cannot iterate messages on a callback-based subscription")
      }
      try {
        for await (const msg of consumer.messages) {
          if (this.stopped) return
          // Ack lazily: the previous message is acked only once the caller asks for the next one.
          if (autoAck) await prev?.ack()
          prev = msg
          yield msg
        }
      } catch {
        // Consumer's channel was closed — wait for reconnect to provide a new consumer
      }
      // Reset on disconnect; unacked messages are requeued by the server when the channel closes
      prev = undefined
      // Park only if no replacement has been swapped in yet. setConsumer() may
      // already have run while the caller was busy with the last yielded
      // message (before `consumerReady` existed); waiting in that case would
      // hang forever, because nothing is left to resolve the promise.
      if (!this.stopped && this.consumer === consumer) {
        await new Promise<void>((resolve) => {
          this.consumerReady = resolve
        })
      }
    }
  }
}