From ba3587a5b7af43a8d04a910f2df7e32865489f31 Mon Sep 17 00:00:00 2001 From: Albert Chae Date: Wed, 11 Sep 2024 05:08:10 -0700 Subject: [PATCH 1/2] Implement rateLimit configuration Very similar to throttle implementation in https://github.com/inngest/inngest-kt/pull/66 --- .../testfunctions/RateLimitedFunction.java | 28 +++++++++++++ .../springbootdemo/DemoTestConfiguration.java | 1 + .../RateLimitedFunctionIntegrationTest.java | 42 +++++++++++++++++++ .../src/main/kotlin/com/inngest/Function.kt | 2 + .../inngest/InngestFunctionConfigBuilder.kt | 27 ++++++++++++ 5 files changed, 100 insertions(+) create mode 100644 inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java create mode 100644 inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java new file mode 100644 index 00000000..1ab6b408 --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java @@ -0,0 +1,28 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.FunctionContext; +import com.inngest.InngestFunction; +import com.inngest.InngestFunctionConfigBuilder; +import com.inngest.Step; +import org.jetbrains.annotations.NotNull; + +import java.time.Duration; + +public class RateLimitedFunction extends InngestFunction { + + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("RateLimitedFunction") + .name("RateLimited Function") + .triggerEvent("test/rateLimit") + .rateLimit(3, Duration.ofSeconds(2)); + } + + @Override + public Integer execute(FunctionContext ctx, Step step) { + return step.run("result", () -> 42, Integer.class); + } +} + diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index d4614f0f..dcd85d7e 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -24,6 +24,7 @@ protected HashMap functions() { addInngestFunction(functions, new InvokeFailureFunction()); addInngestFunction(functions, new TryCatchRunFunction()); addInngestFunction(functions, new ThrottledFunction()); + addInngestFunction(functions, new RateLimitedFunction()); addInngestFunction(functions, new DebouncedFunction()); addInngestFunction(functions, new PriorityFunction()); addInngestFunction(functions, new IdempotentFunction()); diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java new file mode 100644 index 00000000..7c7fdb1c --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java @@ -0,0 +1,42 @@ +package com.inngest.springbootdemo; + +import com.inngest.Inngest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class RateLimitedFunctionIntegrationTest { + @Autowired + private DevServerComponent devServer; + + @Autowired + private Inngest client; + + @Test + void testFunctionIsRateLimited() throws Exception { + String event1 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0]; + Thread.sleep(500); + String event2 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0]; + Thread.sleep(500); + String event3 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0]; + + Thread.sleep(4000); + + // Rate limit should only allow the first 2 events to run + assertEquals("Completed", devServer.runsByEvent(event1).first().getStatus()); + assertEquals("Completed", devServer.runsByEvent(event2).first().getStatus()); + assertEquals(0, devServer.runsByEvent(event3).data.length); + + // new event after the rate limit period will run, but the previously skipped event will stay skipped + String event4 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0]; + Thread.sleep(4000); + + assertEquals(0, devServer.runsByEvent(event3).data.length); + assertEquals("Completed", devServer.runsByEvent(event4).first().getStatus()); + } +} diff --git a/inngest/src/main/kotlin/com/inngest/Function.kt b/inngest/src/main/kotlin/com/inngest/Function.kt index f1a83171..0a1a01a8 100644 --- a/inngest/src/main/kotlin/com/inngest/Function.kt +++ b/inngest/src/main/kotlin/com/inngest/Function.kt @@ -81,6 +81,8 @@ internal class InternalFunctionConfig @Json(serializeNull = false) val throttle: Throttle? = null, @Json(serializeNull = false) + val rateLimit: RateLimit? = null, + @Json(serializeNull = false) val debounce: Debounce? = null, @Json(serializeNull = false) val priority: Priority? = null, diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 04b480f6..2006f004 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -14,6 +14,7 @@ class InngestFunctionConfigBuilder { private var concurrency: MutableList? = null private var retries = 3 private var throttle: Throttle? = null + private var rateLimit: RateLimit? = null private var debounce: Debounce? = null private var priority: Priority? = null private var idempotency: String? = null @@ -158,6 +159,21 @@ class InngestFunctionConfigBuilder { burst: Int? = null, ): InngestFunctionConfigBuilder = apply { this.throttle = Throttle(limit, period, key, burst) } + /** + * Configure function rate limit + * + * @param limit The number of times to allow the function to run per the given `period`. + * @param period The period of time to allow the function to run `limit` times. The period begins when the first matching event + * is received + * @param key An optional expression to use for rate limiting, similar to idempotency. + */ + @JvmOverloads + fun rateLimit( + limit: Int, + period: Duration, + key: String? = null, + ): InngestFunctionConfigBuilder = apply { this.rateLimit = RateLimit(limit, period, key) } + /** * Debounce delays functions for the `period` specified. If an event is sent, * the function will not run until at least `period` has elapsed. @@ -235,6 +251,7 @@ class InngestFunctionConfigBuilder { triggers, concurrency, throttle, + rateLimit, debounce, priority, idempotency, @@ -307,6 +324,16 @@ internal data class Throttle val burst: Int? = null, ) +internal data class RateLimit + @JvmOverloads + constructor( + val limit: Int, + @KlaxonDuration + val period: Duration, + @Json(serializeNull = false) + val key: String? = null, + ) + internal data class Debounce @JvmOverloads constructor( From d6ec58bc666abd59a4bd36c605e4f6b01c038e2a Mon Sep 17 00:00:00 2001 From: Albert Chae Date: Thu, 12 Sep 2024 17:44:46 -0700 Subject: [PATCH 2/2] Fix rate limiting test --- .../testfunctions/RateLimitedFunction.java | 2 +- .../RateLimitedFunctionIntegrationTest.java | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java index 1ab6b408..188d9d44 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/RateLimitedFunction.java @@ -17,7 +17,7 @@ public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) .id("RateLimitedFunction") .name("RateLimited Function") .triggerEvent("test/rateLimit") - .rateLimit(3, Duration.ofSeconds(2)); + .rateLimit(2, Duration.ofSeconds(6)); } @Override diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java index 7c7fdb1c..3e46574b 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/RateLimitedFunctionIntegrationTest.java @@ -20,12 +20,16 @@ class RateLimitedFunctionIntegrationTest { @Test void testFunctionIsRateLimited() throws Exception { String event1 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0]; - Thread.sleep(500); + // Rate limit test function is limited to 2 over 6 seconds. Based on the simplistic description of GCRA in + // https://www.inngest.com/docs/guides/rate-limiting#how-rate-limiting-works + // we need to sleep at least 3 seconds here for the second event not to get rate limited + Thread.sleep(3500); String event2 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0]; - Thread.sleep(500); + Thread.sleep(1000); String event3 = InngestFunctionTestHelpers.sendEvent(client, "test/rateLimit").getIds()[0]; - Thread.sleep(4000); + // Sleep at least 6 seconds for the rate limit bucket to be completely cleared + Thread.sleep(6000); // Rate limit should only allow the first 2 events to run assertEquals("Completed", devServer.runsByEvent(event1).first().getStatus());