From efbca73292c5a687b9d6cea7099c5f8bb1743dd8 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Mon, 1 Jul 2024 13:56:07 -0700 Subject: [PATCH 1/4] Add WithProcessor API on batch. --- src/OpenTelemetry/Batch.cs | 80 ++++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 12 deletions(-) diff --git a/src/OpenTelemetry/Batch.cs b/src/OpenTelemetry/Batch.cs index b4ddd08cf1..6297f67ef3 100644 --- a/src/OpenTelemetry/Batch.cs +++ b/src/OpenTelemetry/Batch.cs @@ -16,10 +16,11 @@ namespace OpenTelemetry; public readonly struct Batch : IDisposable where T : class { - private readonly T? item = null; - private readonly CircularBuffer? circularBuffer = null; - private readonly T[]? items = null; + private readonly T? item; + private readonly CircularBuffer? circularBuffer; + private readonly T[]? items; private readonly long targetCount; + private readonly Predicate? itemProcessor; /// /// Initializes a new instance of the struct. @@ -53,6 +54,15 @@ internal Batch(CircularBuffer circularBuffer, int maxSize) this.targetCount = circularBuffer.RemovedCount + this.Count; } + private Batch(T? item, CircularBuffer? circularBuffer, long count, long targetCount, Predicate itemProcessor) + { + this.item = item; + this.circularBuffer = circularBuffer; + this.Count = count; + this.targetCount = targetCount; + this.itemProcessor = itemProcessor; + } + private delegate bool BatchEnumeratorMoveNextFunc(ref Enumerator enumerator); /// @@ -60,6 +70,25 @@ internal Batch(CircularBuffer circularBuffer, int maxSize) /// public long Count { get; } + public Batch WithProcessor( + Predicate itemProcessor) + { + Guard.ThrowIfNull(itemProcessor); + + var currentProcessor = this.itemProcessor; + if (currentProcessor != null) + { + itemProcessor = i => currentProcessor(i) && itemProcessor(i); + } + + return new Batch( + this.item, + this.circularBuffer, + this.Count, + this.targetCount, + itemProcessor); + } + /// public void Dispose() { @@ -88,11 +117,11 @@ public void Dispose() public Enumerator GetEnumerator() { return this.circularBuffer != null - ? new Enumerator(this.circularBuffer, this.targetCount) + ? new Enumerator(this.circularBuffer, this.targetCount, this.itemProcessor) : this.item != null - ? new Enumerator(this.item) + ? new Enumerator(this.item, this.itemProcessor) /* In the event someone uses default/new Batch() to create Batch we fallback to empty items mode. */ - : new Enumerator(this.items ?? Array.Empty(), this.targetCount); + : new Enumerator(this.items ?? Array.Empty(), this.targetCount, this.itemProcessor); } /// @@ -179,34 +208,38 @@ public struct Enumerator : IEnumerator [AllowNull] private T current; - internal Enumerator(T item) + internal Enumerator(T item, Predicate? itemProcessor) { this.current = item; this.circularBuffer = null; this.items = null; this.targetCount = -1; this.itemIndex = 0; - this.moveNextFunc = MoveNextSingleItem; + + this.moveNextFunc = BindMoveNextDelegate(MoveNextSingleItem, itemProcessor); } - internal Enumerator(CircularBuffer circularBuffer, long targetCount) + internal Enumerator(CircularBuffer circularBuffer, long targetCount, Predicate? itemProcessor) { this.current = null; this.items = null; this.circularBuffer = circularBuffer; this.targetCount = targetCount; this.itemIndex = 0; - this.moveNextFunc = typeof(T) == typeof(LogRecord) ? MoveNextCircularBufferLogRecord : MoveNextCircularBuffer; + + this.moveNextFunc = BindMoveNextDelegate( + typeof(T) == typeof(LogRecord) ? MoveNextCircularBufferLogRecord : MoveNextCircularBuffer, + itemProcessor); } - internal Enumerator(T[] items, long targetCount) + internal Enumerator(T[] items, long targetCount, Predicate? itemProcessor) { this.current = null; this.circularBuffer = null; this.items = items; this.targetCount = targetCount; this.itemIndex = 0; - this.moveNextFunc = MoveNextArray; + this.moveNextFunc = BindMoveNextDelegate(MoveNextArray, itemProcessor); } /// @@ -243,5 +276,28 @@ public bool MoveNext() /// public readonly void Reset() => throw new NotSupportedException(); + + private static BatchEnumeratorMoveNextFunc BindMoveNextDelegate(BatchEnumeratorMoveNextFunc moveNextFunc, Predicate? itemProcessor) + { + if (itemProcessor != null) + { + return (ref Enumerator enumerator) => + { + while (moveNextFunc(ref enumerator)) + { + if (itemProcessor(enumerator.current)) + { + return true; + } + } + + return false; + }; + } + else + { + return moveNextFunc; + } + } } } From f61e273b38b2df0011e28aa02c736daac6b481b7 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Fri, 5 Jul 2024 13:36:06 -0700 Subject: [PATCH 2/4] Design tweaks. --- .../.publicApi/Stable/PublicAPI.Unshipped.txt | 3 + src/OpenTelemetry/Batch.cs | 255 ++++++++++++------ .../Internal/OpenTelemetrySdkEventSource.cs | 15 ++ .../Logs/Pool/LogRecordPoolHelper.cs | 4 +- 4 files changed, 192 insertions(+), 85 deletions(-) diff --git a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt index e69de29bb2..470d59e293 100644 --- a/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt @@ -0,0 +1,3 @@ +OpenTelemetry.Batch.Transform(OpenTelemetry.BatchTransformationPredicate! transformation, ref TState state) -> void +OpenTelemetry.BatchTransformationPredicate +virtual OpenTelemetry.BatchTransformationPredicate.Invoke(TItem! item, ref TState state) -> bool diff --git a/src/OpenTelemetry/Batch.cs b/src/OpenTelemetry/Batch.cs index 6297f67ef3..129503dd68 100644 --- a/src/OpenTelemetry/Batch.cs +++ b/src/OpenTelemetry/Batch.cs @@ -1,14 +1,29 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +using System.Buffers; using System.Collections; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; using OpenTelemetry.Internal; using OpenTelemetry.Logs; namespace OpenTelemetry; +/// +/// Represents a callback action for transforming and filtering items contained +/// in a . +/// +/// Item type. +/// State type. +/// Item being transformed. +/// The state supplied for the transformation. +/// Return to indicate the item should be +/// removed from the . +public delegate bool BatchTransformationPredicate(TItem item, ref TState state) + where TItem : class; + /// /// Stores a batch of completed objects to be exported. /// @@ -19,8 +34,8 @@ namespace OpenTelemetry; private readonly T? item; private readonly CircularBuffer? circularBuffer; private readonly T[]? items; + private readonly bool rented; private readonly long targetCount; - private readonly Predicate? itemProcessor; /// /// Initializes a new instance of the struct. @@ -28,11 +43,17 @@ namespace OpenTelemetry; /// The items to store in the batch. /// The number of items in the batch. public Batch(T[] items, int count) + : this(items, count, rented: false) + { + } + + internal Batch(T[] items, int count, bool rented) { Guard.ThrowIfNull(items); Guard.ThrowIfOutOfRange(count, min: 0, max: items.Length); this.items = items; + this.rented = rented; this.Count = this.targetCount = count; } @@ -54,15 +75,6 @@ internal Batch(CircularBuffer circularBuffer, int maxSize) this.targetCount = circularBuffer.RemovedCount + this.Count; } - private Batch(T? item, CircularBuffer? circularBuffer, long count, long targetCount, Predicate itemProcessor) - { - this.item = item; - this.circularBuffer = circularBuffer; - this.Count = count; - this.targetCount = targetCount; - this.itemProcessor = itemProcessor; - } - private delegate bool BatchEnumeratorMoveNextFunc(ref Enumerator enumerator); /// @@ -70,23 +82,88 @@ private Batch(T? item, CircularBuffer? circularBuffer, long count, long targe /// public long Count { get; } - public Batch WithProcessor( - Predicate itemProcessor) + /// + /// Transforms and filters the items of a using the + /// supplied and state. + /// + /// State type. + /// Transformation function. Return to remove an item from the . + /// State to be passed into . + public void Transform(BatchTransformationPredicate transformation, ref TState state) { - Guard.ThrowIfNull(itemProcessor); + Guard.ThrowIfNull(transformation); + + if (this.Count <= 0) + { + return; + } - var currentProcessor = this.itemProcessor; - if (currentProcessor != null) + if (this.item != null) { - itemProcessor = i => currentProcessor(i) && itemProcessor(i); + Debug.Assert( + typeof(T) != typeof(LogRecord) + || ((LogRecord)(object)this.item).Source != LogRecord.LogRecordSource.FromSharedPool, + "Batch contained a single item rented from the shared pool"); + + // Special case for a batch of a single item + + if (!TransformItem(transformation, ref state, this.item)) + { + Unsafe.AsRef(in this) = new Batch(Array.Empty(), 0, rented: false); + } + + return; } - return new Batch( - this.item, - this.circularBuffer, - this.Count, - this.targetCount, - itemProcessor); + var rentedArray = ArrayPool.Shared.Rent((int)this.Count); + + var i = 0; + + if (typeof(T) == typeof(LogRecord)) + { + foreach (var item in this) + { + if (TransformItem(transformation, ref state, item)) + { + var logRecord = (LogRecord)(object)item; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + logRecord.AddReference(); + } + + rentedArray[i++] = item; + } + } + } + else + { + foreach (var item in this) + { + if (TransformItem(transformation, ref state, item)) + { + rentedArray[i++] = item; + } + } + } + + this.Dispose(); + + Unsafe.AsRef(in this) = new Batch(rentedArray, i, rented: true); + + static bool TransformItem(BatchTransformationPredicate transformation, ref TState state, T item) + { + try + { + return transformation(item, ref state); + } + catch (Exception ex) + { + OpenTelemetrySdkEventSource.Log.BatchTransformationException(ex); + return true; + } + } } /// @@ -94,20 +171,29 @@ public void Dispose() { if (this.circularBuffer != null) { - // Drain anything left in the batch. + // Note: Drain anything left in the batch and return to the pool if + // needed. while (this.circularBuffer.RemovedCount < this.targetCount) { T item = this.circularBuffer.Read(); if (typeof(T) == typeof(LogRecord)) { - var logRecord = (LogRecord)(object)item; - if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) - { - LogRecordSharedPool.Current.Return(logRecord); - } + Enumerator.TryReturnLogRecordToPool(item); } } } + else if (this.items != null && this.rented) + { + // Note: We don't attempt to return individual LogRecords to the + // pool. If the batch wasn't drained fully some records may get + // garbage collected but the pool will recreate more as needed. The + // idea is most batches are expected to be drained during export so + // it isn't worth the effort to track what was/was not returned. + + ArrayPool.Shared.Return(this.items); + + Unsafe.AsRef(in this) = new Batch(Array.Empty(), 0); + } } /// @@ -117,11 +203,11 @@ public void Dispose() public Enumerator GetEnumerator() { return this.circularBuffer != null - ? new Enumerator(this.circularBuffer, this.targetCount, this.itemProcessor) + ? new Enumerator(this.circularBuffer, this.targetCount) : this.item != null - ? new Enumerator(this.item, this.itemProcessor) + ? new Enumerator(this.item) /* In the event someone uses default/new Batch() to create Batch we fallback to empty items mode. */ - : new Enumerator(this.items ?? Array.Empty(), this.targetCount, this.itemProcessor); + : new Enumerator(this.items ?? Array.Empty(), this.targetCount); } /// @@ -143,6 +229,8 @@ public struct Enumerator : IEnumerator private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBuffer = (ref Enumerator enumerator) => { + Debug.Assert(typeof(T) != typeof(LogRecord), "T was an unexpected type"); + var circularBuffer = enumerator.circularBuffer; if (circularBuffer!.RemovedCount < enumerator.targetCount) @@ -157,37 +245,45 @@ public struct Enumerator : IEnumerator private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBufferLogRecord = (ref Enumerator enumerator) => { - // Note: This type check here is to give the JIT a hint it can - // remove all of this code when T != LogRecord - if (typeof(T) == typeof(LogRecord)) + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + + TryReturnCurrentLogRecordToPool(ref enumerator); + + var circularBuffer = enumerator.circularBuffer; + + if (circularBuffer!.RemovedCount < enumerator.targetCount) { - var circularBuffer = enumerator.circularBuffer; + enumerator.current = circularBuffer.Read(); + return true; + } - var currentItem = enumerator.Current; + enumerator.current = null; - if (currentItem != null) - { - var logRecord = (LogRecord)(object)currentItem; - if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) - { - LogRecordSharedPool.Current.Return(logRecord); - } - } + return false; + }; - if (circularBuffer!.RemovedCount < enumerator.targetCount) - { - enumerator.current = circularBuffer.Read(); - return true; - } + private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) => + { + Debug.Assert(typeof(T) != typeof(LogRecord), "T was an unexpected type"); - enumerator.current = null; + var items = enumerator.items; + + if (enumerator.itemIndex < enumerator.targetCount) + { + enumerator.current = items![enumerator.itemIndex++]; + return true; } + enumerator.current = null; return false; }; - private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) => + private static readonly BatchEnumeratorMoveNextFunc MoveNextArrayLogRecord = (ref Enumerator enumerator) => { + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + + TryReturnCurrentLogRecordToPool(ref enumerator); + var items = enumerator.items; if (enumerator.itemIndex < enumerator.targetCount) @@ -197,6 +293,7 @@ public struct Enumerator : IEnumerator } enumerator.current = null; + return false; }; @@ -208,38 +305,34 @@ public struct Enumerator : IEnumerator [AllowNull] private T current; - internal Enumerator(T item, Predicate? itemProcessor) + internal Enumerator(T item) { this.current = item; this.circularBuffer = null; this.items = null; this.targetCount = -1; this.itemIndex = 0; - - this.moveNextFunc = BindMoveNextDelegate(MoveNextSingleItem, itemProcessor); + this.moveNextFunc = MoveNextSingleItem; } - internal Enumerator(CircularBuffer circularBuffer, long targetCount, Predicate? itemProcessor) + internal Enumerator(CircularBuffer circularBuffer, long targetCount) { this.current = null; this.items = null; this.circularBuffer = circularBuffer; this.targetCount = targetCount; this.itemIndex = 0; - - this.moveNextFunc = BindMoveNextDelegate( - typeof(T) == typeof(LogRecord) ? MoveNextCircularBufferLogRecord : MoveNextCircularBuffer, - itemProcessor); + this.moveNextFunc = typeof(T) == typeof(LogRecord) ? MoveNextCircularBufferLogRecord : MoveNextCircularBuffer; } - internal Enumerator(T[] items, long targetCount, Predicate? itemProcessor) + internal Enumerator(T[] items, long targetCount) { this.current = null; this.circularBuffer = null; this.items = items; this.targetCount = targetCount; this.itemIndex = 0; - this.moveNextFunc = BindMoveNextDelegate(MoveNextArray, itemProcessor); + this.moveNextFunc = typeof(T) == typeof(LogRecord) ? MoveNextArrayLogRecord : MoveNextArray; } /// @@ -256,12 +349,7 @@ public void Dispose() var currentItem = this.current; if (currentItem != null) { - var logRecord = (LogRecord)(object)currentItem; - if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) - { - LogRecordSharedPool.Current.Return(logRecord); - } - + TryReturnLogRecordToPool(currentItem); this.current = null; } } @@ -277,26 +365,27 @@ public bool MoveNext() public readonly void Reset() => throw new NotSupportedException(); - private static BatchEnumeratorMoveNextFunc BindMoveNextDelegate(BatchEnumeratorMoveNextFunc moveNextFunc, Predicate? itemProcessor) + internal static void TryReturnLogRecordToPool(T currentItem) { - if (itemProcessor != null) - { - return (ref Enumerator enumerator) => - { - while (moveNextFunc(ref enumerator)) - { - if (itemProcessor(enumerator.current)) - { - return true; - } - } + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + Debug.Assert(currentItem != null, "currentItem was null"); - return false; - }; + var logRecord = (LogRecord)(object)currentItem!; + if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool) + { + LogRecordSharedPool.Current.Return(logRecord); } - else + } + + private static void TryReturnCurrentLogRecordToPool(ref Enumerator enumerator) + { + Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type"); + + var currentItem = enumerator.Current; + + if (currentItem != null) { - return moveNextFunc; + TryReturnLogRecordToPool(currentItem); } } } diff --git a/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs b/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs index 88312e3a14..dcc5feb2f7 100644 --- a/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs +++ b/src/OpenTelemetry/Internal/OpenTelemetrySdkEventSource.cs @@ -167,6 +167,15 @@ public void MetricViewException(string source, Exception ex) } } + [NonEvent] + public void BatchTransformationException(Exception ex) + { + if (this.IsEnabled(EventLevel.Error, EventKeywords.All)) + { + this.BatchTransformationException(typeof(T).FullName!, ex.ToInvariantString()); + } + } + [Event(4, Message = "Unknown error in SpanProcessor event '{0}': '{1}'.", Level = EventLevel.Error)] public void SpanProcessorException(string evnt, string ex) { @@ -374,6 +383,12 @@ public void MetricViewException(string source, string ex) this.WriteEvent(56, source, ex); } + [Event(57, Message = "Exception thrown by user code supplied as batch of type '{0}' transformation function: '{1}'.", Level = EventLevel.Error)] + public void BatchTransformationException(string type, string ex) + { + this.WriteEvent(57, type, ex); + } + void IConfigurationExtensionsLogger.LogInvalidConfigurationValue(string key, string value) { this.InvalidConfigurationValue(key, value); diff --git a/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs b/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs index c298bef430..d7a1cd723b 100644 --- a/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs +++ b/src/OpenTelemetry/Logs/Pool/LogRecordPoolHelper.cs @@ -15,7 +15,7 @@ public static void Clear(LogRecord logRecord) { if (attributeStorage.Count > DefaultMaxNumberOfAttributes) { - // Don't allow the pool to grow unconstained. + // Don't allow the pool to grow unconstrained. logRecord.AttributeStorage = null; } else @@ -31,7 +31,7 @@ underlying array (capacity). */ { if (scopeStorage.Count > DefaultMaxNumberOfScopes) { - // Don't allow the pool to grow unconstained. + // Don't allow the pool to grow unconstrained. logRecord.ScopeStorage = null; } else From 1ceed1f3a71894b29e929e972d6a604a7294563c Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Wed, 17 Jul 2024 14:08:04 -0700 Subject: [PATCH 3/4] Add tests. --- src/OpenTelemetry/Batch.cs | 42 +-- test/OpenTelemetry.Tests/BatchTest.cs | 383 ++++++++++++++++++++ test/OpenTelemetry.Tests/Trace/BatchTest.cs | 161 -------- 3 files changed, 404 insertions(+), 182 deletions(-) create mode 100644 test/OpenTelemetry.Tests/BatchTest.cs delete mode 100644 test/OpenTelemetry.Tests/Trace/BatchTest.cs diff --git a/src/OpenTelemetry/Batch.cs b/src/OpenTelemetry/Batch.cs index 129503dd68..d0cd04be9f 100644 --- a/src/OpenTelemetry/Batch.cs +++ b/src/OpenTelemetry/Batch.cs @@ -31,10 +31,10 @@ public delegate bool BatchTransformationPredicate(TItem item, ref public readonly struct Batch : IDisposable where T : class { - private readonly T? item; - private readonly CircularBuffer? circularBuffer; - private readonly T[]? items; - private readonly bool rented; + internal readonly T? Item; + internal readonly CircularBuffer? CircularBuffer; + internal readonly T[]? Items; + internal readonly bool Rented; private readonly long targetCount; /// @@ -52,8 +52,8 @@ internal Batch(T[] items, int count, bool rented) Guard.ThrowIfNull(items); Guard.ThrowIfOutOfRange(count, min: 0, max: items.Length); - this.items = items; - this.rented = rented; + this.Items = items; + this.Rented = rented; this.Count = this.targetCount = count; } @@ -61,7 +61,7 @@ internal Batch(T item) { Debug.Assert(item != null, $"{nameof(item)} was null."); - this.item = item; + this.Item = item; this.Count = this.targetCount = 1; } @@ -70,7 +70,7 @@ internal Batch(CircularBuffer circularBuffer, int maxSize) Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number."); Debug.Assert(circularBuffer != null, $"{nameof(circularBuffer)} was null."); - this.circularBuffer = circularBuffer; + this.CircularBuffer = circularBuffer; this.Count = Math.Min(maxSize, circularBuffer!.Count); this.targetCount = circularBuffer.RemovedCount + this.Count; } @@ -100,16 +100,16 @@ public void Transform(BatchTransformationPredicate transforma return; } - if (this.item != null) + if (this.Item != null) { Debug.Assert( typeof(T) != typeof(LogRecord) - || ((LogRecord)(object)this.item).Source != LogRecord.LogRecordSource.FromSharedPool, + || ((LogRecord)(object)this.Item).Source != LogRecord.LogRecordSource.FromSharedPool, "Batch contained a single item rented from the shared pool"); // Special case for a batch of a single item - if (!TransformItem(transformation, ref state, this.item)) + if (!TransformItem(transformation, ref state, this.Item)) { Unsafe.AsRef(in this) = new Batch(Array.Empty(), 0, rented: false); } @@ -169,20 +169,20 @@ static bool TransformItem(BatchTransformationPredicate transformation /// public void Dispose() { - if (this.circularBuffer != null) + if (this.CircularBuffer != null) { // Note: Drain anything left in the batch and return to the pool if // needed. - while (this.circularBuffer.RemovedCount < this.targetCount) + while (this.CircularBuffer.RemovedCount < this.targetCount) { - T item = this.circularBuffer.Read(); + T item = this.CircularBuffer.Read(); if (typeof(T) == typeof(LogRecord)) { Enumerator.TryReturnLogRecordToPool(item); } } } - else if (this.items != null && this.rented) + else if (this.Items != null && this.Rented) { // Note: We don't attempt to return individual LogRecords to the // pool. If the batch wasn't drained fully some records may get @@ -190,7 +190,7 @@ public void Dispose() // idea is most batches are expected to be drained during export so // it isn't worth the effort to track what was/was not returned. - ArrayPool.Shared.Return(this.items); + ArrayPool.Shared.Return(this.Items); Unsafe.AsRef(in this) = new Batch(Array.Empty(), 0); } @@ -202,12 +202,12 @@ public void Dispose() /// . public Enumerator GetEnumerator() { - return this.circularBuffer != null - ? new Enumerator(this.circularBuffer, this.targetCount) - : this.item != null - ? new Enumerator(this.item) + return this.CircularBuffer != null + ? new Enumerator(this.CircularBuffer, this.targetCount) + : this.Item != null + ? new Enumerator(this.Item) /* In the event someone uses default/new Batch() to create Batch we fallback to empty items mode. */ - : new Enumerator(this.items ?? Array.Empty(), this.targetCount); + : new Enumerator(this.Items ?? Array.Empty(), this.targetCount); } /// diff --git a/test/OpenTelemetry.Tests/BatchTest.cs b/test/OpenTelemetry.Tests/BatchTest.cs new file mode 100644 index 0000000000..4f8e98ecc5 --- /dev/null +++ b/test/OpenTelemetry.Tests/BatchTest.cs @@ -0,0 +1,383 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#nullable enable + +using OpenTelemetry.Internal; +using OpenTelemetry.Logs; +using Xunit; + +namespace OpenTelemetry.Tests; + +public class BatchTest +{ + [Fact] + public void CheckConstructorExceptions() + { + Assert.Throws(() => new Batch((string[])null!, 0)); + Assert.Throws(() => new Batch(Array.Empty(), -1)); + Assert.Throws(() => new Batch(Array.Empty(), 1)); + } + + [Fact] + public void CheckValidConstructors() + { + var value = "a"; + var batch = new Batch(value); + foreach (var item in batch) + { + Assert.Equal(value, item); + } + + var circularBuffer = new CircularBuffer(1); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 1); + foreach (var item in batch) + { + Assert.Equal(value, item); + } + } + + [Fact] + public void CheckDispose() + { + var value = "a"; + var batch = new Batch(value); + batch.Dispose(); // A test to make sure it doesn't bomb on a null CircularBuffer. + + var circularBuffer = new CircularBuffer(10); + circularBuffer.Add(value); + circularBuffer.Add(value); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 10); // Max size = 10 + batch.GetEnumerator().MoveNext(); + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(1, circularBuffer.RemovedCount); + batch.Dispose(); // Test anything remaining in the batch is drained when disposed. + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(3, circularBuffer.RemovedCount); + batch.Dispose(); // Verify we don't go into an infinite loop or thrown when empty. + + circularBuffer = new CircularBuffer(10); + circularBuffer.Add(value); + circularBuffer.Add(value); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 2); // Max size = 2 + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(0, circularBuffer.RemovedCount); + batch.Dispose(); // Test the batch is drained up to max size. + Assert.Equal(3, circularBuffer.AddedCount); + Assert.Equal(2, circularBuffer.RemovedCount); + } + + [Fact] + public void CheckEnumerator() + { + var value = "a"; + var batch = new Batch(value); + var enumerator = batch.GetEnumerator(); + ValidateEnumerator(enumerator, value); + + var circularBuffer = new CircularBuffer(1); + circularBuffer.Add(value); + batch = new Batch(circularBuffer, 1); + enumerator = batch.GetEnumerator(); + ValidateEnumerator(enumerator, value); + } + + [Fact] + public void CheckMultipleEnumerator() + { + var value = "a"; + var circularBuffer = new CircularBuffer(10); + circularBuffer.Add(value); + circularBuffer.Add(value); + circularBuffer.Add(value); + var batch = new Batch(circularBuffer, 10); + + int itemsProcessed = 0; + foreach (var item in batch) + { + itemsProcessed++; + } + + Assert.Equal(3, itemsProcessed); + + itemsProcessed = 0; + foreach (var item in batch) + { + itemsProcessed++; + } + + Assert.Equal(0, itemsProcessed); + } + + [Fact] + public void CheckEnumeratorResetException() + { + var value = "a"; + var batch = new Batch(value); + var enumerator = batch.GetEnumerator(); + Assert.Throws(() => enumerator.Reset()); + } + + [Fact] + public void DrainIntoNewBatchTest() + { + var circularBuffer = new CircularBuffer(100); + circularBuffer.Add("a"); + circularBuffer.Add("b"); + + Batch batch = new Batch(circularBuffer, 10); + + Assert.Equal(2, batch.Count); + + string[] storage = new string[10]; + int selectedItemCount = 0; + foreach (string item in batch) + { + if (item == "b") + { + storage[selectedItemCount++] = item; + } + } + + batch = new Batch(storage, selectedItemCount); + + Assert.Equal(1, batch.Count); + + ValidateEnumerator(batch.GetEnumerator(), "b"); + } + + [Fact] + public void TransformBatchUsingCircularBuffer() + { + var circularBuffer = new CircularBuffer(100); + circularBuffer.Add("a"); + circularBuffer.Add("b"); + + Batch batch = new Batch(circularBuffer, 10); + + Assert.Equal(2, batch.Count); + Assert.NotNull(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.False(batch.Rented); + + object? state = null; + + batch.Transform( + static (string item, ref object? state) => + { + return item == "a"; + }, + ref state); + + Assert.Equal(0, circularBuffer.Count); + Assert.Equal(1, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + var previousRentedArray = batch.Items; + + batch.Transform( + static (string item, ref object? state) => + { + return false; + }, + ref state); + + Assert.Equal(0, batch.Count); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + Assert.NotEqual(previousRentedArray, batch.Items); + + previousRentedArray = batch.Items; + + batch.Dispose(); + + Assert.NotEqual(previousRentedArray, batch.Items); + Assert.False(batch.Rented); + } + + [Fact] + public void TransformBatchUsingCircularBufferOfLogRecords() + { + var pool = LogRecordSharedPool.Current; + + var circularBuffer = new CircularBuffer(100); + + var record = pool.Rent(); + record.CategoryName = "Category1"; + circularBuffer.Add(record); + + record = pool.Rent(); + record.CategoryName = "Category2"; + circularBuffer.Add(record); + + record = pool.Rent(); + record.CategoryName = "Category3"; + circularBuffer.Add(record); + + Batch batch = new Batch(circularBuffer, 10); + + Assert.Equal(3, batch.Count); + Assert.NotNull(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.False(batch.Rented); + + object? state = null; + + batch.Transform( + static (LogRecord item, ref object? state) => + { + return item.CategoryName != "Category3"; + }, + ref state); + + Assert.Equal(0, circularBuffer.Count); + Assert.Equal(2, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + Assert.Equal(1, pool.Count); + + var previousRentedArray = batch.Items; + + batch.Transform( + static (LogRecord item, ref object? state) => + { + return item.CategoryName != "Category2"; + }, + ref state); + + Assert.Equal(1, batch.Count); + Assert.NotNull(batch.Items); + Assert.True(batch.Rented); + + Assert.NotEqual(previousRentedArray, batch.Items); + + previousRentedArray = batch.Items; + + var enumerator = batch.GetEnumerator(); + Assert.True(enumerator.MoveNext()); + enumerator.Dispose(); + + Assert.Equal(3, pool.Count); + + batch.Dispose(); + + Assert.NotEqual(previousRentedArray, batch.Items); + Assert.False(batch.Rented); + } + + [Fact] + public void TransformBatchEmpty() + { + var source = Array.Empty(); + + Batch batch = new Batch(source, 0); + + Assert.Equal(0, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.False(batch.Rented); + + object? state = null; + bool transformExecuted = false; + + batch.Transform( + (string item, ref object? state) => + { + transformExecuted = true; + return true; + }, + ref state); + + Assert.False(transformExecuted); + + Assert.Equal(0, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.False(batch.Rented); + + Assert.Equal(source, batch.Items); + } + + [Fact] + public void TransformBatchSingleItem() + { + Batch batch = new Batch("Item"); + + Assert.Equal(1, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.NotNull(batch.Item); + Assert.False(batch.Rented); + + object? state = null; + + batch.Transform( + (string item, ref object? state) => + { + return true; + }, + ref state); + + Assert.Equal(1, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.Null(batch.Items); + Assert.NotNull(batch.Item); + Assert.False(batch.Rented); + + batch.Transform( + static (string item, ref object? state) => + { + return false; + }, + ref state); + + Assert.Equal(0, batch.Count); + Assert.Null(batch.CircularBuffer); + Assert.NotNull(batch.Items); + Assert.Null(batch.Item); + Assert.False(batch.Rented); + } + + [Fact] + public void TransformBatchWithExceptionThrownInDelegate() + { + Batch batch = new Batch("Item"); + + Assert.Equal(1, batch.Count); + + object? state = null; + bool transformExecuted = false; + + batch.Transform( + (string item, ref object? state) => + { + transformExecuted = true; + throw new InvalidOperationException(); + }, + ref state); + + Assert.True(transformExecuted); + Assert.Equal(1, batch.Count); + } + + private static void ValidateEnumerator(Batch.Enumerator enumerator, string expected) + { + if (enumerator.Current != null) + { + Assert.Equal(expected, enumerator.Current); + } + + if (enumerator.MoveNext()) + { + Assert.Equal(expected, enumerator.Current); + } + } +} diff --git a/test/OpenTelemetry.Tests/Trace/BatchTest.cs b/test/OpenTelemetry.Tests/Trace/BatchTest.cs deleted file mode 100644 index b1d5b170d5..0000000000 --- a/test/OpenTelemetry.Tests/Trace/BatchTest.cs +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -using OpenTelemetry.Internal; -using Xunit; - -namespace OpenTelemetry.Trace.Tests; - -public class BatchTest -{ - [Fact] - public void CheckConstructorExceptions() - { - Assert.Throws(() => new Batch((string[])null, 0)); - Assert.Throws(() => new Batch(Array.Empty(), -1)); - Assert.Throws(() => new Batch(Array.Empty(), 1)); - } - - [Fact] - public void CheckValidConstructors() - { - var value = "a"; - var batch = new Batch(value); - foreach (var item in batch) - { - Assert.Equal(value, item); - } - - var circularBuffer = new CircularBuffer(1); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 1); - foreach (var item in batch) - { - Assert.Equal(value, item); - } - } - - [Fact] - public void CheckDispose() - { - var value = "a"; - var batch = new Batch(value); - batch.Dispose(); // A test to make sure it doesn't bomb on a null CircularBuffer. - - var circularBuffer = new CircularBuffer(10); - circularBuffer.Add(value); - circularBuffer.Add(value); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 10); // Max size = 10 - batch.GetEnumerator().MoveNext(); - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(1, circularBuffer.RemovedCount); - batch.Dispose(); // Test anything remaining in the batch is drained when disposed. - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(3, circularBuffer.RemovedCount); - batch.Dispose(); // Verify we don't go into an infinite loop or thrown when empty. - - circularBuffer = new CircularBuffer(10); - circularBuffer.Add(value); - circularBuffer.Add(value); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 2); // Max size = 2 - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(0, circularBuffer.RemovedCount); - batch.Dispose(); // Test the batch is drained up to max size. - Assert.Equal(3, circularBuffer.AddedCount); - Assert.Equal(2, circularBuffer.RemovedCount); - } - - [Fact] - public void CheckEnumerator() - { - var value = "a"; - var batch = new Batch(value); - var enumerator = batch.GetEnumerator(); - ValidateEnumerator(enumerator, value); - - var circularBuffer = new CircularBuffer(1); - circularBuffer.Add(value); - batch = new Batch(circularBuffer, 1); - enumerator = batch.GetEnumerator(); - ValidateEnumerator(enumerator, value); - } - - [Fact] - public void CheckMultipleEnumerator() - { - var value = "a"; - var circularBuffer = new CircularBuffer(10); - circularBuffer.Add(value); - circularBuffer.Add(value); - circularBuffer.Add(value); - var batch = new Batch(circularBuffer, 10); - - int itemsProcessed = 0; - foreach (var item in batch) - { - itemsProcessed++; - } - - Assert.Equal(3, itemsProcessed); - - itemsProcessed = 0; - foreach (var item in batch) - { - itemsProcessed++; - } - - Assert.Equal(0, itemsProcessed); - } - - [Fact] - public void CheckEnumeratorResetException() - { - var value = "a"; - var batch = new Batch(value); - var enumerator = batch.GetEnumerator(); - Assert.Throws(() => enumerator.Reset()); - } - - [Fact] - public void DrainIntoNewBatchTest() - { - var circularBuffer = new CircularBuffer(100); - circularBuffer.Add("a"); - circularBuffer.Add("b"); - - Batch batch = new Batch(circularBuffer, 10); - - Assert.Equal(2, batch.Count); - - string[] storage = new string[10]; - int selectedItemCount = 0; - foreach (string item in batch) - { - if (item == "b") - { - storage[selectedItemCount++] = item; - } - } - - batch = new Batch(storage, selectedItemCount); - - Assert.Equal(1, batch.Count); - - ValidateEnumerator(batch.GetEnumerator(), "b"); - } - - private static void ValidateEnumerator(Batch.Enumerator enumerator, string expected) - { - if (enumerator.Current != null) - { - Assert.Equal(expected, enumerator.Current); - } - - if (enumerator.MoveNext()) - { - Assert.Equal(expected, enumerator.Current); - } - } -} From 5dbdced3413b1a285f4bc766b02620d55a96d69d Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Wed, 17 Jul 2024 15:33:00 -0700 Subject: [PATCH 4/4] Update CHANGELOG. --- src/OpenTelemetry/CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index ca4d195698..0eef208b04 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +* Added `Transform` API on `Batch` which can be used to modify and/or filter + records contained in a batch. + ([#5733](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5733)) + ## 1.9.0 Released 2024-Jun-14