Skip to content

Commit

Permalink
Add netstandard2.0 support (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz authored Feb 27, 2023
1 parent 756807e commit 1d86925
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 26 deletions.
33 changes: 19 additions & 14 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,12 @@ public interface IBufferedChannel<in TEvent> : IDisposable
/// Waits for availability on the inbound channel before attempting to write each item in <paramref name="events"/>.
/// </summary>
/// <returns>A bool indicating if all writes werwase successful</returns>
async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default)
{
var allWritten = true;
foreach (var e in events)
{
var written = await WaitToWriteAsync(e, ctx).ConfigureAwait(false);
if (!written) allWritten = written;
}
return allWritten;
}
Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default);

/// <summary>
/// Tries to write many <paramref name="events"/> to the channel returning true if ALL messages were written succesfully
/// </summary>
bool TryWriteMany(IEnumerable<TEvent> events) =>
events.Select(e => TryWrite(e)).All(b => b);
bool TryWriteMany(IEnumerable<TEvent> events);
}

/// <summary>
Expand Down Expand Up @@ -122,8 +112,6 @@ await ConsumeInboundEvents(maxOut, BufferOptions.OutboundBufferMaxLifetime)
/// </summary>
protected abstract Task<TResponse> Export(IReadOnlyCollection<TEvent> buffer, CancellationToken ctx = default);



/// <summary>The channel options currently in use</summary>
public TChannelOptions Options { get; }

Expand Down Expand Up @@ -152,6 +140,23 @@ public override bool TryWrite(TEvent item)
return false;
}


/// <inheritdoc cref="IBufferedChannel{TEvent}.WaitToWriteManyAsync"/>
public async Task<bool> WaitToWriteManyAsync(IEnumerable<TEvent> events, CancellationToken ctx = default)
{
var allWritten = true;
foreach (var e in events)
{
var written = await WaitToWriteAsync(e, ctx).ConfigureAwait(false);
if (!written) allWritten = written;
}
return allWritten;
}

/// <inheritdoc cref="IBufferedChannel{TEvent}.TryWriteMany"/>
public bool TryWriteMany(IEnumerable<TEvent> events) =>
events.Select(e => TryWrite(e)).All(b => b);

/// <inheritdoc cref="ChannelWriter{T}.WaitToWriteAsync"/>
public virtual async Task<bool> WaitToWriteAsync(TEvent item, CancellationToken ctx = default)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Channels/Elastic.Channels.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Provides components to build a buffer-backed channel that flushes batches of data in a controlled (Max N || Max Duration) manner.</Description>
<PackageTags>elastic, channels, buffer</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Ingest.Apm/Elastic.Ingest.Apm.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Offers an easy to use ChannelWriter implementation to push data concurrently to APM using Elastic.Transport</Description>
<PackageTags>elastic, channels, apm, ingest</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Offers an easy to use ChannelWriter implementation to push data concurrently to Elasticsearch using Elastic.Transport</Description>
<PackageTags>elastic, channels, elasticsearch, ingest</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
17 changes: 9 additions & 8 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using Elastic.Transport;
using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics;

namespace Elastic.Ingest.Elasticsearch
{
Expand Down Expand Up @@ -46,7 +47,7 @@ protected override bool Retry(BulkResponse response)

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryEvent"/>
protected override bool RetryEvent((TEvent, BulkResponseItem) @event) =>
ElasticsearchChannelStatics.RetryStatusCodes.Contains(@event.Item2.Status);
RetryStatusCodes.Contains(@event.Item2.Status);

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RejectEvent"/>
protected override bool RejectEvent((TEvent, BulkResponseItem) @event) =>
Expand All @@ -61,7 +62,7 @@ protected override Task<BulkResponse> Export(HttpTransport transport, IReadOnlyC
/* NOT USED */
},
async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, ctx).ConfigureAwait(false); })
, ElasticsearchChannelStatics.RequestParams, ctx);
, RequestParams, ctx);

/// <summary>
/// Asks implementations to create a <see cref="BulkOperationHeader"/> based on the <paramref name="event"/> being exported.
Expand All @@ -75,23 +76,23 @@ private async Task WriteBufferToStreamAsync(IReadOnlyCollection<TEvent> b, Strea
if (@event == null) continue;

var indexHeader = CreateBulkOperationHeader(@event);
await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), ElasticsearchChannelStatics.SerializerOptions, ctx)
await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), SerializerOptions, ctx)
.ConfigureAwait(false);
await stream.WriteAsync(ElasticsearchChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);

if (indexHeader is UpdateOperation)
await stream.WriteAsync(ElasticsearchChannelStatics.DocUpdateHeaderStart, ctx).ConfigureAwait(false);
await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false);

if (Options.WriteEvent != null)
await Options.WriteEvent(stream, ctx, @event).ConfigureAwait(false);
else
await JsonSerializer.SerializeAsync(stream, @event, typeof(TEvent), ElasticsearchChannelStatics.SerializerOptions, ctx)
await JsonSerializer.SerializeAsync(stream, @event, typeof(TEvent), SerializerOptions, ctx)
.ConfigureAwait(false);

if (indexHeader is UpdateOperation)
await stream.WriteAsync(ElasticsearchChannelStatics.DocUpdateHeaderEnd, ctx).ConfigureAwait(false);
await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false);

await stream.WriteAsync(ElasticsearchChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Provides components to build a buffer-backed channel for publishing events to distributed systems over HTTP through Elastic.Transport</Description>
<PackageTags>elastic, transport, ingest, search</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down

0 comments on commit 1d86925

Please sign in to comment.