Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sdk-logs] Add Transform API on batch #5733

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/OpenTelemetry/.publicApi/Stable/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
OpenTelemetry.Batch<T>.Transform<TState>(OpenTelemetry.BatchTransformationPredicate<T!, TState>! transformation, ref TState state) -> void
OpenTelemetry.BatchTransformationPredicate<TItem, TState>
virtual OpenTelemetry.BatchTransformationPredicate<TItem, TState>.Invoke(TItem! item, ref TState state) -> bool
237 changes: 191 additions & 46 deletions src/OpenTelemetry/Batch.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Buffers;
using System.Collections;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;

namespace OpenTelemetry;

/// <summary>
/// Represents a callback action for transforming and filtering items contained
/// in a <see cref="Batch{T}"/>.
/// </summary>
/// <typeparam name="TItem">Item type.</typeparam>
/// <typeparam name="TState">State type.</typeparam>
/// <param name="item">Item being transformed.</param>
/// <param name="state">The state supplied for the transformation.</param>
/// <returns>Return <see langword="false"/> to indicate the item should be
/// removed from the <see cref="Batch{T}"/>.</returns>
public delegate bool BatchTransformationPredicate<TItem, TState>(TItem item, ref TState state)
where TItem : class;

/// <summary>
/// Stores a batch of completed <typeparamref name="T"/> objects to be exported.
/// </summary>
/// <typeparam name="T">The type of object in the <see cref="Batch{T}"/>.</typeparam>
public readonly struct Batch<T> : IDisposable
where T : class
{
private readonly T? item = null;
private readonly CircularBuffer<T>? circularBuffer = null;
private readonly T[]? items = null;
internal readonly T? Item;
internal readonly CircularBuffer<T>? CircularBuffer;
internal readonly T[]? Items;
internal readonly bool Rented;
private readonly long targetCount;

/// <summary>
Expand All @@ -27,19 +43,25 @@ namespace OpenTelemetry;
/// <param name="items">The items to store in the batch.</param>
/// <param name="count">The number of items in the batch.</param>
public Batch(T[] items, int count)
: this(items, count, rented: false)
{
}

internal Batch(T[] items, int count, bool rented)
{
Guard.ThrowIfNull(items);
Guard.ThrowIfOutOfRange(count, min: 0, max: items.Length);

this.items = items;
this.Items = items;
this.Rented = rented;
this.Count = this.targetCount = count;
}

internal Batch(T item)
{
Debug.Assert(item != null, $"{nameof(item)} was null.");

this.item = item;
this.Item = item;
this.Count = this.targetCount = 1;
}

Expand All @@ -48,7 +70,7 @@ internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
Debug.Assert(maxSize > 0, $"{nameof(maxSize)} should be a positive number.");
Debug.Assert(circularBuffer != null, $"{nameof(circularBuffer)} was null.");

this.circularBuffer = circularBuffer;
this.CircularBuffer = circularBuffer;
this.Count = Math.Min(maxSize, circularBuffer!.Count);
this.targetCount = circularBuffer.RemovedCount + this.Count;
}
Expand All @@ -60,25 +82,118 @@ internal Batch(CircularBuffer<T> circularBuffer, int maxSize)
/// </summary>
public long Count { get; }

/// <inheritdoc/>
public void Dispose()
/// <summary>
/// Transforms and filters the items of a <see cref="Batch{T}"/> using the
/// supplied <see cref="BatchTransformationPredicate{TItem,TState}"/> and state.
/// </summary>
/// <typeparam name="TState">State type.</typeparam>
/// <param name="transformation">Transformation function. Return <see
/// langword="false"/> to remove an item from the <see
/// cref="Batch{T}"/>.</param>
/// <param name="state">State to be passed into <paramref name="transformation"/>.</param>
public void Transform<TState>(BatchTransformationPredicate<T, TState> transformation, ref TState state)
{
if (this.circularBuffer != null)
Guard.ThrowIfNull(transformation);

if (this.Count <= 0)
{
return;
}

if (this.Item != null)
{
// Drain anything left in the batch.
while (this.circularBuffer.RemovedCount < this.targetCount)
Debug.Assert(
typeof(T) != typeof(LogRecord)
|| ((LogRecord)(object)this.Item).Source != LogRecord.LogRecordSource.FromSharedPool,
"Batch contained a single item rented from the shared pool");

// Special case for a batch of a single item

if (!TransformItem(transformation, ref state, this.Item))
{
T item = this.circularBuffer.Read();
if (typeof(T) == typeof(LogRecord))
Unsafe.AsRef(in this) = new Batch<T>(Array.Empty<T>(), 0, rented: false);
}

return;
}

var rentedArray = ArrayPool<T>.Shared.Rent((int)this.Count);

var i = 0;

if (typeof(T) == typeof(LogRecord))
{
foreach (var item in this)
{
if (TransformItem(transformation, ref state, item))
{
var logRecord = (LogRecord)(object)item;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
logRecord.AddReference();
}

rentedArray[i++] = item;
}
}
}
else
{
foreach (var item in this)
{
if (TransformItem(transformation, ref state, item))
{
rentedArray[i++] = item;
}
}
}

this.Dispose();

Unsafe.AsRef(in this) = new Batch<T>(rentedArray, i, rented: true);

static bool TransformItem(BatchTransformationPredicate<T, TState> transformation, ref TState state, T item)
{
try
{
return transformation(item, ref state);
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.BatchTransformationException<T>(ex);
return true;
}
}
}

/// <inheritdoc/>
public void Dispose()
{
if (this.CircularBuffer != null)
{
// Note: Drain anything left in the batch and return to the pool if
// needed.
while (this.CircularBuffer.RemovedCount < this.targetCount)
{
T item = this.CircularBuffer.Read();
if (typeof(T) == typeof(LogRecord))
{
Enumerator.TryReturnLogRecordToPool(item);
}
}
}
else if (this.Items != null && this.Rented)
{
// Note: We don't attempt to return individual LogRecords to the
// pool. If the batch wasn't drained fully some records may get
// garbage collected but the pool will recreate more as needed. The
// idea is most batches are expected to be drained during export so
// it isn't worth the effort to track what was/was not returned.

ArrayPool<T>.Shared.Return(this.Items);

Unsafe.AsRef(in this) = new Batch<T>(Array.Empty<T>(), 0);
}
}

/// <summary>
Expand All @@ -87,12 +202,12 @@ public void Dispose()
/// <returns><see cref="Enumerator"/>.</returns>
public Enumerator GetEnumerator()
{
return this.circularBuffer != null
? new Enumerator(this.circularBuffer, this.targetCount)
: this.item != null
? new Enumerator(this.item)
return this.CircularBuffer != null
? new Enumerator(this.CircularBuffer, this.targetCount)
: this.Item != null
? new Enumerator(this.Item)
/* In the event someone uses default/new Batch() to create Batch we fallback to empty items mode. */
: new Enumerator(this.items ?? Array.Empty<T>(), this.targetCount);
: new Enumerator(this.Items ?? Array.Empty<T>(), this.targetCount);
}

/// <summary>
Expand All @@ -114,6 +229,8 @@ public struct Enumerator : IEnumerator<T>

private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBuffer = (ref Enumerator enumerator) =>
{
Debug.Assert(typeof(T) != typeof(LogRecord), "T was an unexpected type");

var circularBuffer = enumerator.circularBuffer;

if (circularBuffer!.RemovedCount < enumerator.targetCount)
Expand All @@ -128,37 +245,45 @@ public struct Enumerator : IEnumerator<T>

private static readonly BatchEnumeratorMoveNextFunc MoveNextCircularBufferLogRecord = (ref Enumerator enumerator) =>
{
// Note: This type check here is to give the JIT a hint it can
// remove all of this code when T != LogRecord
if (typeof(T) == typeof(LogRecord))
Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type");

TryReturnCurrentLogRecordToPool(ref enumerator);

var circularBuffer = enumerator.circularBuffer;

if (circularBuffer!.RemovedCount < enumerator.targetCount)
{
var circularBuffer = enumerator.circularBuffer;
enumerator.current = circularBuffer.Read();
return true;
}

var currentItem = enumerator.Current;
enumerator.current = null;

if (currentItem != null)
{
var logRecord = (LogRecord)(object)currentItem;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}
}
return false;
};

if (circularBuffer!.RemovedCount < enumerator.targetCount)
{
enumerator.current = circularBuffer.Read();
return true;
}
private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) =>
{
Debug.Assert(typeof(T) != typeof(LogRecord), "T was an unexpected type");

enumerator.current = null;
var items = enumerator.items;

if (enumerator.itemIndex < enumerator.targetCount)
{
enumerator.current = items![enumerator.itemIndex++];
return true;
}

enumerator.current = null;
return false;
};

private static readonly BatchEnumeratorMoveNextFunc MoveNextArray = (ref Enumerator enumerator) =>
private static readonly BatchEnumeratorMoveNextFunc MoveNextArrayLogRecord = (ref Enumerator enumerator) =>
{
Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type");

TryReturnCurrentLogRecordToPool(ref enumerator);

var items = enumerator.items;

if (enumerator.itemIndex < enumerator.targetCount)
Expand All @@ -168,6 +293,7 @@ public struct Enumerator : IEnumerator<T>
}

enumerator.current = null;

return false;
};

Expand Down Expand Up @@ -206,7 +332,7 @@ internal Enumerator(T[] items, long targetCount)
this.items = items;
this.targetCount = targetCount;
this.itemIndex = 0;
this.moveNextFunc = MoveNextArray;
this.moveNextFunc = typeof(T) == typeof(LogRecord) ? MoveNextArrayLogRecord : MoveNextArray;
}

/// <inheritdoc/>
Expand All @@ -223,12 +349,7 @@ public void Dispose()
var currentItem = this.current;
if (currentItem != null)
{
var logRecord = (LogRecord)(object)currentItem;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}

TryReturnLogRecordToPool(currentItem);
this.current = null;
}
}
Expand All @@ -243,5 +364,29 @@ public bool MoveNext()
/// <inheritdoc/>
public readonly void Reset()
=> throw new NotSupportedException();

internal static void TryReturnLogRecordToPool(T currentItem)
{
Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type");
Debug.Assert(currentItem != null, "currentItem was null");

var logRecord = (LogRecord)(object)currentItem!;
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
{
LogRecordSharedPool.Current.Return(logRecord);
}
}

private static void TryReturnCurrentLogRecordToPool(ref Enumerator enumerator)
{
Debug.Assert(typeof(T) == typeof(LogRecord), "T was an unexpected type");

var currentItem = enumerator.Current;

if (currentItem != null)
{
TryReturnLogRecordToPool(currentItem);
}
}
}
}
4 changes: 4 additions & 0 deletions src/OpenTelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

* Added `Transform` API on `Batch<T>` which can be used to modify and/or filter
records contained in a batch.
([#5733](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5733))

## 1.9.0

Released 2024-Jun-14
Expand Down
Loading