-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathamqp-session.ts
More file actions
198 lines (151 loc) · 6.62 KB
/
amqp-session.ts
File metadata and controls
198 lines (151 loc) · 6.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/// <reference types="vite/client" />
import { expect, test, beforeEach } from "vitest"
import { AMQPSession } from "../src/amqp-session.js"
import { AMQPQueue } from "../src/amqp-queue.js"
import { AMQPExchange } from "../src/amqp-exchange.js"
import { AMQPMessage } from "../src/amqp-message.js"
// WebSocket endpoint used by every test in this file. Taken from the VITE_WS_URL
// env variable when it is set to a non-empty value; otherwise falls back to a
// local default (`||` also replaces an empty string with the default).
const WS_URL = import.meta.env.VITE_WS_URL || "ws://127.0.0.1:15670/ws/amqp"
// Registered before every test: require that the test performs at least one
// assertion, so a test whose expectations are never reached does not pass silently.
beforeEach(function requireAssertions(): void {
  expect.hasAssertions()
})
// Smoke test: AMQPSession.connect(WS_URL) resolves to an AMQPSession instance.
test("AMQPSession.connect() returns a session over WebSocket", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    expect(session).toBeInstanceOf(AMQPSession)
  } finally {
    // Runs even when the assertion above throws, so the session is always stopped.
    await session.stop()
  }
})
// session.queue() should resolve to an AMQPQueue whose name is the one requested.
// A random suffix keeps the queue name unique between runs.
test("session.queue() declares a queue and returns AMQPQueue", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-sq-${Math.random()}`, { durable: false, autoDelete: true })
    expect(q).toBeInstanceOf(AMQPQueue)
    expect(q.name).toMatch(/^test-ws-sq-/)
  } finally {
    // Stop the session even if the declare or an assertion failed.
    await session.stop()
  }
})
// Publishes one message to a fresh queue and reads it back with get().
test("AMQPQueue.publish() and get() round-trip", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-rtt-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("round-trip")
    const msg = await q.get({ noAck: true })
    // `?.` lets the assertion fail with a clear diff if get() returned no message.
    expect(msg?.bodyString()).toBe("round-trip")
  } finally {
    // Stop the session even if a step above threw.
    await session.stop()
  }
})
// Publishes one message, then subscribes with a callback and expects the
// callback to receive that message's body.
test("AMQPQueue.subscribe() delivers messages via callback", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-sub-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("hello ws queue")
    const received = await new Promise<string>((resolve, reject) => {
      // Forward a failed subscribe() to the test; otherwise the promise would stay
      // pending until the runner's timeout and the rejection would go unhandled.
      q.subscribe({ noAck: true }, (msg) => resolve(msg.bodyString()!)).catch(reject)
    })
    expect(received).toBe("hello ws queue")
  } finally {
    // Stop the session even if the subscribe or the assertion failed.
    await session.stop()
  }
})
// Subscribes with noAck: false, nacks the delivered message from inside the
// callback, and checks that the delivered message carried the published body.
test("AMQPQueue.subscribe() nack", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-nack-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("nack me")
    const msg = await new Promise<AMQPMessage>((resolve, reject) => {
      q.subscribe({ noAck: false }, (m) => {
        // Resolve only after the nack has settled, and fail the test if it rejects.
        // Promise.resolve() makes this work whether nack() returns a promise or a
        // plain value (NOTE(review): its return type is not visible in this file).
        Promise.resolve(m.nack()).then(() => resolve(m), reject)
      }).catch(reject) // surface a failed subscribe() instead of hanging until timeout
    })
    expect(msg.bodyString()).toBe("nack me")
  } finally {
    // Stop the session even if a step above threw.
    await session.stop()
  }
})
// Subscribes without a callback and consumes the result with `for await`,
// expecting the two published messages in publish order.
test("AMQPQueue.subscribe() async generator", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-gen-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("msg1")
    await q.publish("msg2")

    const received: string[] = []
    const sub = await q.subscribe({ noAck: true })
    for await (const msg of sub) {
      received.push(msg.bodyString()!)
      // Leave the loop once both messages arrived; the iteration would otherwise
      // keep waiting for further deliveries.
      if (received.length >= 2) break
    }
    expect(received).toEqual(["msg1", "msg2"])
  } finally {
    // Stop the session even if iteration or the assertion failed.
    await session.stop()
  }
})
// Publishes with { confirm: false } and verifies the message can still be
// fetched from the queue afterwards.
test("AMQPQueue.publish({ confirm: false }) sends without waiting for confirm", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-paf-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("fire and forget", { confirm: false })
    // Short pause before get() — presumably to give the broker time to enqueue the
    // unconfirmed message. NOTE(review): a fixed 50 ms delay may be flaky on slow brokers.
    await new Promise((resolve) => setTimeout(resolve, 50))
    const msg = await q.get({ noAck: true })
    expect(msg?.bodyString()).toBe("fire and forget")
  } finally {
    // Stop the session even if a step above threw.
    await session.stop()
  }
})
// bind() and unbind() against "amq.topic" should each resolve to an AMQPQueue.
test("AMQPQueue.bind() and unbind()", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-bind-${Math.random()}`, { durable: false, autoDelete: true })
    await expect(q.bind("amq.topic", "test.key")).resolves.toBeInstanceOf(AMQPQueue)
    await expect(q.unbind("amq.topic", "test.key")).resolves.toBeInstanceOf(AMQPQueue)
  } finally {
    // Stop the session even if bind/unbind rejected or an assertion failed.
    await session.stop()
  }
})
// Publishes two unconfirmed messages and expects purge() to report both.
test("AMQPQueue.purge() empties the queue", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-purge-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("msg1", { confirm: false })
    await q.publish("msg2", { confirm: false })
    // Short pause before purging — presumably so both unconfirmed messages are
    // enqueued first. NOTE(review): a fixed 50 ms delay may be flaky on slow brokers.
    await new Promise((resolve) => setTimeout(resolve, 50))
    const result = await q.purge()
    expect(result.messageCount).toBe(2)
  } finally {
    // Stop the session even if a step above threw.
    await session.stop()
  }
})
// Declares a queue with autoDelete: false and checks that delete() resolves
// to a defined value.
test("AMQPQueue.delete() removes the queue", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-del-${Math.random()}`, { durable: false, autoDelete: false })
    await expect(q.delete()).resolves.toBeDefined()
  } finally {
    // Stop the session even if delete() rejected.
    await session.stop()
  }
})
// Publishes with { confirm: false }, then subscribes with a callback and
// expects the callback to receive that message's body.
test("AMQPQueue.subscribe() with unconfirmed publish round-trip", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-sessub-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("via subscribe", { confirm: false })
    const received = await new Promise<string>((resolve, reject) => {
      // Forward a failed subscribe() to the test rather than discarding its promise;
      // otherwise a rejection would leave this promise pending until the timeout.
      q.subscribe({ noAck: true }, (msg) => resolve(msg.bodyString()!)).catch(reject)
    })
    expect(received).toBe("via subscribe")
  } finally {
    // Stop the session even if the subscribe or the assertion failed.
    await session.stop()
  }
})
// Publishes with default options and reads the message back with get().
// NOTE(review): this follows the same publish()/get() steps as the
// "AMQPQueue.publish() and get() round-trip" test in this file.
test("AMQPQueue.publish() and get() confirmed round-trip", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const q = await session.queue(`test-ws-spub-${Math.random()}`, { durable: false, autoDelete: true })
    await q.publish("confirmed")
    const msg = await q.get({ noAck: true })
    expect(msg?.bodyString()).toBe("confirmed")
  } finally {
    // Stop the session even if a step above threw.
    await session.stop()
  }
})
// Declares a fanout exchange, binds a fresh queue to it, publishes through the
// exchange and expects the message to be retrievable from the queue.
// NOTE(review): the title mentions session.exchange(), but the body calls
// session.fanoutExchange().
test("session.exchange() and AMQPExchange.publish() route messages", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const xName = `test-ws-x-${Math.random()}`
    const q = await session.queue(`test-ws-xq-${Math.random()}`, { durable: false, autoDelete: true })
    const x = await session.fanoutExchange(xName, { durable: false, autoDelete: true })
    expect(x).toBeInstanceOf(AMQPExchange)

    await q.bind(xName)
    await x.publish("via exchange")
    // Short pause before get() — presumably to let the broker route the message
    // into the bound queue. NOTE(review): fixed 100 ms delay; confirm it is needed.
    await new Promise((resolve) => setTimeout(resolve, 100))
    const msg = await q.get({ noAck: true })
    expect(msg?.bodyString()).toBe("via exchange")
  } finally {
    // Stop the session even if a step above threw.
    await session.stop()
  }
})
// Registers an RPC server whose handler prefixes the request body with "reply:",
// then calls it through an RPC client obtained from the same session.
test("session.rpcClient() and session.rpcServer() round-trip", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const qName = `test-ws-rpc-${Math.random()}`
    await session.rpcServer(qName, (msg) => `reply:${msg.bodyString()}`)

    const rpc = await session.rpcClient()
    const reply = await rpc.call(qName, "hello")
    expect(reply.bodyString()).toEqual("reply:hello")
  } finally {
    // Stop the session even if the call rejected or the assertion failed.
    await session.stop()
  }
})
// Registers an RPC server whose handler prefixes the request body with "got:",
// then calls it with session.rpcCall() directly, without creating a client first.
test("session.rpcCall() one-shot round-trip", async () => {
  const session = await AMQPSession.connect(WS_URL)
  try {
    const qName = `test-ws-rpc-oneshot-${Math.random()}`
    await session.rpcServer(qName, (msg) => `got:${msg.bodyString()}`)

    const reply = await session.rpcCall(qName, "ping")
    expect(reply.bodyString()).toEqual("got:ping")
  } finally {
    // Stop the session even if the call rejected or the assertion failed.
    await session.stop()
  }
})