diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt index e69de29bb2..470d59e293 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt @@ -0,0 +1,3 @@ +OpenTelemetry.Batch.Transform(OpenTelemetry.BatchTransformationPredicate! transformation, ref TState state) -> void +OpenTelemetry.BatchTransformationPredicate +virtual OpenTelemetry.BatchTransformationPredicate.Invoke(TItem! item, ref TState state) -> bool diff --git a/src/OpenTelemetry/Batch.cs b/src/OpenTelemetry/Batch.cs index b4ddd08cf1..d0cd04be9f 100644 --- a/src/OpenTelemetry/Batch.cs +++ b/src/OpenTelemetry/Batch.cs @@ -1,14 +1,29 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Buffers; using System.Collections; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using OpenTelemetry.Internal; using OpenTelemetry.Logs; namespace OpenTelemetry; +/// +/// Represents a callback action for transforming and filtering items contained +/// in a . +/// +/// Item type. +/// State type. +/// Item being transformed. +/// The state supplied for the transformation. +/// Return to indicate the item should be +/// removed from the . +public delegate bool BatchTransformationPredicate(TItem item, ref TState state) + where TItem : class; + /// /// Stores a batch of completed objects to be exported. /// @@ -16,9 +31,10 @@ namespace OpenTelemetry; public readonly struct Batch : IDisposable where T : class { - private readonly T? item = null; - private readonly CircularBuffer? circularBuffer = null; - private readonly T[]? items = null; + internal readonly T? Item; + internal readonly CircularBuffer? CircularBuffer; + internal readonly T[]? Items; + internal readonly bool Rented; private readonly long targetCount; /// @@ -27,11 +43,17 @@ namespace OpenTelemetry; /// The items to store in the batch. /// The number of items in the batch. public Batch(T[] items, int count) + : this(items, count, rented: false) + { + } + + internal Batch(T[] items, int count, bool rented) { Guard.ThrowIfNull(items); Guard.ThrowIfOutOfRange(count, min: 0, max: items.Length); - this.items = items; + this.Items = items; + this.Rented = rented; this.Count = this.targetCount = count; } @@ -39,7 +61,7 @@ internal Batch(T item) { Debug.Assert(item != null, $"{nameof(item)} was null."); - this.item = item; + this.Item = item; this.Count = this.targetCount = 1; } @@ -48,7 +70,7 @@ internal Batch(CircularBuffer circularBuffer, int maxSize) Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number."); Debug.Assert(circularBuffer != null, $"{nameof(circularBuffer)} was null."); - this.circularBuffer = circularBuffer; + this.CircularBuffer = circularBuffer; this.Count = Math.Min(maxSize, circularBuffer!.Count); this.targetCount = circularBuffer.RemovedCount + this.Count; } @@ -60,25 +82,118 @@ internal Batch(CircularBuffer circularBuffer, int maxSize) /// public long Count { get; } - /// - public void Dispose() + /// + /// Transforms and filters the items of a using the + /// supplied and state. + /// + /// State type. + /// Transformation function. Return to remove an item from the . + /// State to be passed into . + public void Transform(BatchTransformationPredicate transformation, ref TState state) { - if (this.circularBuffer != null) + Guard.ThrowIfNull(transformation); + + if (this.Count <= 0) + { + return; + } + + if (this.Item != null) { - // Drain anything left in the batch. - while (this.circularBuffer.RemovedCount < this.targetCount) + Debug.Assert( + typeof(T) != typeof(LogRecord) + || ((LogRecord)(object)this.Item).Source != LogRecord.LogRecordSource.FromSharedPool, + "Batch contained a single item rented from the shared pool"); + + // Special case for a batch of a single item + + if (!TransformItem(transformation, ref state, this.Item)) { - T item = this.circularBuffer.Read(); - if (typeof(T) == typeof(LogRecord)) + Unsafe.AsRef(in this) = new Batch(Array.Empty(), 0, rented: false); + } + + return; + } + + var rentedArray = ArrayPool.Shared.Rent((int)this.Count); + + var i = 0; + + if (typeof(T) == typeof(LogRecord)) + { + foreach (var item in this) + { + if (TransformItem(transformation, ref state, item)) { var logRecord = (LogRecord)(object)item; if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) { - LogRecordSharedPool.Current.Return(logRecord); + logRecord.AddReference(); } + + rentedArray[i++] = item; } } } + else + { + foreach (var item in this) + { + if (TransformItem(transformation, ref state, item)) + { + rentedArray[i++] = item; + } + } + } + + this.Dispose(); + + Unsafe.AsRef(in this) = new Batch(rentedArray, i, rented: true); + + static bool TransformItem(BatchTransformationPredicate transformation, ref TState state, T item) + { + try + { + return transformation(item, ref state); + } + catch (Exception ex) + { + OpenTelemetrySdkEventSource.Log.BatchTransformationException(ex); + return true; + } + } + } + + /// + public void Dispose() + { + if (this.CircularBuffer != null) + { + // Note: Drain anything left in the batch and return to the pool if + // needed. + while (this.CircularBuffer.RemovedCount < this.targetCount) + { + T item = this.CircularBuffer.Read(); + if (typeof(T) == typeof(LogRecord)) + { + Enumerator.TryReturnLogRecordToPool(item); + } + } + } + else if (this.Items != null && this.Rented) + { + // Note: We don't attempt to return individual LogRecords to the + // pool. If the batch wasn't drained fully some records may get + // garbage collected but the pool will recreate more as needed. The + // idea is most batches are expected to be drained during export so + // it isn't worth the effort to track what was/was not returned. + + ArrayPool.Shared.Return(this.Items); + + Unsafe.AsRef(in this) = new Batch(Array.Empty(), 0); + } } /// @@ -87,12 +202,12 @@ public void Dispose() /// . public Enumerator GetEnumerator() { - return this.circularBuffer != null - ? new Enumerator(this.circularBuffer, this.targetCount) - : this.item != null - ? new Enumerator(this.item) + return this.CircularBuffer != null + ? new Enumerator(this.CircularBuffer, this.targetCount) + : this.Item != null + ? new Enumerator(this.Item) /* In the event someone uses default/new Batch() to create Batch we fallback to empty items mode. */ - : new Enumerator(this.items ?? Array.Empty(), this.targetCount); + : new Enumerator(this.Items ?? Array.Empty(), this.targetCount); } /// @@ -114,6 +229,8 @@ public struct Enumerator : IEnumerator private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBuffer = (ref Enumerator enumerator) => { + Debug.Assert(typeof(T) != typeof(LogRecord), "T was an unexpected type"); + var circularBuffer = enumerator.circularBuffer; if (circularBuffer!.RemovedCount < enumerator.targetCount) @@ -128,37 +245,45 @@ public struct Enumerator : IEnumerator private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBufferLogRecord = (ref Enumerator enumerator) => { - // Note: This type check here is to give the JIT a hint it can - // remove all of this code when T != LogRecord - if (typeof(T) == typeof(LogRecord)) + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + + TryReturnCurrentLogRecordToPool(ref enumerator); + + var circularBuffer = enumerator.circularBuffer; + + if (circularBuffer!.RemovedCount < enumerator.targetCount) { - var circularBuffer = enumerator.circularBuffer; + enumerator.current = circularBuffer.Read(); + return true; + } - var currentItem = enumerator.Current; + enumerator.current = null; - if (currentItem != null) - { - var logRecord = (LogRecord)(object)currentItem; - if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) - { - LogRecordSharedPool.Current.Return(logRecord); - } - } + return false; + }; - if (circularBuffer!.RemovedCount < enumerator.targetCount) - { - enumerator.current = circularBuffer.Read(); - return true; - } + private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) => + { + Debug.Assert(typeof(T) != typeof(LogRecord), "T was an unexpected type"); - enumerator.current = null; + var items = enumerator.items; + + if (enumerator.itemIndex < enumerator.targetCount) + { + enumerator.current = items![enumerator.itemIndex++]; + return true; } + enumerator.current = null; return false; }; - private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) => + private static readonly BatchEnumeratorMoveNextFunc MoveNextArrayLogRecord = (ref Enumerator enumerator) => { + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + + TryReturnCurrentLogRecordToPool(ref enumerator); + var items = enumerator.items; if (enumerator.itemIndex < enumerator.targetCount) @@ -168,6 +293,7 @@ public struct Enumerator : IEnumerator } enumerator.current = null; + return false; }; @@ -206,7 +332,7 @@ internal Enumerator(T[] items, long targetCount) this.items = items; this.targetCount = targetCount; this.itemIndex = 0; - this.moveNextFunc = MoveNextArray; + this.moveNextFunc = typeof(T) == typeof(LogRecord) ? MoveNextArrayLogRecord : MoveNextArray; } /// @@ -223,12 +349,7 @@ public void Dispose() var currentItem = this.current; if (currentItem != null) { - var logRecord = (LogRecord)(object)currentItem; - if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) - { - LogRecordSharedPool.Current.Return(logRecord); - } - + TryReturnLogRecordToPool(currentItem); this.current = null; } } @@ -243,5 +364,29 @@ public bool MoveNext() /// public readonly void Reset() => throw new NotSupportedException(); + + internal static void TryReturnLogRecordToPool(T currentItem) + { + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + Debug.Assert(currentItem != null, "currentItem was null"); + + var logRecord = (LogRecord)(object)currentItem!; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); + } + } + + private static void TryReturnCurrentLogRecordToPool(ref Enumerator enumerator) + { + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + + var currentItem = enumerator.Current; + + if (currentItem != null) + { + TryReturnLogRecordToPool(currentItem); + } + } } } diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index ca4d195698..0eef208b04 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +* Added `Transform` API on `Batch` which can be used to modify and/or filter + records contained in a batch. + ([#5733](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5733)) + ## 1.9.0 Released 2024-Jun-14 diff --git a/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs b/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs index 88312e3a14..dcc5feb2f7 100644 --- a/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs +++ b/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs @@ -167,6 +167,15 @@ public void MetricViewException(string source, Exception ex) } } + [NonEvent] + public void BatchTransformationException(Exception ex) + { + if (this.IsEnabled(EventLevel.Error, EventKeywords.All)) + { + this.BatchTransformationException(typeof(T).FullName!, ex.ToInvariantString()); + } + } + [Event(4, Message = "Unknown error in SpanProcessor event '{0}': '{1}'.", Level = EventLevel.Error)] public void SpanProcessorException(string evnt, string ex) { @@ -374,6 +383,12 @@ public void MetricViewException(string source, string ex) this.WriteEvent(56, source, ex); } + [Event(57, Message = "Exception thrown by user code supplied as batch of type '{0}' transformation function: '{1}'.", Level = EventLevel.Error)] + public void BatchTransformationException(string type, string ex) + { + this.WriteEvent(57, type, ex); + } + void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value) { this.InvalidConfigurationValue(key, value); diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs b/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs index c298bef430..d7a1cd723b 100644 --- a/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs +++ b/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs @@ -15,7 +15,7 @@ public static void Clear(LogRecord logRecord) { if (attributeStorage.Count > DefaultMaxNumberOfAttributes) { - // Don't allow the pool to grow unconstained. + // Don't allow the pool to grow unconstrained. logRecord.AttributeStorage = null; } else @@ -31,7 +31,7 @@ underlying array (capacity). */ { if (scopeStorage.Count > DefaultMaxNumberOfScopes) { - // Don't allow the pool to grow unconstained. + // Don't allow the pool to grow unconstrained. logRecord.ScopeStorage = null; } else diff --git a/test/OpenTelemetry.Tests/BatchTest.cs b/test/OpenTelemetry.Tests/BatchTest.cs new file mode 100644 index 0000000000..4f8e98ecc5 --- /dev/null +++ b/test/OpenTelemetry.Tests/BatchTest.cs @@ -0,0 +1,383 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#nullable enable + +using OpenTelemetry.Internal; +using OpenTelemetry.Logs; +using Xunit; + +namespace OpenTelemetry.Tests; + +public class BatchTest +{ + [Fact] + public void CheckConstructorExceptions() + { + Assert.Throws(() => new Batch((string[])null!, 0)); + Assert.Throws(() => new Batch(Array.Empty(), -1)); + Assert.Throws(() => new Batch(Array.Empty(), 1)); + } + + [Fact] + public void CheckValidConstructors() + { + var value = "a"; + var batch = new Batch(value); + foreach (var item in batch) + { + Assert.Equal(value, item); + } + + var circularBuffer = new CircularBuffer(1); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 1); + foreach (var item in batch) + { + Assert.Equal(value, item); + } + } + + [Fact] + public void CheckDispose() + { + var value = "a"; + var batch = new Batch(value); + batch.Dispose(); // A test to make sure it doesn't bomb on a null CircularBuffer. + + var circularBuffer = new CircularBuffer(10); + circularBuffer.Add(value); + circularBuffer.Add(value); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 10); // Max size = 10 + batch.GetEnumerator().MoveNext(); + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(1, circularBuffer.RemovedCount); + batch.Dispose(); // Test anything remaining in the batch is drained when disposed. + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(3, circularBuffer.RemovedCount); + batch.Dispose(); // Verify we don't go into an infinite loop or thrown when empty. + + circularBuffer = new CircularBuffer(10); + circularBuffer.Add(value); + circularBuffer.Add(value); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 2); // Max size = 2 + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(0, circularBuffer.RemovedCount); + batch.Dispose(); // Test the batch is drained up to max size. + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(2, circularBuffer.RemovedCount); + } + + [Fact] + public void CheckEnumerator() + { + var value = "a"; + var batch = new Batch(value); + var enumerator = batch.GetEnumerator(); + ValidateEnumerator(enumerator, value); + + var circularBuffer = new CircularBuffer(1); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 1); + enumerator = batch.GetEnumerator(); + ValidateEnumerator(enumerator, value); + } + + [Fact] + public void CheckMultipleEnumerator() + { + var value = "a"; + var circularBuffer = new CircularBuffer(10); + circularBuffer.Add(value); + circularBuffer.Add(value); + circularBuffer.Add(value); + var batch = new Batch(circularBuffer, 10); + + int itemsProcessed = 0; + foreach (var item in batch) + { + itemsProcessed++; + } + + Assert.Equal(3, itemsProcessed); + + itemsProcessed = 0; + foreach (var item in batch) + { + itemsProcessed++; + } + + Assert.Equal(0, itemsProcessed); + } + + [Fact] + public void CheckEnumeratorResetException() + { + var value = "a"; + var batch = new Batch(value); + var enumerator = batch.GetEnumerator(); + Assert.Throws(() => enumerator.Reset()); + } + + [Fact] + public void DrainIntoNewBatchTest() + { + var circularBuffer = new CircularBuffer(100); + circularBuffer.Add("a"); + circularBuffer.Add("b"); + + Batch batch = new Batch(circularBuffer, 10); + + Assert.Equal(2, batch.Count); + + string[] storage = new string[10]; + int selectedItemCount = 0; + foreach (string item in batch) + { + if (item == "b") + { + storage[selectedItemCount++] = item; + } + } + + batch = new Batch(storage, selectedItemCount); + + Assert.Equal(1, batch.Count); + + ValidateEnumerator(batch.GetEnumerator(), "b"); + } + + [Fact] + public void TransformBatchUsingCircularBuffer() + { + var circularBuffer = new CircularBuffer(100); + circularBuffer.Add("a"); + circularBuffer.Add("b"); + + Batch batch = new Batch(circularBuffer, 10); + + Assert.Equal(2, batch.Count); + Assert.NotNull(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.False(batch.Rented); + + object? state = null; + + batch.Transform( + static (string item, ref object? state) => + { + return item == "a"; + }, + ref state); + + Assert.Equal(0, circularBuffer.Count); + Assert.Equal(1, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + var previousRentedArray = batch.Items; + + batch.Transform( + static (string item, ref object? state) => + { + return false; + }, + ref state); + + Assert.Equal(0, batch.Count); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + Assert.NotEqual(previousRentedArray, batch.Items); + + previousRentedArray = batch.Items; + + batch.Dispose(); + + Assert.NotEqual(previousRentedArray, batch.Items); + Assert.False(batch.Rented); + } + + [Fact] + public void TransformBatchUsingCircularBufferOfLogRecords() + { + var pool = LogRecordSharedPool.Current; + + var circularBuffer = new CircularBuffer(100); + + var record = pool.Rent(); + record.CategoryName = "Category1"; + circularBuffer.Add(record); + + record = pool.Rent(); + record.CategoryName = "Category2"; + circularBuffer.Add(record); + + record = pool.Rent(); + record.CategoryName = "Category3"; + circularBuffer.Add(record); + + Batch batch = new Batch(circularBuffer, 10); + + Assert.Equal(3, batch.Count); + Assert.NotNull(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.False(batch.Rented); + + object? state = null; + + batch.Transform( + static (LogRecord item, ref object? state) => + { + return item.CategoryName != "Category3"; + }, + ref state); + + Assert.Equal(0, circularBuffer.Count); + Assert.Equal(2, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + Assert.Equal(1, pool.Count); + + var previousRentedArray = batch.Items; + + batch.Transform( + static (LogRecord item, ref object? state) => + { + return item.CategoryName != "Category2"; + }, + ref state); + + Assert.Equal(1, batch.Count); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + Assert.NotEqual(previousRentedArray, batch.Items); + + previousRentedArray = batch.Items; + + var enumerator = batch.GetEnumerator(); + Assert.True(enumerator.MoveNext()); + enumerator.Dispose(); + + Assert.Equal(3, pool.Count); + + batch.Dispose(); + + Assert.NotEqual(previousRentedArray, batch.Items); + Assert.False(batch.Rented); + } + + [Fact] + public void TransformBatchEmpty() + { + var source = Array.Empty(); + + Batch batch = new Batch(source, 0); + + Assert.Equal(0, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.False(batch.Rented); + + object? state = null; + bool transformExecuted = false; + + batch.Transform( + (string item, ref object? state) => + { + transformExecuted = true; + return true; + }, + ref state); + + Assert.False(transformExecuted); + + Assert.Equal(0, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.False(batch.Rented); + + Assert.Equal(source, batch.Items); + } + + [Fact] + public void TransformBatchSingleItem() + { + Batch batch = new Batch("Item"); + + Assert.Equal(1, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.NotNull(batch.Item); + Assert.False(batch.Rented); + + object? state = null; + + batch.Transform( + (string item, ref object? state) => + { + return true; + }, + ref state); + + Assert.Equal(1, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.NotNull(batch.Item); + Assert.False(batch.Rented); + + batch.Transform( + static (string item, ref object? state) => + { + return false; + }, + ref state); + + Assert.Equal(0, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.Null(batch.Item); + Assert.False(batch.Rented); + } + + [Fact] + public void TransformBatchWithExceptionThrownInDelegate() + { + Batch batch = new Batch("Item"); + + Assert.Equal(1, batch.Count); + + object? state = null; + bool transformExecuted = false; + + batch.Transform( + (string item, ref object? state) => + { + transformExecuted = true; + throw new InvalidOperationException(); + }, + ref state); + + Assert.True(transformExecuted); + Assert.Equal(1, batch.Count); + } + + private static void ValidateEnumerator(Batch.Enumerator enumerator, string expected) + { + if (enumerator.Current != null) + { + Assert.Equal(expected, enumerator.Current); + } + + if (enumerator.MoveNext()) + { + Assert.Equal(expected, enumerator.Current); + } + } +} diff --git a/test/OpenTelemetry.Tests/Trace/BatchTest.cs b/test/OpenTelemetry.Tests/Trace/BatchTest.cs deleted file mode 100644 index b1d5b170d5..0000000000 --- a/test/OpenTelemetry.Tests/Trace/BatchTest.cs +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using OpenTelemetry.Internal; -using Xunit; - -namespace OpenTelemetry.Trace.Tests; - -public class BatchTest -{ - [Fact] - public void CheckConstructorExceptions() - { - Assert.Throws(() => new Batch((string[])null, 0)); - Assert.Throws(() => new Batch(Array.Empty(), -1)); - Assert.Throws(() => new Batch(Array.Empty(), 1)); - } - - [Fact] - public void CheckValidConstructors() - { - var value = "a"; - var batch = new Batch(value); - foreach (var item in batch) - { - Assert.Equal(value, item); - } - - var circularBuffer = new CircularBuffer(1); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 1); - foreach (var item in batch) - { - Assert.Equal(value, item); - } - } - - [Fact] - public void CheckDispose() - { - var value = "a"; - var batch = new Batch(value); - batch.Dispose(); // A test to make sure it doesn't bomb on a null CircularBuffer. - - var circularBuffer = new CircularBuffer(10); - circularBuffer.Add(value); - circularBuffer.Add(value); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 10); // Max size = 10 - batch.GetEnumerator().MoveNext(); - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(1, circularBuffer.RemovedCount); - batch.Dispose(); // Test anything remaining in the batch is drained when disposed. - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(3, circularBuffer.RemovedCount); - batch.Dispose(); // Verify we don't go into an infinite loop or thrown when empty. - - circularBuffer = new CircularBuffer(10); - circularBuffer.Add(value); - circularBuffer.Add(value); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 2); // Max size = 2 - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(0, circularBuffer.RemovedCount); - batch.Dispose(); // Test the batch is drained up to max size. - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(2, circularBuffer.RemovedCount); - } - - [Fact] - public void CheckEnumerator() - { - var value = "a"; - var batch = new Batch(value); - var enumerator = batch.GetEnumerator(); - ValidateEnumerator(enumerator, value); - - var circularBuffer = new CircularBuffer(1); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 1); - enumerator = batch.GetEnumerator(); - ValidateEnumerator(enumerator, value); - } - - [Fact] - public void CheckMultipleEnumerator() - { - var value = "a"; - var circularBuffer = new CircularBuffer(10); - circularBuffer.Add(value); - circularBuffer.Add(value); - circularBuffer.Add(value); - var batch = new Batch(circularBuffer, 10); - - int itemsProcessed = 0; - foreach (var item in batch) - { - itemsProcessed++; - } - - Assert.Equal(3, itemsProcessed); - - itemsProcessed = 0; - foreach (var item in batch) - { - itemsProcessed++; - } - - Assert.Equal(0, itemsProcessed); - } - - [Fact] - public void CheckEnumeratorResetException() - { - var value = "a"; - var batch = new Batch(value); - var enumerator = batch.GetEnumerator(); - Assert.Throws(() => enumerator.Reset()); - } - - [Fact] - public void DrainIntoNewBatchTest() - { - var circularBuffer = new CircularBuffer(100); - circularBuffer.Add("a"); - circularBuffer.Add("b"); - - Batch batch = new Batch(circularBuffer, 10); - - Assert.Equal(2, batch.Count); - - string[] storage = new string[10]; - int selectedItemCount = 0; - foreach (string item in batch) - { - if (item == "b") - { - storage[selectedItemCount++] = item; - } - } - - batch = new Batch(storage, selectedItemCount); - - Assert.Equal(1, batch.Count); - - ValidateEnumerator(batch.GetEnumerator(), "b"); - } - - private static void ValidateEnumerator(Batch.Enumerator enumerator, string expected) - { - if (enumerator.Current != null) - { - Assert.Equal(expected, enumerator.Current); - } - - if (enumerator.MoveNext()) - { - Assert.Equal(expected, enumerator.Current); - } - } -}