Skip to content

Commit

Permalink
Node 429 rate limit timeout support (#1085)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelGHSeg authored Jun 3, 2024
1 parent 0589554 commit 7f45c97
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 5 deletions.
75 changes: 75 additions & 0 deletions packages/node/src/plugins/segmentio/__tests__/publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,51 @@ const testClient = new TestFetchClient()
const makeReqSpy = jest.spyOn(testClient, 'makeRequest')
const getLastRequest = () => makeReqSpy.mock.lastCall![0]

class TestHeaders implements Headers {
private headers: Record<string, string>

constructor() {
this.headers = {}
}

append(name: string, value: string): void {
if (this.headers[name]) {
this.headers[name] += `, ${value}`
} else {
this.headers[name] = value
}
}

delete(name: string): void {
delete this.headers[name]
}

get(name: string): string | null {
return this.headers[name] || null
}

has(name: string): boolean {
return name in this.headers
}

set(name: string, value: string): void {
this.headers[name] = value
}

forEach(
callback: (value: string, name: string, parent: Headers) => void
): void {
for (const name in this.headers) {
callback(this.headers[name], name, this)
}
}

getSetCookie(): string[] {
// Implement the getSetCookie method here
return []
}
}

const createTestNodePlugin = (props: Partial<PublisherProps> = {}) =>
createConfiguredNodePlugin(
{
Expand Down Expand Up @@ -306,6 +351,36 @@ describe('error handling', () => {
`)
})

it('delays retrying 429 errors', async () => {
jest.useRealTimers()
const headers = new TestHeaders()
const resetTime = Date.now() + 350
headers.set('x-ratelimit-reset', resetTime.toString())
makeReqSpy
.mockReturnValueOnce(
createError({
status: 429,
statusText: 'Too Many Requests',
...headers,
})
)
.mockReturnValue(createSuccess())

const { plugin: segmentPlugin } = createTestNodePlugin({
maxRetries: 3,
flushAt: 1,
})

const context = new Context(eventFactory.alias('to', 'from'))
const pendingContext = segmentPlugin.alias(context)
validateMakeReqInputs(context)
expect(await pendingContext).toBe(context)
expect(makeReqSpy).toHaveBeenCalledTimes(2)
// Check that we've waited until roughly the reset time.
expect(Date.now()).toBeLessThanOrEqual(resetTime + 20)
expect(Date.now()).toBeGreaterThanOrEqual(resetTime - 20)
})

it.each([
{ status: 500, statusText: 'Internal Server Error' },
{ status: 300, statusText: 'Multiple Choices' },
Expand Down
27 changes: 22 additions & 5 deletions packages/node/src/plugins/segmentio/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ export class Publisher {
while (currentAttempt < maxAttempts) {
currentAttempt++

let requestedRetryTimeout: number | undefined
let failureReason: unknown
try {
if (this._disable) {
Expand Down Expand Up @@ -279,6 +280,20 @@ export class Publisher {
new Error(`[${response.status}] ${response.statusText}`)
)
return
} else if (response.status === 429) {
// Rate limited, wait for the reset time
if (response.headers && 'x-ratelimit-reset' in response.headers) {
const rateLimitResetTimestamp = parseInt(
response.headers['x-ratelimit-reset'],
10
)
if (isFinite(rateLimitResetTimestamp)) {
requestedRetryTimeout = rateLimitResetTimestamp - Date.now()
}
}
failureReason = new Error(
`[${response.status}] ${response.statusText}`
)
} else {
// Treat other errors as transient and retry.
failureReason = new Error(
Expand All @@ -298,11 +313,13 @@ export class Publisher {

// Retry after attempt-based backoff.
await sleep(
backoff({
attempt: currentAttempt,
minTimeout: 25,
maxTimeout: 1000,
})
requestedRetryTimeout
? requestedRetryTimeout
: backoff({
attempt: currentAttempt,
minTimeout: 25,
maxTimeout: 1000,
})
)
}
}
Expand Down

0 comments on commit 7f45c97

Please sign in to comment.