🔀 merge: pull request #8 from kresil/res4j-rate-limiter
Learn how Resilience4j implements the Rate Limiter mechanism
franciscoengenheiro authored Aug 20, 2024
2 parents 55d087f + 4238310 commit d0b02da
Showing 4 changed files with 319 additions and 5 deletions.
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
@@ -65,6 +65,7 @@ ktor-server-configyaml = { module = "io.ktor:ktor-server-config-yaml", version.r

# Resilience4j
resilience4j-circuitbreaker = { module = "io.github.resilience4j:resilience4j-circuitbreaker", version.ref = "resilience4j" }
resilience4j-ratelimiter = { module = "io.github.resilience4j:resilience4j-ratelimiter", version.ref = "resilience4j" }
resilience4j-retry = { module = "io.github.resilience4j:resilience4j-retry", version.ref = "resilience4j" }
resilience4j-kotlin = { module = "io.github.resilience4j:resilience4j-kotlin", version.ref = "resilience4j" }

79 changes: 74 additions & 5 deletions resilience4j/README.md
@@ -25,8 +25,13 @@
- [Configuration](#configuration-2)
- [Sliding Window](#sliding-window)
- [Additional Details](#additional-details)
3. [Kotlin Multiplatform Design](#kotlin-multiplatform-design)
4. [Flow](#flow)
3. [Rate Limiter](#rate-limiter)
- [Configuration](#configuration-3)
- [Implementations](#implementations)
- [Approaches After Rate Exceeded](#approaches-after-rate-exceeded)
- [Additional Details](#additional-details-1)
4. [Kotlin Multiplatform Design](#kotlin-multiplatform-design)
5. [Flow](#flow)

## Retry

@@ -406,7 +411,7 @@ The circuit breaker, which acts like a proxy for the underlying operation, can b
- `Open`: The request from the application fails immediately, and an exception is returned to the application.
- `Half-Open`: A limited number of requests from the application are allowed to pass through and invoke the operation. If these requests are successful, it's assumed that the fault that was previously causing the failure has been fixed and the circuit breaker switches to the `Closed` state (the failure counter is reset). If any request fails, the circuit breaker assumes that the fault is still present so it reverts to the `Open` state and restarts the timeout timer to give the system a further period of time to recover from the failure.

> [!IMPORTANT]
> The `Half-Open` state is useful to prevent a recovering service from suddenly being flooded with requests. As a service recovers, it might be able to support a limited volume of requests until the recovery is complete, but while recovery is in progress, a flood of work can cause the service to time out or fail again.
@@ -501,7 +506,7 @@ From: [Resilience4j Circuit Breaker Docs](https://resilience4j.readme.io/docs/ci

> [!NOTE]
> `Resilience4j` also provides two more states: `DISABLED` (stopping automatic state transition, metrics and event publishing)
> and `FORCED_OPEN` (same behavior as disabled state, but always returning an exception), as well as manual control
> over the possible state transitions.

> [!NOTE]
@@ -531,9 +536,73 @@ From [Resilience4j Circuit Breaker Docs](https://resilience4j.readme.io/docs/cir
Just like the [Retry](#retry) mechanism, the Circuit Breaker mechanism also provides:
- [Registry](#registry) for managing Circuit Breaker instances and configurations;
- [Decorators](#decorators) for wrapping functions with the Circuit Breaker logic;
- [Events](#events) for monitoring the Circuit Breaker's state transitions and outcomes;
- [Kotlin Interop](#kotlin-interop) for accessing the Circuit Breaker mechanism in Kotlin that compiles to JVM bytecode.

## Rate Limiter

Rate limiting restricts the number of requests a client can make to a service within a specified time frame.
It aims to prevent abuse, ensure fair usage,
protect the service from being overwhelmed, and ensure that it remains responsive to legitimate users.
Rate limiting differs from throttling: rate limiting caps the number of requests allowed per time frame, whereas throttling controls the rate at which requests are processed.

### Configuration

<table>
<tr>
<th>Config property</th>
<th>Default value</th>
<th>Description</th>
</tr>
<tr>
<td>timeoutDuration</td>
<td>5 [s]</td>
<td>The default wait time a thread waits for a permission.</td>
</tr>
<tr>
<td>limitRefreshPeriod</td>
<td>500 [ns]</td>
<td>The period of a limit refresh. After each period the rate limiter sets its permissions count back to the limitForPeriod value.</td>
</tr>
<tr>
<td>limitForPeriod</td>
<td>50</td>
<td>The number of permissions available during one limit refresh period.</td>
</tr>
<tr>
<td>drainPermissionsOnResult</td>
<td>Either&lt;? extends Throwable, ? extends Result&gt; -&gt; false</td>
<td>Configures a Predicate which evaluates if a result of the underlying service should be used to drain permissions.</td>
</tr>
</table>

> [!IMPORTANT]
> Both `limitForPeriod` and `timeoutDuration` can be adjusted at runtime
> using the `changeLimitForPeriod` and `changeTimeoutDuration` methods, respectively, of the `RateLimiter` instance.

From: [Resilience4j Rate Limiter Docs](https://resilience4j.readme.io/docs/ratelimiter#create-and-configure-a-ratelimiter)
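
Below is a minimal sketch of how these properties fit together in plain Java; the `exampleLimiter` name and the chosen values are illustrative, not the defaults listed above:

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

import java.time.Duration;
import java.util.function.Supplier;

public class RateLimiterConfigExample {
    public static void main(String[] args) {
        // Build a configuration with the properties described in the table above
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitForPeriod(10)                        // 10 permissions per refresh period
                .limitRefreshPeriod(Duration.ofSeconds(1)) // permissions are reset every second
                .timeoutDuration(Duration.ofMillis(250))   // a caller waits at most 250 ms for a permission
                .build();

        // Create a rate limiter and decorate the operation to be protected
        RateLimiter rateLimiter = RateLimiter.of("exampleLimiter", config);
        Supplier<String> decorated = RateLimiter.decorateSupplier(rateLimiter, () -> "ok");

        // Throws RequestNotPermitted if no permission is obtained within timeoutDuration
        System.out.println(decorated.get());
    }
}
```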

### Implementations

There are two implementations of the Rate Limiter mechanism:

- `AtomicRateLimiter`: A rate limiter that manages its state via an `AtomicReference`. It is the default implementation.
- `SemaphoreBasedRateLimiter`: A rate limiter that uses `Semaphores` and a `Scheduler` that refreshes permissions after each `limitRefreshPeriod`.
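
As a small illustration, the `RateLimiter.of` factory used throughout the tests below yields the default implementation; the semaphore-based variant has to be constructed directly from its own class rather than through this factory (the `defaultLimiter` name here is just an example):

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

public class DefaultImplementationExample {
    public static void main(String[] args) {
        // The factory method returns the default implementation (AtomicRateLimiter);
        // SemaphoreBasedRateLimiter is instantiated directly from its class instead.
        RateLimiter defaultLimiter = RateLimiter.of("defaultLimiter", RateLimiterConfig.ofDefaults());
        System.out.println(defaultLimiter.getName() + " -> " + defaultLimiter.getRateLimiterConfig());
    }
}
```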

### Approaches After Rate Exceeded

When the rate is exceeded, the Rate Limiter can either:

- `Block`: The rate limiter blocks the request.
- `Queue`: The rate limiter queues the request to be processed later.
- `Combined`: A way to combine both blocking and queuing based on some custom policy.
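
A minimal sketch of how the first two approaches could map onto the Resilience4j API; the limiter name and the single-thread executor standing in for a queue are assumptions of this example, not part of the library:

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RateExceededExample {
    public static void main(String[] args) throws InterruptedException {
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitForPeriod(1)
                .limitRefreshPeriod(Duration.ofSeconds(1))
                .timeoutDuration(Duration.ofMillis(500)) // > 0: a caller blocks up to 500 ms for a permission
                .build();
        RateLimiter rateLimiter = RateLimiter.of("blockingLimiter", config);

        // Block: acquirePermission() waits up to timeoutDuration and
        // returns false if no permission was granted in time.
        System.out.println("first call permitted: " + rateLimiter.acquirePermission());
        System.out.println("second call permitted: " + rateLimiter.acquirePermission());

        // Queue: instead of blocking the caller, hand the rate-limited work to an
        // executor so the waiting happens on a worker thread (still bounded by timeoutDuration).
        ExecutorService queue = Executors.newSingleThreadExecutor();
        queue.submit(RateLimiter.decorateRunnable(rateLimiter,
                () -> System.out.println("processed once a permission became available")));
        queue.shutdown();
        queue.awaitTermination(2, TimeUnit.SECONDS);
    }
}
```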

### Additional Details

Just like the [Retry](#retry) mechanism, the Rate Limiter mechanism also provides:

- [Registry](#registry) for managing Rate Limiter instances and configurations;
- [Decorators](#decorators) for wrapping functions with the Rate Limiter logic;
- [Events](#events) for monitoring the Rate Limiter's permission acquisitions and rejections;
- [Kotlin Interop](#kotlin-interop) for accessing the Rate Limiter mechanism in Kotlin that compiles to JVM bytecode.
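
A minimal sketch of the registry, events, and decorator pieces working together for the Rate Limiter; the `catalogService` name is illustrative:

```java
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;

public class RateLimiterRegistryExample {
    public static void main(String[] args) {
        // Registry: shares one default configuration across named Rate Limiter instances
        RateLimiterRegistry registry = RateLimiterRegistry.of(RateLimiterConfig.ofDefaults());
        RateLimiter rateLimiter = registry.rateLimiter("catalogService");

        // Events: successful and failed permission acquisitions are published per instance
        RateLimiter.EventPublisher events = rateLimiter.getEventPublisher();
        events.onSuccess(event -> System.out.println("acquired: " + event));
        events.onFailure(event -> System.out.println("rejected: " + event));

        // Decorator: wraps the protected call with the rate limiting logic
        Runnable decorated = RateLimiter.decorateRunnable(rateLimiter, () -> System.out.println("call executed"));
        decorated.run();
    }
}
```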

## Kotlin Multiplatform Design

Resilience4j is compatible with Kotlin but only for the JVM environment.
1 change: 1 addition & 0 deletions resilience4j/build.gradle.kts
@@ -9,6 +9,7 @@ repositories {

dependencies {
testImplementation(libs.resilience4j.circuitbreaker)
testImplementation(libs.resilience4j.ratelimiter)
testImplementation(libs.resilience4j.retry)
testImplementation(libs.resilience4j.kotlin)
testImplementation(libs.kotlinx.coroutines.core)
243 changes: 243 additions & 0 deletions resilience4j/src/test/java/ratelimiter/RateLimiterTest.java
@@ -0,0 +1,243 @@
package ratelimiter;

import io.github.resilience4j.core.functions.Either;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
import org.junit.jupiter.api.Test;
import service.RemoteService;

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Logger;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

public class RateLimiterTest {

static Logger logger = Logger.getLogger(RateLimiterTest.class.getName());

@Test
public void testRateLimiterNormalBehavior() {
// given: a remote service
RemoteService service = mock(RemoteService.class);

// and: a rate limiter configuration
int limitForPeriod = 5;
Duration limitRefreshPeriod = Duration.ofSeconds(1);
Duration timeoutDuration = Duration.ofMillis(500);
RateLimiterConfig config = RateLimiterConfig.custom()
.limitForPeriod(limitForPeriod)
.limitRefreshPeriod(limitRefreshPeriod)
.timeoutDuration(timeoutDuration)
.build();

// and: a function is decorated with a rate limiter
RateLimiter rateLimiter = RateLimiter.of("test", config);
Function<Integer, Integer> decorated = RateLimiter
.decorateFunction(rateLimiter, service::process);

// and: logs are placed on all rate limiter events
logAllRateLimiterEvents(rateLimiter);

// and: the underlying service is configured to always return success
when(service.process(anyInt())).thenReturn(0);

// when: the decorated function is invoked within the limit
for (int i = 0; i < limitForPeriod; i++) {
decorated.apply(i);
}

// then: the rate limiter allows the calls
verify(service, times(limitForPeriod)).process(anyInt());

// when: the decorated function is invoked exceeding the limit
assertThrows(RequestNotPermitted.class, () -> decorated.apply(limitForPeriod + 1));

// then: the rate limiter blocks the call
verify(service, times(limitForPeriod)).process(anyInt());

// and: after the refresh period
sleepFor(limitRefreshPeriod.toMillis());

// then: the rate limiter allows the calls again
decorated.apply(0);
verify(service, times(limitForPeriod + 1)).process(anyInt());
}

@Test
public void testRateLimiterTimeout() {
// given: a remote service
RemoteService service = mock(RemoteService.class);

// and: a rate limiter configuration
int limitForPeriod = 2;
Duration limitRefreshPeriod = Duration.ofSeconds(1);
Duration timeoutDuration = Duration.ofMillis(100);
RateLimiterConfig config = RateLimiterConfig.custom()
.limitForPeriod(limitForPeriod)
.limitRefreshPeriod(limitRefreshPeriod)
.timeoutDuration(timeoutDuration)
.build();

// and: a function is decorated with a rate limiter
RateLimiter rateLimiter = RateLimiter.of("testTimeout", config);
Function<Integer, Integer> decorated = RateLimiter.decorateFunction(rateLimiter, service::process);

// and: logs are placed on all rate limiter events
logAllRateLimiterEvents(rateLimiter);

// and: the underlying service is configured to always return success
when(service.process(anyInt())).thenReturn(0);

// when: the decorated function is invoked, exceeding the limit
Executors.newSingleThreadExecutor().submit(() -> {
for (int i = 0; i < limitForPeriod + 1; i++) {
try {
decorated.apply(i);
} catch (Exception ignore) {
}
}
});

sleepFor(50L); // Wait for half of the timeout duration

// then: the rate limiter blocks the call and waits
assertThrows(RequestNotPermitted.class, () -> decorated.apply(limitForPeriod + 1));

// and: after the refresh period
sleepFor(limitRefreshPeriod.toMillis());

// then: the rate limiter allows the calls again
decorated.apply(0);
verify(service, times(limitForPeriod + 1)).process(anyInt());

}

@Test
public void testDynamicLimitChange() {
// given: a remote service
RemoteService service = mock(RemoteService.class);

// and: a rate limiter configuration
int initialLimitForPeriod = 2;
Duration limitRefreshPeriod = Duration.ofSeconds(1);
Duration timeoutDuration = Duration.ofMillis(500);
RateLimiterConfig initialConfig = RateLimiterConfig.custom()
.limitForPeriod(initialLimitForPeriod)
.limitRefreshPeriod(limitRefreshPeriod)
.timeoutDuration(timeoutDuration)
.build();

// and: a rate limiter is created with the initial configuration
RateLimiter rateLimiter = RateLimiter.of("testDynamic", initialConfig);
Function<Integer, Integer> decorated = RateLimiter.decorateFunction(rateLimiter, service::process);

// and: logs are placed on all rate limiter events
logAllRateLimiterEvents(rateLimiter);

// and: the underlying service is configured to always return success
when(service.process(anyInt())).thenReturn(0);

// when: the decorated function is invoked within the initial limit
for (int i = 0; i < initialLimitForPeriod; i++) {
decorated.apply(i);
}

// then: the rate limiter allows the calls
verify(service, times(initialLimitForPeriod)).process(anyInt());

// when: the limit is dynamically changed
int newLimitForPeriod = 5;
rateLimiter.changeLimitForPeriod(newLimitForPeriod);

// and: some time is allowed for the rate limiter to refresh
sleepFor(limitRefreshPeriod.toMillis());

// and: the decorated function is invoked within the new limit
for (int i = 0; i < newLimitForPeriod; i++) {
decorated.apply(0);
}

// then: the rate limiter allows the calls up to the new limit
int actualLimitCount = initialLimitForPeriod + newLimitForPeriod;
verify(service, times(actualLimitCount)).process(anyInt());

// when: the decorated function is invoked, exceeding the new limit
assertThrows(Exception.class, () -> decorated.apply(newLimitForPeriod + 1));

// then: the rate limiter blocks the call
verify(service, times(actualLimitCount)).process(anyInt());
}

@Test
public void testDrainPermissionsOnServiceResult() {
// given: a remote service
RemoteService service = mock(RemoteService.class);

// and: a rate limiter configuration
int limitForPeriod = 1500;
Duration limitRefreshPeriod = Duration.ofSeconds(1);
Duration timeoutDuration = Duration.ofMillis(500);
int resultToDrainOn = -1;
Predicate<Either<? extends Throwable, ?>> drainOnResult = either -> either.isRight() && either.get().equals(resultToDrainOn);
RateLimiterConfig config = RateLimiterConfig.custom()
.limitForPeriod(limitForPeriod)
.limitRefreshPeriod(limitRefreshPeriod)
.timeoutDuration(timeoutDuration)
.drainPermissionsOnResult(drainOnResult)
.build();

// and: a rate limiter is created
RateLimiter rateLimiter = RateLimiter.of("testDrain", config);
Function<Integer, Integer> decorated = RateLimiter.decorateFunction(rateLimiter, service::process);

// and: logs are placed on all rate limiter events
logAllRateLimiterEvents(rateLimiter);

// and: the underlying service is configured to return different results
when(service.process(0)).thenReturn(resultToDrainOn);
when(service.process(1)).thenReturn(0);

// when: the decorated function is invoked successively a few times before exceeding the limit
int drainOnResultCount = 100;
for (int i = 0; i < drainOnResultCount; i++) {
decorated.apply(1);
}

// then: the rate limiter allows the calls
verify(service, times(drainOnResultCount)).process(anyInt());

// when: the decorated function is invoked with a result that should drain the permissions
decorated.apply(0);

// then: the rate limiter blocks the call
verify(service, times(drainOnResultCount)).process(1);

// and: after the refresh period
sleepFor(limitRefreshPeriod.toMillis());

// then: the rate limiter allows the calls again
for (int i = 0; i < limitForPeriod; i++) {
decorated.apply(1);
}
verify(service, times(drainOnResultCount + limitForPeriod)).process(1);

}

private static void sleepFor(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private static void logAllRateLimiterEvents(RateLimiter rateLimiter) {
rateLimiter.getEventPublisher()
.onEvent(event -> logger.info(event.toString()));
}
}
