Skip to content

Commit

Permalink
test: don't poll more than once a minute
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed May 1, 2024
1 parent bc442b5 commit 345ac26
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions packages/core/src/anchor/anchor-processing-loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export class AnchorProcessingLoop {
*/
readonly #anchorStoreQueue: NamedTaskQueue
readonly #anchorPollingMetrics: TimeableMetric
/**
* Cache for the last time we polled a stream. Used to prevent polling the same stream more than once per minute.
*/
readonly #pollCache: Map<string, number> = new Map()

constructor(
batchSize: number,
Expand All @@ -50,6 +54,10 @@ export class AnchorProcessingLoop {
concurrency,
store.infiniteList(batchSize),
async (streamId) => {
// Exit early if we've already polled this stream in the last minute
if (!this.checkPollTime(streamId.toString())) {
return
}
try {
logger.verbose(
`Loading pending anchor metadata for Stream ${streamId} from AnchorRequestStore`
Expand Down Expand Up @@ -111,4 +119,19 @@ export class AnchorProcessingLoop {
this.#anchorPollingMetrics.stopPublishingStats()
return this.#loop.stop()
}

/**
* Check the last poll time for a stream. Updates the poll time and returns true if the stream has not been polled in
* the last minute.
*/
checkPollTime(streamId: string): boolean {
const currentTime = Date.now()
if (this.#pollCache.has(streamId) && (currentTime - this.#pollCache.get(streamId) < 60_000)) {
return false
}
// Add ±10 seconds of jitter to prevent all streams from being polled at the same time every minute after a restart
const jitter = Math.floor(Math.random() * 20_000) - 10_000
this.#pollCache.set(streamId, currentTime + jitter)
return true
}
}

0 comments on commit 345ac26

Please sign in to comment.