Skip to content

Commit

Permalink
Misc cleanup (#34)
Browse files Browse the repository at this point in the history
* Add analyzer and editor config to ensure Async suffix is used

* Rename Export to ExportAsync

* Make field readonly

* Simplify object creation

* Remove whitespace

* Add VS 2022 spell check settings

* Fix for https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD105.md

* Typo and formatting

* Rename async methods

* Ignore IDE0022 in code

* Remove test word from exclusions

* Fix whitespace

* Convert to file-scoped namespaces
  • Loading branch information
stevejgordon authored May 4, 2023
1 parent 85fc7a5 commit feb057d
Show file tree
Hide file tree
Showing 43 changed files with 1,869 additions and 1,886 deletions.
6 changes: 6 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ resharper_redundant_argument_default_value_highlighting=do_not_show
# Do not penalize code that explicitly lists generic arguments
resharper_redundant_type_arguments_of_method_highlighting=do_not_show

# Spell checker VS2022
spelling_exclusion_path = .\exclusion.dic

# Microsoft.VisualStudio.Threading.Analyzers
dotnet_diagnostic.VSTHRD200.severity = error

[Jenkinsfile]
indent_style = space
indent_size = 2
Expand Down
179 changes: 89 additions & 90 deletions examples/Elastic.Ingest.Apm.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,111 +9,110 @@
using Elastic.Ingest.Apm.Model;
using Elastic.Transport;

namespace Elastic.Ingest.Apm.Example
namespace Elastic.Ingest.Apm.Example;

internal class Program
{
internal class Program
private static int _rejections;
private static int _requests;
private static int _responses;
private static int _retries;
private static int _maxRetriesExceeded;
private static Exception _exception;

private static int Main(string[] args)
{
private static int _rejections;
private static int _requests;
private static int _responses;
private static int _retries;
private static int _maxRetriesExceeded;
private static Exception _exception;

private static int Main(string[] args)
if (args.Length != 2)
{
if (args.Length != 2)
{
Console.Error.WriteLine("Please specify <url> <secret_token>");
return 1;
}

var config = new TransportConfiguration(new Uri(args[0]))
.EnableDebugMode()
.Authentication(new ApiKey(args[1]));
//TODO needs
var transport = new DefaultHttpTransport<TransportConfiguration>(config);

var numberOfEvents = 800;
var maxBufferSize = 200;
var handle = new CountdownEvent(numberOfEvents / maxBufferSize);

var options =
new BufferOptions
{
ExportMaxConcurrency = 1,
OutboundBufferMaxSize = 200,
OutboundBufferMaxLifetime = TimeSpan.FromSeconds(10),
WaitHandle = handle,
ExportMaxRetries = 3,
ExportBackoffPeriod = times => TimeSpan.FromMilliseconds(1),
};
var channelOptions = new ApmChannelOptions(transport)
Console.Error.WriteLine("Please specify <url> <secret_token>");
return 1;
}

var config = new TransportConfiguration(new Uri(args[0]))
.EnableDebugMode()
.Authentication(new ApiKey(args[1]));
//TODO needs
var transport = new DefaultHttpTransport<TransportConfiguration>(config);

var numberOfEvents = 800;
var maxBufferSize = 200;
var handle = new CountdownEvent(numberOfEvents / maxBufferSize);

var options =
new BufferOptions
{
BufferOptions = options,
ServerRejectionCallback = (list) => Interlocked.Increment(ref _rejections),
ExportItemsAttemptCallback = (c, a) => Interlocked.Increment(ref _requests),
ExportResponseCallback = (r, b) =>
{
Interlocked.Increment(ref _responses);
Console.WriteLine(r.ApiCallDetails.DebugInformation);
},
ExportBufferCallback = () => Console.WriteLine("Flushed"),
ExportMaxRetriesCallback = (list) => Interlocked.Increment(ref _maxRetriesExceeded),
ExportRetryCallback = (list) => Interlocked.Increment(ref _retries),
ExportExceptionCallback = (e) => _exception = e
ExportMaxConcurrency = 1,
OutboundBufferMaxSize = 200,
OutboundBufferMaxLifetime = TimeSpan.FromSeconds(10),
WaitHandle = handle,
ExportMaxRetries = 3,
ExportBackoffPeriod = times => TimeSpan.FromMilliseconds(1),
};
var channel = new ApmChannel(channelOptions);

string Id() => RandomGenerator.GenerateRandomBytesAsString(8);
var random = new Random();
for (var i = 0; i < numberOfEvents; i++)
var channelOptions = new ApmChannelOptions(transport)
{
BufferOptions = options,
ServerRejectionCallback = (list) => Interlocked.Increment(ref _rejections),
ExportItemsAttemptCallback = (c, a) => Interlocked.Increment(ref _requests),
ExportResponseCallback = (r, b) =>
{
channel.TryWrite(new Transaction("http", Id(), Id(), new SpanCount(), random.NextDouble() * random.Next(100, 1000) , Epoch.UtcNow) { Name = "x" });
}
handle.Wait(TimeSpan.FromSeconds(20));
Interlocked.Increment(ref _responses);
Console.WriteLine(r.ApiCallDetails.DebugInformation);
},
ExportBufferCallback = () => Console.WriteLine("Flushed"),
ExportMaxRetriesCallback = (list) => Interlocked.Increment(ref _maxRetriesExceeded),
ExportRetryCallback = (list) => Interlocked.Increment(ref _retries),
ExportExceptionCallback = (e) => _exception = e
};
var channel = new ApmChannel(channelOptions);

return 0;
string Id() => RandomGenerator.GenerateRandomBytesAsString(8);
var random = new Random();
for (var i = 0; i < numberOfEvents; i++)
{
channel.TryWrite(new Transaction("http", Id(), Id(), new SpanCount(), random.NextDouble() * random.Next(100, 1000) , Epoch.UtcNow) { Name = "x" });
}
}
handle.Wait(TimeSpan.FromSeconds(20));

internal static class RandomGenerator
{
[ThreadStatic]
private static Random _local;
return 0;
}
}

private static readonly Random Global = new Random();
internal static class RandomGenerator
{
[ThreadStatic]
private static Random _local;

internal static Random GetInstance()
{
var inst = _local;
if (inst == null)
{
int seed;
lock (Global) seed = Global.Next();
_local = inst = new Random(seed);
}
return inst;
}
private static readonly Random Global = new Random();

internal static void GenerateRandomBytes(byte[] bytes) => GetInstance().NextBytes(bytes);

/// <summary>
/// Creates a random generated byte array hex encoded into a string.
/// </summary>
/// <param name="bytes">
/// The byte array that will be filled with a random number - this defines the length of the generated
/// random bits
/// </param>
/// <returns>The random number hex encoded as string</returns>
internal static string GenerateRandomBytesAsString(byte[] bytes)
internal static Random GetInstance()
{
var inst = _local;
if (inst == null)
{
GenerateRandomBytes(bytes);
return BitConverter.ToString(bytes).Replace("-", "").ToLowerInvariant();
int seed;
lock (Global) seed = Global.Next();
_local = inst = new Random(seed);
}
return inst;
}

internal static string GenerateRandomBytesAsString(int numberOfBytes) => GenerateRandomBytesAsString(new byte[numberOfBytes]);
internal static void GenerateRandomBytes(byte[] bytes) => GetInstance().NextBytes(bytes);

internal static double GenerateRandomDoubleBetween0And1() => GetInstance().NextDouble();
/// <summary>
/// Creates a random generated byte array hex encoded into a string.
/// </summary>
/// <param name="bytes">
/// The byte array that will be filled with a random number - this defines the length of the generated
/// random bits
/// </param>
/// <returns>The random number hex encoded as string</returns>
internal static string GenerateRandomBytesAsString(byte[] bytes)
{
GenerateRandomBytes(bytes);
return BitConverter.ToString(bytes).Replace("-", "").ToLowerInvariant();
}

internal static string GenerateRandomBytesAsString(int numberOfBytes) => GenerateRandomBytesAsString(new byte[numberOfBytes]);

internal static double GenerateRandomDoubleBetween0And1() => GetInstance().NextDouble();
}
2 changes: 2 additions & 0 deletions exclusion.dic
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
async
retryable
4 changes: 4 additions & 0 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,9 @@
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All"/>
<PackageReference Include="ConfigureAwaitChecker.Analyzer" Version="5.0.0.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="17.5.22">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>
</Project>
99 changes: 49 additions & 50 deletions src/Elastic.Channels/BufferOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,56 @@
using System;
using System.Threading;

namespace Elastic.Channels
namespace Elastic.Channels;

/// <summary>
/// Controls how data should be buffered in <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/> implementations
/// </summary>
public class BufferOptions
{
/// <summary>
/// Controls how data should be buffered in <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/> implementations
/// The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped
/// <para>Defaults to <c>100_000</c></para>
/// </summary>
public int InboundBufferMaxSize { get; set; } = 100_000;

/// <summary>
/// The maximum size to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/> at once.
/// <para>Defaults to <c>1_000</c></para>
/// </summary>
public int OutboundBufferMaxSize { get; set; } = 1_000;

/// <summary>
/// The maximum lifetime of a buffer to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>.
/// If a buffer is older then the configured <see cref="OutboundBufferMaxLifetime"/> it will be flushed to
/// <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/> regardless of it's current size
/// <para>Defaults to <c>5 seconds</c></para>
/// </summary>
public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5);

/// <summary>
/// The maximum number of consumers allowed to poll for new events on the channel.
/// <para>Defaults to <c>1</c>, increase to introduce concurrency.</para>
/// </summary>
public int ExportMaxConcurrency { get; set; } = 1;

/// <summary>
/// The times to retry an export if <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields items to retry.
/// <para>Whether or not items are selected for retrying depends on the actual channel implementation</para>
/// <see cref="ExportBackoffPeriod"/> to implement a backoff period of your choosing.
/// <para>Defaults to <c>3</c>, when <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields any items</para>
/// </summary>
public int ExportMaxRetries { get; set; } = 3;


/// <summary>
/// A function to calculate the backoff period, gets passed the number of retries attempted starting at 0.
/// By default backs off in increments of 2 seconds.
/// </summary>
public Func<int, TimeSpan> ExportBackoffPeriod { get; set; } = (i) => TimeSpan.FromSeconds(2 * (i + 1));

/// <summary>
/// Allows you to inject a <see cref="CountdownEvent"/> to wait for N number of buffers to flush.
/// </summary>
public class BufferOptions
{
/// <summary>
/// The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped
/// <para>Defaults to <c>100_000</c></para>
/// </summary>
public int InboundBufferMaxSize { get; set; } = 100_000;

/// <summary>
/// The maximum size to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> at once.
/// <para>Defaults to <c>1_000</c></para>
/// </summary>
public int OutboundBufferMaxSize { get; set; } = 1_000;

/// <summary>
/// The maximum lifetime of a buffer to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/>.
/// If a buffer is older then the configured <see cref="OutboundBufferMaxLifetime"/> it will be flushed to
/// <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> regardless of it's current size
/// <para>Defaults to <c>5 seconds</c></para>
/// </summary>
public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5);

/// <summary>
/// The maximum number of consumers allowed to poll for new events on the channel.
/// <para>Defaults to <c>1</c>, increase to introduce concurrency.</para>
/// </summary>
public int ExportMaxConcurrency { get; set; } = 1;

/// <summary>
/// The times to retry an export if <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields items to retry.
/// <para>Whether or not items are selected for retrying depends on the actual channel implementation</para>
/// <see cref="ExportBackoffPeriod"/> to implement a backoff period of your choosing.
/// <para>Defaults to <c>3</c>, when <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields any items</para>
/// </summary>
public int ExportMaxRetries { get; set; } = 3;


/// <summary>
/// A function to calculate the backoff period, gets passed the number of retries attempted starting at 0.
/// By default backs off in increments of 2 seconds.
/// </summary>
public Func<int, TimeSpan> ExportBackoffPeriod { get; set; } = (i) => TimeSpan.FromSeconds(2 * (i + 1));

/// <summary>
/// Allows you to inject a <see cref="CountdownEvent"/> to wait for N number of buffers to flush.
/// </summary>
public CountdownEvent? WaitHandle { get; set; }
}
public CountdownEvent? WaitHandle { get; set; }
}
Loading

0 comments on commit feb057d

Please sign in to comment.