-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Expand file tree
/
Copy pathoffset_tracking_receive.js
More file actions
61 lines (54 loc) · 1.77 KB
/
offset_tracking_receive.js
File metadata and controls
61 lines (54 loc) · 1.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
const rabbit = require("rabbitmq-stream-js-client");
/**
 * Returns a promise that resolves (with no value) once the given delay elapses.
 *
 * @param {number} ms - Delay handed to `setTimeout`, in milliseconds.
 * @returns {Promise<void>} Promise settled by the timer callback.
 */
const sleep = (ms) =>
  new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
/**
 * Consumes messages from the "stream-offset-tracking-javascript" stream and
 * tracks progress under the consumer reference "offset-tracking-tutorial".
 *
 * Flow:
 *  1. Connect to localhost:5552 as guest/guest on vhost "/".
 *  2. Query the offset stored for the consumer reference. If the query
 *     succeeds, consumption starts at the stored offset + 1; if it fails,
 *     consumption starts from the first message of the stream.
 *  3. In the message handler, store the offset on every 10th message, and
 *     once more when a message whose content is "marker" arrives, after which
 *     the consumer is closed.
 *
 * Message handling happens in the callback passed to `declareConsumer`, not in
 * this function's own control flow.
 *
 * @returns {Promise<void>} Resolves 2 seconds after the consumer has been
 *   declared. Rejects if `rabbit.connect` or `client.declareConsumer` rejects.
 */
async function main() {
  const streamName = "stream-offset-tracking-javascript";
  const consumerRef = "offset-tracking-tutorial";

  console.log("Connecting...");
  const client = await rabbit.connect({
    hostname: "localhost",
    port: 5552,
    username: "guest",
    password: "guest",
    vhost: "/",
  });

  // Stays undefined until either a stored offset is found or the first message arrives.
  let firstOffset;
  let offsetSpecification = rabbit.Offset.first();
  try {
    const storedOffset = await client.queryOffset({ reference: consumerRef, stream: streamName });
    // The stored offset was already processed, so resume from the one after it.
    firstOffset = storedOffset + 1n;
    offsetSpecification = rabbit.Offset.offset(firstOffset);
  } catch (err) {
    // Best-effort lookup: keep the Offset.first() fallback, but say why instead of
    // hiding the failure. NOTE(review): presumably the common cause is that no
    // offset has been stored yet for this reference — confirm against the client docs.
    console.log(
      `No usable stored offset for "${consumerRef}" (${err?.message ?? err}); starting from the first message`
    );
  }

  let lastOffset = offsetSpecification.value;
  let messageCount = 0;

  // The handler refers to `consumer`, which is only initialised once
  // declareConsumer() resolves.
  // NOTE(review): whether the client awaits this async handler or handles its
  // rejections (e.g. from storeOffset) is not visible from this file.
  const consumer = await client.declareConsumer(
    { stream: streamName, offset: offsetSpecification, consumerRef },
    async (message) => {
      messageCount++;

      // Explicit undefined check: offsets are BigInt values, and 0n is falsy.
      if (firstOffset === undefined && messageCount === 1) {
        firstOffset = message.offset;
        console.log("First message received");
      }

      // Store progress every 10 messages.
      if (messageCount % 10 === 0) {
        console.log("Storing offset");
        await consumer.storeOffset(message.offset);
      }

      // The "marker" message ends the run: store its offset, report, and close.
      if (message.content.toString() === "marker") {
        console.log("Marker found");
        lastOffset = message.offset;
        await consumer.storeOffset(message.offset);
        console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`);
        await consumer.close(true);
      }
    }
  );

  console.log(`Start consuming...`);
  // NOTE(review): the purpose of this 2-second pause is not evident from this file.
  await sleep(2000);
}
// Script entry point: run main(); on success wait on a promise that never
// settles, on failure log the error and exit with code -1.
(async () => {
  try {
    await main();
    // Never-settling promise: this async flow does not complete on its own.
    await new Promise(() => {});
  } catch (failure) {
    console.log("Error while receiving message!", failure);
    process.exit(-1);
  }
})();