Skip to content

Commit

Permalink
rlqs: Enable timer in the callback function (envoyproxy#31679)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: tyxia <[email protected]>
  • Loading branch information
tyxia authored Jan 8, 2024
1 parent 0c6e47b commit d6d45bb
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 20 deletions.
3 changes: 2 additions & 1 deletion source/extensions/filters/http/rate_limit_quota/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ RateLimitQuotaFilter::sendImmediateReport(const size_t bucket_id,
ASSERT(client_.send_reports_timer != nullptr);
// Set the reporting interval and enable the timer.
const int64_t reporting_interval = PROTOBUF_GET_MS_REQUIRED(bucket_settings, reporting_interval);
client_.send_reports_timer->enableTimer(std::chrono::milliseconds(reporting_interval));
client_.report_interval_ms = std::chrono::milliseconds(reporting_interval);
client_.send_reports_timer->enableTimer(client_.report_interval_ms);

initiating_call_ = false;
// TODO(tyxia) Revisit later.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,8 @@ using BucketsCache = absl::flat_hash_map<size_t, std::unique_ptr<Bucket>>;

struct ThreadLocalClient : public Logger::Loggable<Logger::Id::rate_limit_quota> {
ThreadLocalClient(Envoy::Event::Dispatcher& dispatcher) {
// Create the quota usage report method that sends the reports the RLS server periodically.
send_reports_timer = dispatcher.createTimer([this] {
if (rate_limit_client != nullptr) {
rate_limit_client->sendUsageReport(absl::nullopt);
} else {
ENVOY_LOG(error, "Rate limit client has been destroyed; no periodical report send");
}
});
// Create the quota usage report method that sends the reports to the RLS server periodically.
send_reports_timer = dispatcher.createTimer([this] { sendPeriodicalReports(); });
}

// Disable copy constructor and assignment.
Expand All @@ -76,10 +70,27 @@ struct ThreadLocalClient : public Logger::Loggable<Logger::Id::rate_limit_quota>
}
}

// Helper function to send the reports periodically on timer.
void sendPeriodicalReports() {
if (rate_limit_client != nullptr) {
rate_limit_client->sendUsageReport(absl::nullopt);
} else {
ENVOY_LOG(error, "Rate limit client has been destroyed; no periodical report send");
}

if (send_reports_timer != nullptr) {
send_reports_timer->enableTimer(report_interval_ms);
} else {
ENVOY_LOG(error, "Reports timer has been destroyed; no periodical report send");
}
}

// Rate limit client. It is owned here (in TLS) and is used by all the buckets.
std::unique_ptr<RateLimitClient> rate_limit_client;
// The timer for sending the reports periodically.
Event::TimerPtr send_reports_timer;
// Periodical reporting interval(in milliseconds).
std::chrono::milliseconds report_interval_ms = std::chrono::milliseconds::zero();
};

class ThreadLocalBucket : public Envoy::ThreadLocal::ThreadLocalObject {
Expand Down
27 changes: 16 additions & 11 deletions test/extensions/filters/http/rate_limit_quota/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,19 +397,24 @@ TEST_P(RateLimitQuotaIntegrationTest, BasicFlowPeriodicalReport) {
EXPECT_TRUE(response_->complete());
EXPECT_EQ(response_->headers().getStatusValue(), "200");

// Trigger the periodical report.
// TODO(tyxia) Make interval configurable. It is 60s in ValidMatcherConfig.
simTime().advanceTimeWait(std::chrono::milliseconds(60000));
ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports));

// Build the response.
envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response2;
auto* bucket_action2 = rlqs_response2.add_bucket_action();
// Trigger the report periodically, 10 times.
for (int i = 0; i < 10; ++i) {
// Advance the time by report_interval.
// TODO(tyxia) Make interval configurable in the test. It is currently 60s in
// ValidMatcherConfig.
simTime().advanceTimeWait(std::chrono::milliseconds(60000));
// Checks that the rate limit server has received the periodical reports.
ASSERT_TRUE(rlqs_stream_->waitForGrpcMessage(*dispatcher_, reports));

// Build the rlqs server response.
envoy::service::rate_limit_quota::v3::RateLimitQuotaResponse rlqs_response2;
auto* bucket_action2 = rlqs_response2.add_bucket_action();

for (const auto& [key, value] : custom_headers_cpy) {
(*bucket_action2->mutable_bucket_id()->mutable_bucket()).insert({key, value});
for (const auto& [key, value] : custom_headers_cpy) {
(*bucket_action2->mutable_bucket_id()->mutable_bucket()).insert({key, value});
}
rlqs_stream_->sendGrpcMessage(rlqs_response2);
}
rlqs_stream_->sendGrpcMessage(rlqs_response2);
}

TEST_P(RateLimitQuotaIntegrationTest, MultiRequestWithTokenBucketThrottling) {
Expand Down

0 comments on commit d6d45bb

Please sign in to comment.