Skip to content

Commit

Permalink
AsyncEnumerableBuffer throws ObjectDisposedException for access after…
Browse files Browse the repository at this point in the history
… disposal.

Add docs.
  • Loading branch information
CXuesong committed Oct 14, 2017
1 parent e52aa3d commit f154e36
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
3 changes: 2 additions & 1 deletion AsyncEnumerableExtensions/AsyncEnumerableExtensions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
<TargetFramework>netstandard1.1</TargetFramework>
<PackageId>CXuesong.AsyncEnumerableExtensions</PackageId>
<Authors>CXuesong</Authors>
<Version>0.1.3</Version>
<Version>0.1.4</Version>
<Description>Some rudimentary utilities to flavor Ix.Async. Such as asynchronous generator methods.</Description>
<RepositoryUrl>https://github.com/CXuesong/AsyncEnumerableExtensions</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageReleaseNotes>See https://github.com/CXuesong/AsyncEnumerableExtensions/releases .</PackageReleaseNotes>
<PackageLicenseUrl>https://github.com/CXuesong/AsyncEnumerableExtensions/blob/master/LICENSE.txt</PackageLicenseUrl>
<PackageProjectUrl>https://github.com/CXuesong/AsyncEnumerableExtensions</PackageProjectUrl>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<GenerateDocumentationFile>True</GenerateDocumentationFile>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>../AsyncEnumerableExtensions.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
Expand Down
30 changes: 30 additions & 0 deletions AsyncEnumerableExtensions/AsyncEnumerableFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,43 @@ namespace AsyncEnumerableExtensions
public static class AsyncEnumerableFactory
{

/// <summary>
/// Creates an instance of <see cref="IAsyncEnumerable{T}"/> from an asynchronous generator method.
/// </summary>
/// <typeparam name="T">Type of the items.</typeparam>
/// <param name="generator">Generator method.</param>
/// <returns>The encapsulated <see cref="IAsyncEnumerable{T}"/>.</returns>
/// <remarks>
/// <para>The <paramref name="generator"/> parameter should use <see cref="IAsyncEnumerableSink{T}"/>
/// passed in to yield the items.</para>
/// <para>In this overload, the asynchronous generator accepts a <see cref="CancellationToken"/>
/// which is cancelled when the <see cref="IAsyncEnumerator{T}"/> is disposed.</para>
/// </remarks>
/// <see cref="IAsyncEnumerableSink{T}.Yield(T)"/>
/// <see cref="IAsyncEnumerableSink{T}.Wait"/>
/// <see cref="AsyncEnumerableSinkExtensions.YieldAndWait{T}(IAsyncEnumerableSink{T},T)"/>
public static IAsyncEnumerable<T> FromAsyncGenerator<T>(Func<IAsyncEnumerableSink<T>, CancellationToken, Task> generator)
{
if (generator == null) throw new ArgumentNullException(nameof(generator));
return new TaskAsyncEnumerable<T>(generator);
}

/// <summary>
/// Creates an instance of <see cref="IAsyncEnumerable{T}"/> from an asynchronous generator method.
/// </summary>
/// <typeparam name="T">Type of the items.</typeparam>
/// <param name="generator">Generator method.</param>
/// <returns>The encapsulated <see cref="IAsyncEnumerable{T}"/>.</returns>
/// <remarks>
/// <para>The <paramref name="generator"/> parameter should use <see cref="IAsyncEnumerableSink{T}"/>
/// passed in to yield the items. </para>
/// </remarks>
/// <see cref="IAsyncEnumerableSink{T}.Yield(T)"/>
/// <see cref="IAsyncEnumerableSink{T}.Wait"/>
/// <see cref="AsyncEnumerableSinkExtensions.YieldAndWait{T}(IAsyncEnumerableSink{T},T)"/>
public static IAsyncEnumerable<T> FromAsyncGenerator<T>(Func<IAsyncEnumerableSink<T>, Task> generator)
{
if (generator == null) throw new ArgumentNullException(nameof(generator));
return new TaskAsyncEnumerable<T>(generator);
}

Expand Down
2 changes: 1 addition & 1 deletion AsyncEnumerableExtensions/IAsyncEnumerableSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface IAsyncEnumerableSink<in T>
/// <param name="cancellationToken">The token used to cancel waiting.</param>
/// <returns>A task that completes when the yielded item has been consumed.</returns>
/// <exception cref="OperationCanceledException">The wait operation has been cancelled.</exception>
/// <exception cref="ObjectDisposedException">The sink does not accepts items anymore.</exception>
/// <exception cref="ObjectDisposedException">The sink does not accept items anymore.</exception>
Task Wait(CancellationToken cancellationToken);
}

Expand Down
18 changes: 8 additions & 10 deletions AsyncEnumerableExtensions/TaskAsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,26 @@

namespace AsyncEnumerableExtensions
{
public class TaskAsyncEnumerable<T> : IAsyncEnumerable<T>
internal class TaskAsyncEnumerable<T> : IAsyncEnumerable<T>
{

private readonly Func<IAsyncEnumerableSink<T>, CancellationToken, Task> generator;
private readonly bool acceptsCancellationToken;

public TaskAsyncEnumerable(Func<IAsyncEnumerableSink<T>, Task> sourceTask)
{
if (sourceTask == null) throw new ArgumentNullException(nameof(sourceTask));
generator = (sink, ct) => sourceTask(sink);
acceptsCancellationToken = false;
}

public TaskAsyncEnumerable(Func<IAsyncEnumerableSink<T>, CancellationToken, Task> sourceTask)
{
this.generator = sourceTask ?? throw new ArgumentNullException(nameof(sourceTask));
acceptsCancellationToken = true;
}

/// <inheritdoc />
public IAsyncEnumerator<T> GetEnumerator()
{
return new Enumerator(generator, acceptsCancellationToken);
return new Enumerator(generator);
}

private class Enumerator : IAsyncEnumerator<T>
Expand All @@ -43,7 +40,7 @@ private class Enumerator : IAsyncEnumerator<T>
private CancellationToken lastMoveNextCancellationToken = CancellationToken.None;
private CancellationTokenSource lastCombinedCancellationTokenSource;

public Enumerator(Func<IAsyncEnumerableSink<T>, CancellationToken, Task> generator, bool acceptsCancellationToken)
public Enumerator(Func<IAsyncEnumerableSink<T>, CancellationToken, Task> generator)
{
Debug.Assert(generator != null);
this.generator = generator;
Expand Down Expand Up @@ -188,10 +185,11 @@ public bool Yield(IEnumerable<T> items)

public Task Wait(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (cancellationToken.IsCancellationRequested)
return Task.Delay(-1, cancellationToken);
lock (syncLock)
{
if (queue == null) throw new ObjectDisposedException(nameof(AsyncEnumerableBuffer<T>));
if (queue == null) return ObjectDisposedTask;
if (queue.Count == 0) return CompletedTask;
if (onQueueExhaustedTcs == null)
{
Expand Down Expand Up @@ -249,8 +247,8 @@ internal void Terminate()
{
lock (syncLock)
{
onQueueExhaustedTcs?.TrySetCanceled();
onYieldTcs?.TrySetCanceled();
onQueueExhaustedTcs?.TrySetException(new ObjectDisposedException(nameof(AsyncEnumerableBuffer<T>)));
onYieldTcs?.TrySetException(new ObjectDisposedException(nameof(AsyncEnumerableBuffer<T>)));
onQueueExhaustedTcs = null;
onYieldTcs = null;
queue = null;
Expand Down

0 comments on commit f154e36

Please sign in to comment.