// amqp-consumer.ts
import { AMQPError } from "./amqp-error.js"
import type { AMQPChannel } from "./amqp-channel.js"
import type { AMQPMessage } from "./amqp-message.js"
/**
 * A consumer, subscribed to a queue.
 *
 * Holds the callback that receives deliveries and tracks whether the consumer
 * has been closed, so callers can `wait()` for it to finish or `cancel()` it.
 */
export class AMQPConsumer {
  /** Channel this consumer is attached to. */
  readonly channel: AMQPChannel
  /** Consumer tag; passed to `basicCancel` when the consumer is cancelled. */
  readonly tag: string
  /** Callback invoked for each delivered message. */
  readonly onMessage: (msg: AMQPMessage) => void | Promise<void>
  /** Set to true once `setClosed` has been called. */
  protected closed = false
  /** The error passed to `setClosed`, if the consumer was closed with one. */
  protected closedError?: Error
  /**
   * One settle function per pending `wait()` call. Calling it with an error
   * rejects that waiter, calling it without resolves it; either way the
   * waiter's timer is cleared and it removes itself from this set.
   */
  private readonly waiters = new Set<(err?: Error) => void>()
  /** Result of the first `basicCancel` call, reused by later `cancel()` calls. */
  private cancelPromise?: Promise<AMQPChannel>

  /**
   * @param channel - the channel the consumer is created on
   * @param tag - consumer tag
   * @param onMessage - callback executed when a message arrives
   */
  constructor(channel: AMQPChannel, tag: string, onMessage: (msg: AMQPMessage) => void | Promise<void>) {
    this.channel = channel
    this.tag = tag
    this.onMessage = onMessage
  }

  /**
   * Wait for the consumer to finish.
   * May be called several times; every pending call is settled when the consumer closes.
   * @param [timeout] - give up after this many milliseconds; omitted or 0 means wait indefinitely
   * @return Fulfilled when the consumer is closed without an error.
   * Rejected with the close error if it was closed with one, or with an
   * AMQPError("Timeout") if the timeout is hit first.
   */
  wait(timeout?: number): Promise<void> {
    if (this.closedError) return Promise.reject(this.closedError)
    if (this.closed) return Promise.resolve()
    return new Promise<void>((resolve, reject) => {
      let timeoutId: ReturnType<typeof setTimeout> | undefined
      const settle = (err?: Error): void => {
        if (timeoutId !== undefined) clearTimeout(timeoutId)
        this.waiters.delete(settle)
        if (err) reject(err)
        else resolve()
      }
      if (timeout) {
        // The error is created lazily so it is only built when the timeout actually fires
        timeoutId = setTimeout(() => settle(new AMQPError("Timeout", this.channel.connection)), timeout)
      }
      this.waiters.add(settle)
    })
  }

  /**
   * Cancel/abort/stop the consumer. No more messages will be delivered to the consumer.
   * Note that any unacked messages are still unacked as they belong to the channel and not the consumer.
   * Safe to call multiple times — repeated calls return the promise of the first `basicCancel`.
   * @return The channel, once the cancel has completed; resolves immediately if the channel is already closed.
   */
  cancel(): Promise<AMQPChannel> {
    if (this.channel.closed) return Promise.resolve(this.channel)
    if (!this.cancelPromise) {
      this.cancelPromise = this.channel.basicCancel(this.tag)
    }
    return this.cancelPromise
  }

  /**
   * Mark the consumer as closed and settle every pending `wait()`.
   * @ignore
   * @param [err] - why the consumer was closed; when given, waiters are rejected with it
   */
  setClosed(err?: Error): void {
    this.closed = true
    if (err) this.closedError = err
    // Iterate over a copy: each settle function removes itself from the set
    for (const settle of Array.from(this.waiters)) settle(err)
  }
}
/**
 * A consumer that hands deliveries to an AsyncGenerator (see `messages`)
 * instead of a user-supplied callback.
 *
 * Deliveries that arrive while the generator is not waiting are buffered in
 * an in-memory queue and yielded in arrival order.
 */
export class AMQPGeneratorConsumer extends AMQPConsumer {
  /** Messages delivered while the generator was not waiting for one. */
  private messageQueue: AMQPMessage[] = []
  /**
   * Settles the generator's pending wait for the next message.
   * Called with `undefined` by `setClosed` to wake the generator without a message.
   */
  private messageResolver: ((msg: AMQPMessage | undefined) => void) | null = null
  /** The generator returned by `messages`, created on first access. */
  private generator?: AsyncGenerator<AMQPMessage, void, undefined>

  /**
   * @param channel - the channel the consumer is created on
   * @param tag - consumer tag
   */
  constructor(channel: AMQPChannel, tag: string) {
    super(channel, tag, (msg: AMQPMessage) => {
      // Hand the message straight to a waiting generator, otherwise buffer it
      if (this.messageResolver) {
        this.messageResolver(msg)
        this.messageResolver = null
      } else {
        this.messageQueue.push(msg)
      }
    })
  }

  /**
   * Get an AsyncGenerator for consuming messages.
   * Every access returns the same generator instance.
   * @return An AsyncGenerator that yields messages
   */
  get messages(): AsyncGenerator<AMQPMessage, void, undefined> {
    if (!this.generator) {
      this.generator = this.generateMessages()
    }
    return this.generator
  }

  /**
   * Yields buffered messages first, then waits for new ones, until the consumer is closed.
   * Throws the close error if the consumer was closed with one.
   * When the generator finishes for any reason, the consumer is cancelled.
   */
  private async *generateMessages(): AsyncGenerator<AMQPMessage, void, undefined> {
    try {
      while (!this.closedError && !this.closed) {
        if (this.messageQueue.length > 0) {
          const msg = this.messageQueue.shift()!
          yield msg
        } else {
          const msg = await new Promise<AMQPMessage | undefined>((resolve) => {
            this.messageResolver = resolve
          })
          // `undefined` is the wake-up signal from setClosed; a message that
          // raced with a close is not yielded either
          if (msg === undefined || this.closedError || this.closed) break
          yield msg
        }
      }
      if (this.closedError) throw this.closedError
    } finally {
      // Clean up: cancel consumer when generator is done
      try {
        await this.cancel()
      } catch {
        // Ignore errors during cleanup
      }
    }
  }

  /**
   * Mark the consumer as closed and wake the generator if it is waiting for a message.
   * @ignore
   * @param [err] - why the consumer was closed
   */
  override setClosed(err?: Error): void {
    super.setClosed(err)
    const wake = this.messageResolver
    if (wake) {
      this.messageResolver = null
      // No message to deliver: the generator sees `undefined` and stops without yielding
      wake(undefined)
    }
  }
}