From a05bf2c8346e1a90a0bd1510e3c29ba281357349 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 10 Apr 2024 09:22:17 +0200 Subject: [PATCH] Allow consumers to choose BoundedChannelFullMode (#54) --- examples/playground/Program.cs | 10 ++++++++-- src/Elastic.Channels/BufferOptions.cs | 10 ++++++++++ src/Elastic.Channels/BufferedChannelBase.cs | 4 ++-- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/examples/playground/Program.cs b/examples/playground/Program.cs index bf28e12..9a2b8b5 100644 --- a/examples/playground/Program.cs +++ b/examples/playground/Program.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information using System.Text.Json.Serialization; +using System.Threading.Channels; using Elastic.Channels; using Elastic.Elasticsearch.Ephemeral; using Elastic.Ingest.Elasticsearch; @@ -13,7 +14,7 @@ var ctxs = new CancellationTokenSource(); var parallelOpts = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = ctxs.Token }; const int numDocs = 1_000_000; -var bufferOptions = new BufferOptions { InboundBufferMaxSize = numDocs, OutboundBufferMaxSize = 10_000 }; +var bufferOptions = new BufferOptions { InboundBufferMaxSize = 1000, OutboundBufferMaxSize = 100, ExportMaxConcurrency = 1, BoundedChannelFullMode = BoundedChannelFullMode.Wait }; var config = new EphemeralClusterConfiguration("8.13.0"); using var cluster = new EphemeralCluster(config); using var channel = SetupElasticsearchChannel(); @@ -48,10 +49,15 @@ async Task PushToChannel(DataStreamChannel c) if (c == null) throw new ArgumentNullException(nameof(c)); await c.BootstrapElasticsearchAsync(BootstrapMethod.Failure); + + foreach (var i in Enumerable.Range(0, numDocs)) + await DoChannelWrite(i, ctxs.Token); + + /* await Parallel.ForEachAsync(Enumerable.Range(0, numDocs), parallelOpts, async (i, ctx) => { await DoChannelWrite(i, ctx); - }); + });*/ async Task DoChannelWrite(int i, CancellationToken cancellationToken) { diff --git a/src/Elastic.Channels/BufferOptions.cs b/src/Elastic.Channels/BufferOptions.cs index 9c17cb5..2477558 100644 --- a/src/Elastic.Channels/BufferOptions.cs +++ b/src/Elastic.Channels/BufferOptions.cs @@ -4,6 +4,7 @@ using System; using System.Threading; +using System.Threading.Channels; namespace Elastic.Channels; @@ -62,4 +63,13 @@ public class BufferOptions /// Allows you to inject a to wait for N number of buffers to flush. /// public CountdownEvent? WaitHandle { get; set; } + + /// + /// + /// Defaults to , this will use more memory as overproducing will need to wait to enqueue data + /// Use to minimize memory consumption at the expense of more likely to drop data + /// You might need to tweak and to ensure sufficient allocations are available + /// The defaults for both adn are quite liberal already though. + /// + public BoundedChannelFullMode BoundedChannelFullMode { get; set; } = BoundedChannelFullMode.Wait; } diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 54db5d5..395848c 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -113,10 +113,10 @@ protected BufferedChannelBase(TChannelOptions options, ICollection>( - new BoundedChannelOptions(maxOut) + new BoundedChannelOptions(_maxConcurrency * 2) { SingleReader = false, SingleWriter = true,