Skip to content

Commit aa07ac0

Browse files
committed
Add initial impl of PartitionedRateLimiter.Create (#67677)
1 parent d649022 commit aa07ac0

19 files changed

+1324
-9
lines changed

src/libraries/System.Threading.RateLimiting/ref/System.Threading.RateLimiting.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public MetadataName(string name) { }
4040
public static bool operator !=(System.Threading.RateLimiting.MetadataName<T> left, System.Threading.RateLimiting.MetadataName<T> right) { throw null; }
4141
public override string ToString() { throw null; }
4242
}
43+
public static partial class PartitionedRateLimiter
44+
{
45+
public static System.Threading.RateLimiting.PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(System.Func<TResource, System.Threading.RateLimiting.RateLimitPartition<TPartitionKey>> partitioner, System.Collections.Generic.IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull { throw null; }
46+
}
4347
public abstract partial class PartitionedRateLimiter<TResource> : System.IAsyncDisposable, System.IDisposable
4448
{
4549
protected PartitionedRateLimiter() { }
@@ -83,6 +87,21 @@ protected virtual void Dispose(bool disposing) { }
8387
public abstract bool TryGetMetadata(string metadataName, out object? metadata);
8488
public bool TryGetMetadata<T>(System.Threading.RateLimiting.MetadataName<T> metadataName, [System.Diagnostics.CodeAnalysis.MaybeNullAttribute] out T metadata) { throw null; }
8589
}
90+
public static partial class RateLimitPartition
91+
{
92+
public static System.Threading.RateLimiting.RateLimitPartition<TKey> CreateConcurrencyLimiter<TKey>(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.ConcurrencyLimiterOptions> factory) { throw null; }
93+
public static System.Threading.RateLimiting.RateLimitPartition<TKey> CreateNoLimiter<TKey>(TKey partitionKey) { throw null; }
94+
public static System.Threading.RateLimiting.RateLimitPartition<TKey> CreateTokenBucketLimiter<TKey>(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.TokenBucketRateLimiterOptions> factory) { throw null; }
95+
public static System.Threading.RateLimiting.RateLimitPartition<TKey> Create<TKey>(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.RateLimiter> factory) { throw null; }
96+
}
97+
public partial struct RateLimitPartition<TKey>
98+
{
99+
private readonly TKey _PartitionKey_k__BackingField;
100+
private object _dummy;
101+
private int _dummyPrimitive;
102+
public RateLimitPartition(TKey partitionKey, System.Func<TKey, System.Threading.RateLimiting.RateLimiter> factory) { throw null; }
103+
public readonly TKey PartitionKey { get { throw null; } }
104+
}
86105
public abstract partial class ReplenishingRateLimiter : System.Threading.RateLimiting.RateLimiter
87106
{
88107
protected ReplenishingRateLimiter() { }

src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,26 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>
2020
<Compile Include="System\Threading\RateLimiting\FixedWindowRateLimiterOptions.cs" />
2121
<Compile Include="System\Threading\RateLimiting\MetadataName.cs" />
2222
<Compile Include="System\Threading\RateLimiting\MetadataName.T.cs" />
23+
<Compile Include="System\Threading\RateLimiting\NoopLimiter.cs" />
24+
<Compile Include="System\Threading\RateLimiting\PartitionedRateLimiter.cs" />
2325
<Compile Include="System\Threading\RateLimiting\PartitionedRateLimiter.T.cs" />
2426
<Compile Include="System\Threading\RateLimiting\QueueProcessingOrder.cs" />
2527
<Compile Include="System\Threading\RateLimiting\RateLimiter.cs" />
2628
<Compile Include="System\Threading\RateLimiting\RateLimitLease.cs" />
29+
<Compile Include="System\Threading\RateLimiting\RateLimitPartition.cs" />
30+
<Compile Include="System\Threading\RateLimiting\RateLimitPartition.T.cs" />
2731
<Compile Include="System\Threading\RateLimiting\ReplenishingRateLimiter.cs" />
2832
<Compile Include="System\Threading\RateLimiting\SlidingWindowRateLimiter.cs" />
2933
<Compile Include="System\Threading\RateLimiting\SlidingWindowRateLimiterOptions.cs" />
34+
<Compile Include="System\Threading\RateLimiting\TimerAwaitable.cs" />
3035
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiter.cs" />
3136
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiterOptions.cs" />
3237
<Compile Include="$(CommonPath)System\Collections\Generic\Deque.cs" Link="Common\System\Collections\Generic\Deque.cs" />
3338
</ItemGroup>
3439
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'">
3540
<Reference Include="System.Runtime" />
3641
<Reference Include="System.Threading" />
42+
<Reference Include="System.Collections" />
3743
</ItemGroup>
3844
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
3945
<PackageReference Include="System.Threading.Tasks.Extensions" Version="$(SystemThreadingTasksExtensionsVersion)" />

src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,11 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
112112
RequestRegistration oldestRequest = _queue.DequeueHead();
113113
_queueCount -= oldestRequest.Count;
114114
Debug.Assert(_queueCount >= 0);
115-
oldestRequest.Tcs.TrySetResult(FailedLease);
115+
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
116+
{
117+
// Updating queue count is handled by the cancellation code
118+
_queueCount += oldestRequest.Count;
119+
}
116120
}
117121
while (_options.QueueLimit - _queueCount < permitCount);
118122
}
@@ -249,7 +253,7 @@ protected override void Dispose(bool disposing)
249253
? _queue.DequeueHead()
250254
: _queue.DequeueTail();
251255
next.CancellationTokenRegistration.Dispose();
252-
next.Tcs.SetResult(FailedLease);
256+
next.Tcs.TrySetResult(FailedLease);
253257
}
254258
}
255259
}

src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,10 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int requestCount, Can
128128
RequestRegistration oldestRequest = _queue.DequeueHead();
129129
_queueCount -= oldestRequest.Count;
130130
Debug.Assert(_queueCount >= 0);
131-
oldestRequest.Tcs.TrySetResult(FailedLease);
131+
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
132+
{
133+
_queueCount += oldestRequest.Count;
134+
}
132135
}
133136
while (_options.QueueLimit - _queueCount < requestCount);
134137
}
@@ -326,7 +329,7 @@ protected override void Dispose(bool disposing)
326329
? _queue.DequeueHead()
327330
: _queue.DequeueTail();
328331
next.CancellationTokenRegistration.Dispose();
329-
next.Tcs.SetResult(FailedLease);
332+
next.Tcs.TrySetResult(FailedLease);
330333
}
331334
}
332335
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Collections.Generic;
5+
using System.Threading.Tasks;
6+
7+
namespace System.Threading.RateLimiting
8+
{
9+
internal sealed class NoopLimiter : RateLimiter
10+
{
11+
private static readonly RateLimitLease _lease = new NoopLease();
12+
13+
private NoopLimiter() { }
14+
15+
public static NoopLimiter Instance { get; } = new NoopLimiter();
16+
17+
public override TimeSpan? IdleDuration => null;
18+
19+
public override int GetAvailablePermits() => int.MaxValue;
20+
21+
protected override RateLimitLease AcquireCore(int permitCount) => _lease;
22+
23+
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken)
24+
=> new ValueTask<RateLimitLease>(_lease);
25+
26+
private sealed class NoopLease : RateLimitLease
27+
{
28+
public override bool IsAcquired => true;
29+
30+
public override IEnumerable<string> MetadataNames => Array.Empty<string>();
31+
32+
public override bool TryGetMetadata(string metadataName, out object? metadata)
33+
{
34+
metadata = null;
35+
return false;
36+
}
37+
}
38+
}
39+
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Collections.Generic;
5+
using System.Diagnostics;
6+
using System.Runtime.CompilerServices;
7+
using System.Threading.Tasks;
8+
9+
namespace System.Threading.RateLimiting
10+
{
11+
/// <summary>
12+
/// Contains methods to assist with creating a <see cref="PartitionedRateLimiter{TResource}"/>.
13+
/// </summary>
14+
public static class PartitionedRateLimiter
15+
{
16+
/// <summary>
17+
/// Method used to create a default implementation of <see cref="PartitionedRateLimiter{TResource}"/>.
18+
/// </summary>
19+
/// <typeparam name="TResource">The resource type that is being rate limited.</typeparam>
20+
/// <typeparam name="TPartitionKey">The type to distinguish partitions with.</typeparam>
21+
/// <param name="partitioner">Method called every time an Acquire or WaitAsync call is made to figure out what rate limiter to apply to the request.
22+
/// If the <see cref="RateLimitPartition{TKey}.PartitionKey"/> matches a cached entry then the rate limiter previously used for that key is used. Otherwise, the factory is called to get a new rate limiter.</param>
23+
/// <param name="equalityComparer">Optional <see cref="IEqualityComparer{T}"/> to customize the comparison logic for <typeparamref name="TPartitionKey"/>.</param>
24+
/// <returns></returns>
25+
public static PartitionedRateLimiter<TResource> Create<TResource, TPartitionKey>(
26+
Func<TResource, RateLimitPartition<TPartitionKey>> partitioner,
27+
IEqualityComparer<TPartitionKey>? equalityComparer = null) where TPartitionKey : notnull
28+
{
29+
return new DefaultPartitionedRateLimiter<TResource, TPartitionKey>(partitioner, equalityComparer);
30+
}
31+
}
32+
33+
internal sealed class DefaultPartitionedRateLimiter<TResource, TKey> : PartitionedRateLimiter<TResource> where TKey : notnull
34+
{
35+
private readonly Func<TResource, RateLimitPartition<TKey>> _partitioner;
36+
37+
// TODO: Look at ConcurrentDictionary to try and avoid a global lock
38+
private Dictionary<TKey, Lazy<RateLimiter>> _limiters;
39+
private bool _disposed;
40+
private TaskCompletionSource<object?> _disposeComplete = new(TaskCreationOptions.RunContinuationsAsynchronously);
41+
42+
// Used by the Timer to call TryRelenish on ReplenishingRateLimiters
43+
// We use a separate list to avoid running TryReplenish (which might be user code) inside our lock
44+
// And we cache the list to amortize the allocation cost to as close to 0 as we can get
45+
private List<Lazy<RateLimiter>> _cachedLimiters = new();
46+
private bool _cacheInvalid;
47+
private TimerAwaitable _timer;
48+
private Task _timerTask;
49+
50+
// Use the Dictionary as the lock field so we don't need to allocate another object for a lock and have another field in the object
51+
private object Lock => _limiters;
52+
53+
public DefaultPartitionedRateLimiter(Func<TResource, RateLimitPartition<TKey>> partitioner,
54+
IEqualityComparer<TKey>? equalityComparer = null)
55+
{
56+
_limiters = new Dictionary<TKey, Lazy<RateLimiter>>(equalityComparer);
57+
_partitioner = partitioner;
58+
59+
// TODO: Figure out what interval we should use
60+
_timer = new TimerAwaitable(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100));
61+
_timerTask = RunTimer();
62+
}
63+
64+
private async Task RunTimer()
65+
{
66+
_timer.Start();
67+
while (await _timer)
68+
{
69+
try
70+
{
71+
Replenish(this);
72+
}
73+
// TODO: Can we log to EventSource or somewhere? Maybe dispatch throwing the exception so it is at least an unhandled exception?
74+
catch { }
75+
}
76+
_timer.Dispose();
77+
}
78+
79+
public override int GetAvailablePermits(TResource resourceID)
80+
{
81+
return GetRateLimiter(resourceID).GetAvailablePermits();
82+
}
83+
84+
protected override RateLimitLease AcquireCore(TResource resourceID, int permitCount)
85+
{
86+
return GetRateLimiter(resourceID).Acquire(permitCount);
87+
}
88+
89+
protected override ValueTask<RateLimitLease> WaitAsyncCore(TResource resourceID, int permitCount, CancellationToken cancellationToken)
90+
{
91+
return GetRateLimiter(resourceID).WaitAsync(permitCount, cancellationToken);
92+
}
93+
94+
private RateLimiter GetRateLimiter(TResource resourceID)
95+
{
96+
RateLimitPartition<TKey> partition = _partitioner(resourceID);
97+
Lazy<RateLimiter>? limiter;
98+
lock (Lock)
99+
{
100+
ThrowIfDisposed();
101+
if (!_limiters.TryGetValue(partition.PartitionKey, out limiter))
102+
{
103+
// Using Lazy avoids calling user code (partition.Factory) inside the lock
104+
limiter = new Lazy<RateLimiter>(() => partition.Factory(partition.PartitionKey));
105+
_limiters.Add(partition.PartitionKey, limiter);
106+
// Cache is invalid now
107+
_cacheInvalid = true;
108+
}
109+
}
110+
return limiter.Value;
111+
}
112+
113+
protected override void Dispose(bool disposing)
114+
{
115+
if (!disposing)
116+
{
117+
return;
118+
}
119+
120+
bool alreadyDisposed = CommonDispose();
121+
122+
_timerTask.GetAwaiter().GetResult();
123+
_cachedLimiters.Clear();
124+
125+
if (alreadyDisposed)
126+
{
127+
_disposeComplete.Task.GetAwaiter().GetResult();
128+
return;
129+
}
130+
131+
// Safe to access _limiters outside the lock
132+
// The timer is no longer running and _disposed is set so anyone trying to access fields will be checking that first
133+
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> limiter in _limiters)
134+
{
135+
limiter.Value.Value.Dispose();
136+
}
137+
_limiters.Clear();
138+
_disposeComplete.TrySetResult(null);
139+
}
140+
141+
protected override async ValueTask DisposeAsyncCore()
142+
{
143+
bool alreadyDisposed = CommonDispose();
144+
145+
await _timerTask.ConfigureAwait(false);
146+
_cachedLimiters.Clear();
147+
148+
if (alreadyDisposed)
149+
{
150+
await _disposeComplete.Task.ConfigureAwait(false);
151+
return;
152+
}
153+
154+
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> limiter in _limiters)
155+
{
156+
await limiter.Value.Value.DisposeAsync().ConfigureAwait(false);
157+
}
158+
_limiters.Clear();
159+
_disposeComplete.TrySetResult(null);
160+
}
161+
162+
// This handles the common state changes that Dispose and DisposeAsync need to do, the individual limiters still need to be Disposed after this call
163+
private bool CommonDispose()
164+
{
165+
lock (Lock)
166+
{
167+
if (_disposed)
168+
{
169+
return true;
170+
}
171+
_disposed = true;
172+
_timer.Stop();
173+
}
174+
return false;
175+
}
176+
177+
private void ThrowIfDisposed()
178+
{
179+
if (_disposed)
180+
{
181+
throw new ObjectDisposedException(nameof(PartitionedRateLimiter));
182+
}
183+
}
184+
185+
private static void Replenish(DefaultPartitionedRateLimiter<TResource, TKey> limiter)
186+
{
187+
lock (limiter.Lock)
188+
{
189+
if (limiter._disposed)
190+
{
191+
return;
192+
}
193+
194+
// If the cache has been invalidated we need to recreate it
195+
if (limiter._cacheInvalid)
196+
{
197+
limiter._cachedLimiters.Clear();
198+
bool cacheStillInvalid = false;
199+
foreach (KeyValuePair<TKey, Lazy<RateLimiter>> kvp in limiter._limiters)
200+
{
201+
if (kvp.Value.IsValueCreated)
202+
{
203+
if (kvp.Value.Value is ReplenishingRateLimiter)
204+
{
205+
limiter._cachedLimiters.Add(kvp.Value);
206+
}
207+
}
208+
else
209+
{
210+
// In rare cases the RateLimiter will be added to the storage but not be initialized yet
211+
// keep cache invalid if there was a non-initialized RateLimiter
212+
// the next time we run the timer the cache will be updated
213+
// with the initialized RateLimiter
214+
cacheStillInvalid = true;
215+
}
216+
}
217+
limiter._cacheInvalid = cacheStillInvalid;
218+
}
219+
}
220+
221+
// cachedLimiters is safe to use outside the lock because it is only updated by the Timer
222+
// and the Timer avoids re-entrancy issues via the _executingTimer field
223+
foreach (Lazy<RateLimiter> rateLimiter in limiter._cachedLimiters)
224+
{
225+
Debug.Assert(rateLimiter.IsValueCreated && rateLimiter.Value is ReplenishingRateLimiter);
226+
((ReplenishingRateLimiter)rateLimiter.Value).TryReplenish();
227+
}
228+
}
229+
}
230+
}

0 commit comments

Comments
 (0)