Skip to content

Commit

Permalink
fix: remove rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 committed Oct 24, 2023
1 parent f4c0aef commit de2b90b
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 70 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ import { RealtimeClient } from '@supabase/realtime-js'

const client = new RealtimeClient(REALTIME_URL, {
params: {
apikey: API_KEY,
eventsPerSecond: 10,
apikey: API_KEY
},
})

Expand Down Expand Up @@ -75,7 +74,6 @@ channel.subscribe((status, err) => {
- `REALTIME_URL` is `'ws://localhost:4000/socket'` when developing locally and `'wss://<project_ref>.supabase.co/realtime/v1'` when connecting to your Supabase project.
- `API_KEY` is a JWT whose claims must contain `exp` and `role` (existing database role).
- Channel name can be any `string`.
- `eventsPerSecond`, or client-side rate limiting, enforces the number of events sent to the Realtime server uniformly spread across a second. The default is 10, which means that the client can send one event, whether that's **Broadcast**/**Presence**/**Postgres CDC**, every 100 milliseconds. You may change this as you see fit, and choose to disable by passing in a negative number, but note that the server's rate limiting will need to be updated accordingly. You can learn more about Realtime's rate limits here: https://supabase.com/docs/guides/realtime/rate-limits.

## Broadcast

Expand Down
10 changes: 1 addition & 9 deletions src/RealtimeChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ export type RealtimePostgresChangesFilter<
filter?: string
}

export type RealtimeChannelSendResponse =
| 'ok'
| 'timed out'
| 'rate limited'
| 'error'
export type RealtimeChannelSendResponse = 'ok' | 'timed out' | 'error'

export enum REALTIME_POSTGRES_CHANGES_LISTEN_EVENT {
ALL = '*',
Expand Down Expand Up @@ -462,10 +458,6 @@ export default class RealtimeChannel {
return new Promise((resolve) => {
const push = this._push(args.type, args, opts.timeout || this.timeout)

if (push.rateLimited) {
resolve('rate limited')
}

if (args.type === 'broadcast' && !this.params?.config?.broadcast?.ack) {
resolve('ok')
}
Expand Down
41 changes: 3 additions & 38 deletions src/RealtimeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ export default class RealtimeClient {
error: [],
message: [],
}
eventsPerSecondLimitMs: number = 100
inThrottle: boolean = false
fetch: Fetch

/**
Expand Down Expand Up @@ -102,10 +100,6 @@ export default class RealtimeClient {
if (options?.heartbeatIntervalMs)
this.heartbeatIntervalMs = options.heartbeatIntervalMs

const eventsPerSecond = options?.params?.eventsPerSecond
if (eventsPerSecond)
this.eventsPerSecondLimitMs = Math.floor(1000 / eventsPerSecond)

const accessToken = options?.params?.apikey
if (accessToken) this.accessToken = accessToken

Expand Down Expand Up @@ -248,23 +242,16 @@ export default class RealtimeClient {
*
* If the socket is not connected, the message gets enqueued within a local buffer, and sent out when a connection is next established.
*/
push(data: RealtimeMessage): 'rate limited' | void {
push(data: RealtimeMessage): void {
const { topic, event, payload, ref } = data
let callback = () => {
const callback = () => {
this.encode(data, (result: any) => {
this.conn?.send(result)
})
}
this.log('push', `${topic} ${event} (${ref})`, payload)
if (this.isConnected()) {
if (['broadcast', 'presence', 'postgres_changes'].includes(event)) {
const isThrottled = this._throttle(callback)()
if (isThrottled) {
return 'rate limited'
}
} else {
callback()
}
callback()
} else {
this.sendBuffer.push(callback)
}
Expand Down Expand Up @@ -471,26 +458,4 @@ export default class RealtimeClient {
})
this.setAuth(this.accessToken)
}

/** @internal */
private _throttle(
callback: Function,
eventsPerSecondLimitMs: number = this.eventsPerSecondLimitMs
): () => boolean {
return () => {
if (this.inThrottle) return true

callback()

if (eventsPerSecondLimitMs > 0) {
this.inThrottle = true

setTimeout(() => {
this.inThrottle = false
}, eventsPerSecondLimitMs)
}

return false
}
}
}
6 changes: 1 addition & 5 deletions src/lib/push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export default class Push {
callback: Function
}[] = []
refEvent: string | null = null
rateLimited: boolean = false

/**
* Initializes the Push
Expand Down Expand Up @@ -47,16 +46,13 @@ export default class Push {
}
this.startTimeout()
this.sent = true
const status = this.channel.socket.push({
this.channel.socket.push({
topic: this.channel.topic,
event: this.event,
payload: this.payload,
ref: this.ref,
join_ref: this.channel._joinRef(),
})
if (status === 'rate limited') {
this.rateLimited = true
}
}

updatePayload(payload: { [key: string]: any }): void {
Expand Down
17 changes: 2 additions & 15 deletions test/channel_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1088,7 +1088,7 @@ describe('send', () => {
if (status === "SUBSCRIBED") {
pushStub = sinon.stub(channel, '_push')
pushStub.returns({
rateLimited: false, receive: (status, cb) => {
receive: (status, cb) => {
if (status === 'ok') cb()
}
})
Expand All @@ -1100,24 +1100,11 @@ describe('send', () => {
})
})

it("tries to send message via ws conn when subscribed to channel but is rate limited", async () => {
channel.subscribe(async status => {
if (status === "SUBSCRIBED") {
pushStub = sinon.stub(channel, '_push')
pushStub.returns({ rateLimited: true })

const res = await channel.send({ type: 'test', id: 'u123' })

assert.equal(res, 'rate limited')
}
})
})

it("tries to send message via ws conn when subscribed to channel but times out", async () => {
channel.subscribe(async status => {
if (status === "SUBSCRIBED") {
pushStub = sinon.stub(channel, '_push')
pushStub.returns({ rateLimited: false, receive: (status, cb) => {
pushStub.returns({ receive: (status, cb) => {
if (status === 'timeout') cb()
}})

Expand Down

0 comments on commit de2b90b

Please sign in to comment.