diff --git a/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt b/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt index c3e7b008391e..b0743bbd2ee9 100644 --- a/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt +++ b/gateway/src/commonMain/kotlin/ratelimit/IdentifyRateLimiter.kt @@ -7,7 +7,6 @@ import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.onSubscription import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlin.coroutines.resume import kotlin.time.Duration.Companion.seconds @@ -94,15 +93,15 @@ private class IdentifyRateLimiterImpl( ) = scope.launch { val rateLimitKey = shardId % maxConcurrency val mutex = mutexesByRateLimitKey[rateLimitKey] - // best effort, only used for logging (might be false even if mutex.withLock suspends later if we are unlucky) - val wasLocked = mutex.isLocked + val wasLocked = !mutex.tryLock() if (wasLocked) { logger.debug { "Waiting for other shard(s) with rate_limit_key $rateLimitKey to identify before identifying on " + "shard $shardId" } + mutex.lock() } - mutex.withLock { // in case something terrible happens, ensure the mutex is unlocked + try { // in case something terrible happens, ensure the mutex is unlocked // using a timeout so a broken gateway won't block its rate_limit_key for a long time val responseToIdentify = withTimeoutOrNull(IDENTIFY_TIMEOUT) { events.onSubscription { // onSubscription ensures we don't miss events @@ -126,6 +125,8 @@ private class IdentifyRateLimiterImpl( } + ", delaying $DELAY_AFTER_IDENTIFY before freeing up rate_limit_key $rateLimitKey" } delay(DELAY_AFTER_IDENTIFY) // delay before unlocking mutex to free up the current rateLimitKey + } finally { + mutex.unlock() } }