Skip to content

Commit

Permalink
Merge from v7
Browse files Browse the repository at this point in the history
  • Loading branch information
electricessence committed Jan 31, 2024
2 parents 13b6594 + 05a1d36 commit 9245b48
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 54 deletions.
2 changes: 0 additions & 2 deletions Open.ChannelExtensions.ComparisonTests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ await Enumerable
Console.WriteLine();
}

#if NETCOREAPP3_0
{
Console.WriteLine("Async Enumerable test...");
var sw = Stopwatch.StartNew();
Expand All @@ -120,7 +119,6 @@ await Enumerable
Console.WriteLine(sw.Elapsed);
Console.WriteLine();
}
#endif

}

Expand Down
4 changes: 1 addition & 3 deletions Open.ChannelExtensions.Tests/BatchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ await c.Reader
_ = Task.Run(async () =>
{
await Task.Delay(60000, token);
if (!token.IsCancellationRequested) c.Writer.TryComplete(new Exception("Should have completed successfuly."));
if (!token.IsCancellationRequested) c.Writer.TryComplete(new Exception("Should have completed successfully."));
});
break;
case 2:
Expand Down Expand Up @@ -259,7 +259,6 @@ public static async Task TimeoutTest1()
}));
}

#if NET5_0_OR_GREATER
[Fact]
public static async Task BatchReadBehavior()
{
Expand Down Expand Up @@ -408,5 +407,4 @@ public static async IAsyncEnumerable<IList<T>> ReadBatchEnumerableAsyncBakedIn<T
if (item?.Count > 0) yield return item;
}
}
#endif
}
6 changes: 3 additions & 3 deletions Open.ChannelExtensions/BatchingChannelReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected BatchingChannelReader(
Timer? _timer;

/// <summary>
/// Specifies a timeout by which a batch will be emmited there is at least one item but has been waiting
/// Specifies a timeout by which a batch will be emitted there is at least one item but has been waiting
/// for longer than the timeout value.
/// </summary>
/// <param name="millisecondsTimeout">
Expand Down Expand Up @@ -151,7 +151,7 @@ protected override bool TryPipeItems(bool flush)
{
if (c is null)
{
newBatch = true; // a new batch could start but not be emmited.
newBatch = true; // a new batch could start but not be emitted.
_batch = c = CreateBatch(_batchSize);
AddBatchItem(c, item);
}
Expand Down Expand Up @@ -195,7 +195,7 @@ void Emit(ref TBatch? c)
{
_batch = null;
newBatch = false;
if (!batched) TryUpdateTimer(Timeout.Infinite); // Since we're emmitting one, let's ensure the timeout is cancelled.
if (!batched) TryUpdateTimer(Timeout.Infinite); // Since we're emitting one, let's ensure the timeout is cancelled.
batched = Buffer!.Writer.TryWrite(c!);
Debug.Assert(batched);
c = null;
Expand Down
32 changes: 16 additions & 16 deletions Open.ChannelExtensions/Extensions.Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public static ValueTask<long> ReadAllAsync<T>(this ChannelReader<T> reader,
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAllAsync<T>(this ChannelReader<T> reader,
CancellationToken cancellationToken,
Func<T, long, ValueTask> receiver,
Expand Down Expand Up @@ -480,7 +480,7 @@ public static ValueTask<long> ReadAllAsync<TWrite, TRead>(this Channel<TWrite, T
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAllAsync<TWrite, TRead>(this Channel<TWrite, TRead> channel,
CancellationToken cancellationToken,
Func<TRead, long, ValueTask> receiver,
Expand Down Expand Up @@ -532,7 +532,7 @@ public static ValueTask<long> TaskReadAllAsync<TWrite, TRead>(this Channel<TWrit
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> TaskReadAllAsync<TWrite, TRead>(this Channel<TWrite, TRead> channel,
CancellationToken cancellationToken,
Func<TRead, long, Task> receiver,
Expand Down Expand Up @@ -581,7 +581,7 @@ public static ValueTask<long> ReadAllAsync<T>(this ChannelReader<T> reader,
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAllAsync<T>(this ChannelReader<T> reader,
CancellationToken cancellationToken,
Func<T, ValueTask> receiver,
Expand Down Expand Up @@ -630,7 +630,7 @@ public static ValueTask<long> TaskReadAllAsync<T>(this ChannelReader<T> reader,
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> TaskReadAllAsync<T>(this ChannelReader<T> reader,
CancellationToken cancellationToken,
Func<T, Task> receiver,
Expand Down Expand Up @@ -682,7 +682,7 @@ public static ValueTask<long> TaskReadAllAsync<TWrite, TRead>(this Channel<TWrit
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> TaskReadAllAsync<TWrite, TRead>(this Channel<TWrite, TRead> channel,
CancellationToken cancellationToken,
Func<TRead, Task> receiver,
Expand Down Expand Up @@ -740,7 +740,7 @@ public static ValueTask<long> ReadAllAsync<TWrite, TRead>(this Channel<TWrite, T
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAllAsync<TWrite, TRead>(this Channel<TWrite, TRead> channel,
CancellationToken cancellationToken,
Func<TRead, ValueTask> receiver,
Expand All @@ -753,7 +753,7 @@ public static ValueTask<long> ReadAllAsync<TWrite, TRead>(this Channel<TWrite, T
/// <typeparam name="T">The item type.</typeparam>
/// <param name="reader">The channel reader to read from.</param>
/// <param name="receiver">The receiver function.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before writreadinging.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
Expand Down Expand Up @@ -783,7 +783,7 @@ public static ValueTask<long> ReadAll<T>(this ChannelReader<T> reader,
/// <param name="reader">The channel reader to read from.</param>
/// <param name="receiver">The receiver function.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before writreadinging.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
public static ValueTask<long> ReadAll<T>(this ChannelReader<T> reader,
Expand All @@ -799,10 +799,10 @@ public static ValueTask<long> ReadAll<T>(this ChannelReader<T> reader,
/// <param name="reader">The channel reader to read from.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="receiver">The receiver function.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before writreadinging.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAll<T>(this ChannelReader<T> reader,
CancellationToken cancellationToken,
Action<T, long> receiver,
Expand Down Expand Up @@ -860,7 +860,7 @@ public static ValueTask<long> ReadAll<TWrite, TRead>(this Channel<TWrite, TRead>
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAll<TWrite, TRead>(this Channel<TWrite, TRead> channel,
CancellationToken cancellationToken,
Action<TRead, long> receiver,
Expand Down Expand Up @@ -921,7 +921,7 @@ public static ValueTask<long> ReadAll<T>(this ChannelReader<T> reader,
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAll<T>(this ChannelReader<T> reader,
CancellationToken cancellationToken,
Action<T> receiver,
Expand Down Expand Up @@ -978,7 +978,7 @@ public static ValueTask<long> ReadAll<TWrite, TRead>(this Channel<TWrite, TRead>
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
/// The count should be ignored if the number of iterations could exceed the max value of long.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static ValueTask<long> ReadAll<TWrite, TRead>(this Channel<TWrite, TRead> channel,
CancellationToken cancellationToken,
Action<TRead> receiver,
Expand Down Expand Up @@ -1020,7 +1020,7 @@ public static ValueTask<long> ReadAllAsLines(this ChannelReader<string> reader,
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="channel">The channel to read from.</param>
/// <param name="receiver">The TextWriter to recieve the lines.</param>
/// <param name="receiver">The TextWriter to receive the lines.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The count of items read after the reader has completed.
Expand All @@ -1041,7 +1041,7 @@ public static ValueTask<long> ReadAllAsLines<T>(this Channel<T, string> channel,
/// </summary>
/// <typeparam name="T">The item type.</typeparam>
/// <param name="channel">The channel to read from.</param>
/// <param name="receiver">The TextWriter to recieve the lines.</param>
/// <param name="receiver">The TextWriter to receive the lines.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <param name="deferredExecution">If true, calls await Task.Yield() before reading.</param>
/// <returns>The count of items read after the reader has completed.
Expand Down
12 changes: 6 additions & 6 deletions Open.ChannelExtensions/Extensions.ReadConcurrently.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async Task<long> Read()
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="receiver">The async receiver function.</param>
/// <returns>A task that completes when no more reading is to be done.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reader,
int maxConcurrency,
CancellationToken cancellationToken,
Expand Down Expand Up @@ -132,7 +132,7 @@ public static Task<long> ReadAllConcurrentlyAsync<TWrite, TRead>(this Channel<TW
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="receiver">The async receiver function.</param>
/// <returns>A task that completes when no more reading is to be done.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static Task<long> ReadAllConcurrentlyAsync<TWrite, TRead>(this Channel<TWrite, TRead> channel,
int maxConcurrency,
CancellationToken cancellationToken,
Expand Down Expand Up @@ -190,7 +190,7 @@ public static Task<long> ReadAllConcurrently<T>(this ChannelReader<T> reader,
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="receiver">The receiver function.</param>
/// <returns>A task that completes when no more reading is to be done.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static Task<long> ReadAllConcurrently<T>(this ChannelReader<T> reader,
int maxConcurrency,
CancellationToken cancellationToken,
Expand Down Expand Up @@ -228,7 +228,7 @@ public static Task<long> ReadAllConcurrently<TWrite, TRead>(this Channel<TWrite,
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="receiver">The receiver function.</param>
/// <returns>A task that completes when no more reading is to be done.</returns>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static Task<long> ReadAllConcurrently<TWrite, TRead>(this Channel<TWrite, TRead> channel,
int maxConcurrency,
CancellationToken cancellationToken,
Expand Down Expand Up @@ -300,7 +300,7 @@ async Task Read()
}

/// <inheritdoc cref="ReadAllAsEnumerablesAsync{T}(ChannelReader{T}, Func{IEnumerable{T}, ValueTask}, bool, CancellationToken)"/>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static Task ReadAllConcurrentlyAsEnumerablesAsync<T>(this ChannelReader<T> reader,
int maxConcurrency,
CancellationToken cancellationToken,
Expand All @@ -327,7 +327,7 @@ public static Task ReadAllConcurrentlyAsEnumerables<T>(this ChannelReader<T> rea
}

/// <inheritdoc cref="ReadAllAsEnumerablesAsync{T}(ChannelReader{T}, Func{IEnumerable{T}, ValueTask}, bool, CancellationToken)"/>
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convience.")]
[SuppressMessage("Design", "CA1068:CancellationToken parameters must come last", Justification = "Provided for aesthetic convenience.")]
public static Task ReadAllConcurrentlyAsEnumerables<T>(this ChannelReader<T> reader,
int maxConcurrency,
CancellationToken cancellationToken,
Expand Down
3 changes: 2 additions & 1 deletion Open.ChannelExtensions/Extensions.Source.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ public static ChannelReader<T> Source<T>(
CancellationToken cancellationToken)
=> Source(target, source, out _, false, cancellationToken);

#if NETSTANDARD2_1
#if NETSTANDARD2_0
#else
/// <inheritdoc cref="SourceAsync{TWrite, TRead}(Channel{TWrite, TRead}, IEnumerable{ValueTask{TWrite}}, out ValueTask{long}, bool, CancellationToken)"/>
public static ChannelReader<TRead> Source<TWrite, TRead>(
this Channel<TWrite, TRead> target,
Expand Down
Loading

0 comments on commit 9245b48

Please sign in to comment.