-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshared.ts
90 lines (75 loc) · 2.27 KB
/
shared.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import Client from "@axiomhq/axiom-node";
function throttle(fn: Function, wait: number) {
let lastFn: ReturnType<typeof setTimeout>, lastTime: number;
return function (this: any) {
const context = this,
args = arguments;
// First call, set lastTime
if (lastTime == null) {
lastTime = Date.now();
}
clearTimeout(lastFn);
lastFn = setTimeout(() => {
if (Date.now() - lastTime >= wait) {
fn.apply(context, args);
lastTime = Date.now();
}
}, Math.max(wait - (Date.now() - lastTime), 0));
};
}
export interface AxiomClient {
ingestEvents(events: Array<object> | object): Promise<void>
flush(): Promise<void>
}
export class ImmediateAxiomClient implements AxiomClient {
private readonly client: Client;
private readonly dataset: string;
constructor(token: string | undefined, dataset: string) {
this.client = new Client({ token });
this.dataset = dataset;
}
public async ingestEvents(events: Array<object> | object) {
await this.client.ingestEvents(this.dataset, events);
}
public async flush() {
// No-op
}
}
const FLUSH_INTERVAL = 1000;
const FLUSH_SIZE = 1000;
export class BatchedAxiomClient implements AxiomClient {
private readonly client: Client;
private readonly dataset: string;
private batch: object[];
private throttledFlush = throttle(this.flush.bind(this), FLUSH_INTERVAL);
private activeFlush: Promise<void> | null = null;
constructor(token: string | undefined, dataset: string) {
this.client = new Client({ token });
this.dataset = dataset;
this.batch = [];
}
// Ingests events into Axiom asynchronously every FLUSH_SIZE events or
// FLUSH_INTERVAL millis
public async ingestEvents(events: Array<object> | object) {
if (!Array.isArray(events)) {
this.batch.push(events);
} else {
this.batch.push(...events);
}
if (this.batch.length >= FLUSH_SIZE) {
this.flush();
} else {
this.throttledFlush();
}
}
public async flush() {
// If there's an active flush (due to throttling), wait for it to finish first
if (this.activeFlush) {
await this.activeFlush;
}
this.activeFlush = (async () => {
await this.client.ingestEvents(this.dataset, this.batch);
this.batch = [];
})()
}
}