-
Notifications
You must be signed in to change notification settings - Fork 149
/
Copy pathrate-limiter.ts
109 lines (88 loc) · 3.61 KB
/
rate-limiter.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import { CAPTURE_RATE_LIMIT } from './constants'
import type { PostHog } from './posthog-core'
import { RequestResponse } from './types'
import { createLogger } from './utils/logger'
// Length of the window added to "now" when a batch key is reported as quota limited.
const ONE_MINUTE_IN_MILLISECONDS = 1000 * 60
// Name of the event captured when the client-side rate limit is first hit.
const RATE_LIMIT_EVENT = '$$client_ingestion_warning'
// Logger for this file, created with the '[RateLimiter]' tag.
const logger = createLogger('[RateLimiter]')
/**
 * Subset of the parsed JSON response body that the rate limiter reads.
 */
interface CaptureResponse {
    /** Batch keys that the response marks as quota limited; may be absent. */
    quota_limited?: Array<string>
}
/**
 * Client-side and server-driven rate limiting for event capture.
 *
 * Two independent mechanisms live here:
 *  - a token bucket, stored through `instance.persistence`, that limits how fast this client captures events;
 *  - a per-batch-key table of "limited until" timestamps, filled from `quota_limited` entries in responses.
 */
export class RateLimiter {
    instance: PostHog
    /** Batch key -> epoch milliseconds until which that key is treated as server rate limited. */
    serverLimits: Record<string, number> = {}
    /** Number of tokens added to the client-side bucket per second. */
    captureEventsPerSecond: number
    /** Maximum number of tokens the bucket may hold; never lower than `captureEventsPerSecond`. */
    captureEventsBurstLimit: number
    /**
     * Outcome of the most recent `clientRateLimitContext` call. Used so the warning event is captured
     * only on the transition into the limited state, not for every limited event.
     */
    lastEventRateLimited = false

    /**
     * @param instance - instance whose `config.rate_limiting`, `persistence` and `capture` are used.
     */
    constructor(instance: PostHog) {
        this.instance = instance
        const rateLimiting = instance.config.rate_limiting
        // `||` (not `??`): a missing or zero value falls back to the default.
        this.captureEventsPerSecond = rateLimiting?.events_per_second || 10
        this.captureEventsBurstLimit = Math.max(
            rateLimiting?.events_burst_limit || this.captureEventsPerSecond * 10,
            this.captureEventsPerSecond
        )
        // Check-only call: refreshes and persists the bucket without consuming a token.
        this.lastEventRateLimited = this.clientRateLimitContext(true).isRateLimited
    }

    /**
     * Refills the token bucket for the elapsed time, optionally consumes one token, and reports the result.
     *
     * @param checkOnly - when true, no token is consumed and no warning event is captured; the refreshed
     * bucket is still persisted and `lastEventRateLimited` is still updated.
     * @returns whether the caller is rate limited, and the tokens left after this call.
     */
    public clientRateLimitContext(checkOnly = false): {
        isRateLimited: boolean
        remainingTokens: number
    } {
        // This is primarily to prevent runaway loops from flooding capture with millions of events for a single user.
        // It's as much for our protection as theirs.
        const now = Date.now()
        const bucket = this.loadBucket(now)

        // A persisted `last` that lies in the future (e.g. the system clock moved backwards) must not
        // drain the bucket, so the elapsed time is clamped at zero.
        const elapsedMs = Math.max(0, now - bucket.last)
        bucket.tokens = Math.min(
            this.captureEventsBurstLimit,
            bucket.tokens + (elapsedMs / 1000) * this.captureEventsPerSecond
        )
        bucket.last = now

        const isRateLimited = bucket.tokens < 1
        if (!checkOnly) {
            if (!isRateLimited) {
                bucket.tokens = Math.max(0, bucket.tokens - 1)
            } else if (!this.lastEventRateLimited) {
                // First limited event after a non-limited one: report it once, bypassing this limiter.
                this.instance.capture(
                    RATE_LIMIT_EVENT,
                    {
                        $$client_ingestion_warning_message: `posthog-js client rate limited. Config is set to ${this.captureEventsPerSecond} events per second and ${this.captureEventsBurstLimit} events burst limit.`,
                    },
                    {
                        skip_client_rate_limiting: true,
                    }
                )
            }
        }

        this.lastEventRateLimited = isRateLimited
        this.instance.persistence?.set_property(CAPTURE_RATE_LIMIT, bucket)

        return {
            isRateLimited,
            remainingTokens: bucket.tokens,
        }
    }

    /**
     * @param batchKey - batch key to look up; `undefined` or an empty string means `'events'`.
     * @returns true while the current time is before the limit recorded for that key.
     */
    public isServerRateLimited(batchKey: string | undefined): boolean {
        const retryAfter = this.serverLimits[batchKey || 'events']
        if (!retryAfter) {
            return false
        }
        return Date.now() < retryAfter
    }

    /**
     * Reads `quota_limited` from a response body and marks each listed batch key as limited for one minute.
     * Empty bodies and bodies without a `quota_limited` array are ignored; a body that is not valid JSON
     * is logged as a warning and otherwise ignored.
     */
    public checkForLimiting = (httpResponse: RequestResponse): void => {
        const text = httpResponse.text
        if (!text || !text.length) {
            return
        }

        try {
            // Valid JSON may still be `null` or a primitive, hence the optional chaining below.
            const response: CaptureResponse | null = JSON.parse(text)
            const quotaLimitedProducts = response?.quota_limited
            if (!Array.isArray(quotaLimitedProducts)) {
                return
            }
            const retryAfter = Date.now() + ONE_MINUTE_IN_MILLISECONDS
            quotaLimitedProducts.forEach((batchKey) => {
                // Store under the same normalised key that isServerRateLimited looks up.
                const key = batchKey || 'events'
                logger.info(`${key} is quota limited.`)
                this.serverLimits[key] = retryAfter
            })
        } catch (e: unknown) {
            const message = e instanceof Error ? e.message : undefined
            logger.warn(`could not rate limit - continuing. Error: "${message}"`, { text })
        }
    }

    /**
     * Returns a copy of the persisted bucket, or a full bucket stamped with `now` when nothing usable
     * is stored (no persistence, no value, or non-numeric `tokens` / `last`).
     */
    private loadBucket(now: number): { tokens: number; last: number } {
        const stored: unknown = this.instance.persistence?.get_property(CAPTURE_RATE_LIMIT)
        if (stored && typeof stored === 'object') {
            const { tokens, last } = stored as { tokens?: unknown; last?: unknown }
            if (typeof tokens === 'number' && isFinite(tokens) && typeof last === 'number' && isFinite(last)) {
                return { tokens, last }
            }
        }
        return { tokens: this.captureEventsBurstLimit, last: now }
    }
}