diff --git a/builds/csharp/nuget/build_nugets.py b/builds/csharp/nuget/build_nugets.py index 0dd9b647..899c03cd 100644 --- a/builds/csharp/nuget/build_nugets.py +++ b/builds/csharp/nuget/build_nugets.py @@ -7,8 +7,8 @@ from typing import List version = "0.7.2.0" -informal_version = "0.7.2.0-dev2" -nuget_version = "0.7.2.0-dev2" +informal_version = "0.7.2.0" +nuget_version = "0.7.2.0" def updatecsproj(projfilepath): diff --git a/src/Quix.TestBase/Extensions/TestOutputHelperExtensions.cs b/src/Quix.TestBase/Extensions/TestOutputHelperExtensions.cs index 7e67ba44..94c10c59 100644 --- a/src/Quix.TestBase/Extensions/TestOutputHelperExtensions.cs +++ b/src/Quix.TestBase/Extensions/TestOutputHelperExtensions.cs @@ -51,7 +51,7 @@ public void Log( if (exception.StackTrace != null) this.helper.WriteLine(exception.StackTrace); } } - catch (System.InvalidOperationException) + catch (InvalidOperationException) { // I think this might happen with async tasks, but stop failing agent tests for this } @@ -95,7 +95,7 @@ public void Log( if (exception.StackTrace != null) this.helper.WriteLine(exception.StackTrace); } } - catch (System.InvalidOperationException) + catch (InvalidOperationException) { // I think this might happen with async tasks, but stop failing agent tests for this } diff --git a/src/QuixStreams.IntegrationTestBase/KafkaDockerTestFixtureBase.cs b/src/QuixStreams.IntegrationTestBase/KafkaDockerTestFixtureBase.cs index 7d3ad826..e86f668c 100644 --- a/src/QuixStreams.IntegrationTestBase/KafkaDockerTestFixtureBase.cs +++ b/src/QuixStreams.IntegrationTestBase/KafkaDockerTestFixtureBase.cs @@ -1,5 +1,6 @@ using System; using System.Diagnostics; +using System.Runtime.InteropServices; using Confluent.Kafka; using Ductus.FluentDocker.Builders; using Ductus.FluentDocker.Extensions; @@ -28,11 +29,11 @@ public KafkaDockerTestFixtureBase() { var builder = new Builder().UseContainer(); - if (System.Runtime.InteropServices.RuntimeInformation.OSArchitecture == System.Runtime.InteropServices.Architecture.Arm64) + if (RuntimeInformation.OSArchitecture == Architecture.Arm64) { builder = builder.UseImage("dougdonohoe/fast-data-dev:latest"); } - else if (System.Runtime.InteropServices.RuntimeInformation.OSArchitecture == System.Runtime.InteropServices.Architecture.X64) + else if (RuntimeInformation.OSArchitecture == Architecture.X64) { builder = builder.UseImage("lensesio/fast-data-dev:3.3.1"); } diff --git a/src/QuixStreams.Kafka.Tests/KafkaStreamingClientIntegrationTests.cs b/src/QuixStreams.Kafka.Tests/KafkaStreamingClientIntegrationTests.cs index 7f7857b4..f2d3262a 100644 --- a/src/QuixStreams.Kafka.Tests/KafkaStreamingClientIntegrationTests.cs +++ b/src/QuixStreams.Kafka.Tests/KafkaStreamingClientIntegrationTests.cs @@ -25,7 +25,7 @@ public KafkaStreamingClientIntegrationTests(ITestOutputHelper output, KafkaDocke { this.output = output; this.kafkaDockerTestFixture = kafkaDockerTestFixture; - QuixStreams.Logging.Factory = output.CreateLoggerFactory(); + Logging.Factory = output.CreateLoggerFactory(); output.WriteLine($"Created client with brokerlist '{kafkaDockerTestFixture.BrokerList}'"); } diff --git a/src/QuixStreams.Kafka.Transport.Samples/Samples/WriteMessage.cs b/src/QuixStreams.Kafka.Transport.Samples/Samples/WriteMessage.cs index f8aa51f4..dbeab255 100644 --- a/src/QuixStreams.Kafka.Transport.Samples/Samples/WriteMessage.cs +++ b/src/QuixStreams.Kafka.Transport.Samples/Samples/WriteMessage.cs @@ -2,7 +2,6 @@ using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -using QuixStreams.Kafka.Transport.SerDes; using Timer = System.Timers.Timer; namespace QuixStreams.Kafka.Transport.Samples.Samples diff --git a/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackage.cs b/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackage.cs index 1232c4bc..2cd6e48b 100644 --- a/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackage.cs +++ b/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackage.cs @@ -1,11 +1,7 @@ using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Text; using System.Threading; using System.Threading.Tasks; -using Confluent.Kafka; -using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Kafka.Transport.SerDes.Codecs; using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs; using Timer = System.Timers.Timer; diff --git a/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackageToPartition.cs b/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackageToPartition.cs index 5b7b311d..8f6d463f 100644 --- a/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackageToPartition.cs +++ b/src/QuixStreams.Kafka.Transport.Samples/Samples/WritePackageToPartition.cs @@ -2,7 +2,6 @@ using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; -using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Kafka.Transport.SerDes.Codecs; using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs; diff --git a/src/QuixStreams.Kafka.Transport.Tests/AutoCommitterShould.cs b/src/QuixStreams.Kafka.Transport.Tests/AutoCommitterShould.cs index 1db677b7..31d14b83 100644 --- a/src/QuixStreams.Kafka.Transport.Tests/AutoCommitterShould.cs +++ b/src/QuixStreams.Kafka.Transport.Tests/AutoCommitterShould.cs @@ -18,7 +18,7 @@ public class CommitModifierShould public CommitModifierShould(ITestOutputHelper helper) { - QuixStreams.Logging.Factory = helper.CreateLoggerFactory(); + Logging.Factory = helper.CreateLoggerFactory(); this.helper = helper; } diff --git a/src/QuixStreams.Kafka.Transport.Tests/Helpers/TestModel.cs b/src/QuixStreams.Kafka.Transport.Tests/Helpers/TestModel.cs index 4ea03f6f..962a1000 100644 --- a/src/QuixStreams.Kafka.Transport.Tests/Helpers/TestModel.cs +++ b/src/QuixStreams.Kafka.Transport.Tests/Helpers/TestModel.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Text; +using QuixStreams.Kafka.Transport.SerDes.Codecs; using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs; namespace QuixStreams.Kafka.Transport.Tests.Helpers @@ -9,8 +10,8 @@ public class TestModel : IEquatable { static TestModel() { - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RegisterCodec(typeof(TestModel), new DefaultJsonCodec()); - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RegisterCodec(typeof(TestModel[]), new DefaultJsonCodec()); + CodecRegistry.RegisterCodec(typeof(TestModel), new DefaultJsonCodec()); + CodecRegistry.RegisterCodec(typeof(TestModel[]), new DefaultJsonCodec()); } public string StringProp { get; set; } diff --git a/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageMergerShould.cs b/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageMergerShould.cs index c3dc8914..8c129dea 100644 --- a/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageMergerShould.cs +++ b/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageMergerShould.cs @@ -1,11 +1,8 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; -using System.Net; using System.Threading.Tasks; using FluentAssertions; -using NSubstitute; using QuixStreams.Kafka.Transport.SerDes; using Xunit; @@ -50,7 +47,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherPackages_ShouldRaiseInE // then the outcome is [Message1_merged] [Message2_merged] [Message3] // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode; + PackageSerializationSettings.Mode = mode; var merger = new KafkaMessageMerger(new KafkaMessageBuffer()); var message1Segments = this.CreateSplitMessage(2, out var message1); @@ -98,7 +95,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherAndSplitPackageExpires_ // then the outcome is [Message2], [Message3], as segment 2 should be discarded // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode; + PackageSerializationSettings.Mode = mode; var buffer = new KafkaMessageBuffer(); var merger = new KafkaMessageMerger(buffer); var message1Segments = this.CreateSplitMessage(2, out var message1); @@ -141,7 +138,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherAndSplitPackageExpires_ public void Modify_MergeReturnsNull_ShouldNotRaisePackageAndReturnCompletedTask(PackageSerializationMode mode) { // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode; + PackageSerializationSettings.Mode = mode; var buffer = new KafkaMessageBuffer(); var merger = new KafkaMessageMerger(buffer); @@ -171,7 +168,7 @@ public async Task Modify_SplitPackageInterweavedWithOtherAndSplitPackageRevoked_ // disappear then should raise [Message2] and [Message3] // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode; + PackageSerializationSettings.Mode = mode; var buffer = new KafkaMessageBuffer(); var merger = new KafkaMessageMerger(buffer); var message1Segments = this.CreateSplitMessage(3, out var message1); diff --git a/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageSplitterShould.cs b/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageSplitterShould.cs index 93ca8fa2..09ecf69f 100644 --- a/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageSplitterShould.cs +++ b/src/QuixStreams.Kafka.Transport.Tests/SerDes/KafkaMessageSplitterShould.cs @@ -1,9 +1,9 @@ using System; -using System.Collections.Generic; using System.Linq; using System.Runtime.Serialization; using System.Text; using FluentAssertions; +using Newtonsoft.Json; using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Kafka.Transport.SerDes.Legacy; using Xunit; @@ -16,7 +16,7 @@ public class KafkaMessageSplitterShould public void Split_WithDataOutsideAbsoluteMaxSize_ShouldThrowSerializationException() { // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue; + PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue; const int maxMsgLength = 50; var splitter = new KafkaMessageSplitter(maxMsgLength); var length = maxMsgLength * byte.MaxValue + 10; @@ -37,7 +37,7 @@ public void Split_WithDataOutsideAbsoluteMaxSize_ShouldThrowSerializationExcepti public void SplitLegacy_WithDataOutsideAllowedMessageSizeButWithinAbsoluteMaxSize_ShouldReturnSplitBytes() { // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue; + PackageSerializationSettings.Mode = PackageSerializationMode.LegacyValue; const int maxMsgLength = 50; var splitter = new KafkaMessageSplitter(maxMsgLength); var length = 10199; // just a bit less than max (10200), but greater because of split info. @@ -88,7 +88,7 @@ public void SplitLegacy_WithDataOutsideAllowedMessageSizeButWithinAbsoluteMaxSiz public void Split_GreaterThanSupportedByLegacy_ShouldReturnSplitBytes() { // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = PackageSerializationMode.Header; + PackageSerializationSettings.Mode = PackageSerializationMode.Header; int maxMsgLength = KafkaMessageSplitter.ExpectedHeaderSplitInfoSize + 50; var splitter = new KafkaMessageSplitter(maxMsgLength); var length = 40000; // legacy would die around 10200 in this config @@ -142,7 +142,7 @@ public void Split_GreaterThanSupportedByLegacy_ShouldReturnSplitBytes() public void Split_WithDataWithinAllowedMessageSize_ShouldReturnSameBytes(PackageSerializationMode mode) { // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode; + PackageSerializationSettings.Mode = mode; var splitter = new KafkaMessageSplitter(50); var data = new byte[50]; var random = new Random(); @@ -163,7 +163,7 @@ public void Split_WithDataWithinAllowedMessageSize_ShouldReturnSameBytes(Package public void Split_WithDataWithinAllowedMessageSizeUsingHeaderAndKey_ShouldReturnSameBytes(PackageSerializationMode mode) { // Arrange - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = mode; + PackageSerializationSettings.Mode = mode; var splitter = new KafkaMessageSplitter(50); var data = new byte[5]; var random = new Random(); @@ -194,7 +194,7 @@ public void Split_UsingHeaderProtocolWithExcessiveValueSize_ShouldUseCompression data[ii] = $"Some_value_that_should_get_compressed_{ii}"; } - var dataBytes = Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(data)); + var dataBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data)); var key = Encoding.UTF8.GetBytes("My super key"); var testCodecId = "Stuff"; @@ -232,7 +232,7 @@ public void Split_UsingHeaderProtocolNoCodecIdWithExcessiveValueSize_ShouldUseCo data[ii] = $"Some_value_that_should_get_compressed_{ii}"; } - var dataBytes = Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(data)); + var dataBytes = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data)); var key = Encoding.UTF8.GetBytes("My super key"); var message = new KafkaMessage(key, dataBytes); diff --git a/src/QuixStreams.Kafka.Transport/AutoCommitter.cs b/src/QuixStreams.Kafka.Transport/AutoCommitter.cs index aae4bc66..0e85cd97 100644 --- a/src/QuixStreams.Kafka.Transport/AutoCommitter.cs +++ b/src/QuixStreams.Kafka.Transport/AutoCommitter.cs @@ -14,7 +14,7 @@ namespace QuixStreams.Kafka.Transport internal sealed class AutoCommitter { private readonly Action, Action> wrappingCommitCallback; - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly Func onPublish = null; private Action closeAction = null; private bool closed = false; diff --git a/src/QuixStreams.Kafka.Transport/SerDes/Codecs/Codec.cs b/src/QuixStreams.Kafka.Transport/SerDes/Codecs/Codec.cs index cab01133..a717af05 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/Codecs/Codec.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/Codecs/Codec.cs @@ -16,7 +16,7 @@ public abstract class Codec : ICodec /// protected Codec() { - logger = QuixStreams.Logging.CreateLogger(this.GetType()); + logger = Logging.CreateLogger(this.GetType()); } /// diff --git a/src/QuixStreams.Kafka.Transport/SerDes/Codecs/DefaultCodecs/DefaultJsonCodec.cs b/src/QuixStreams.Kafka.Transport/SerDes/Codecs/DefaultCodecs/DefaultJsonCodec.cs index 7503d2e1..4961fef9 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/Codecs/DefaultCodecs/DefaultJsonCodec.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/Codecs/DefaultCodecs/DefaultJsonCodec.cs @@ -1,6 +1,7 @@ using System; using System.IO; using Newtonsoft.Json; +using Newtonsoft.Json.Converters; namespace QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs { @@ -21,7 +22,7 @@ public class DefaultJsonCodec : Codec public DefaultJsonCodec() { serializer = JsonSerializer.Create(); - serializer.Converters.Add(new Newtonsoft.Json.Converters.StringEnumConverter()); + serializer.Converters.Add(new StringEnumConverter()); serializer.DefaultValueHandling = DefaultValueHandling.Ignore; } @@ -74,7 +75,7 @@ public class DefaultJsonCodec : ICodec private DefaultJsonCodec() { serializer = JsonSerializer.Create(); - serializer.Converters.Add(new Newtonsoft.Json.Converters.StringEnumConverter()); + serializer.Converters.Add(new StringEnumConverter()); serializer.DefaultValueHandling = DefaultValueHandling.Ignore; } diff --git a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageBuffer.cs b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageBuffer.cs index 7e32354a..b05c305b 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageBuffer.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageBuffer.cs @@ -59,7 +59,7 @@ public KafkaMessageBuffer(TimeSpan timeToLive, int bufferPerMessageGroupKey = 50 if (bufferPerMessageGroupKey < 1) throw new ArgumentOutOfRangeException(nameof(bufferPerMessageGroupKey), "Value must be at least 1"); this.timeToLive = timeToLive; this.bufferPerMessageGroupKey = bufferPerMessageGroupKey; - this.logger = QuixStreams.Logging.CreateLogger(typeof(KafkaMessageBuffer)); + this.logger = Logging.CreateLogger(typeof(KafkaMessageBuffer)); } /// diff --git a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMerger.cs b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMerger.cs index ebe554cb..168cf719 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMerger.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMerger.cs @@ -9,7 +9,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using QuixStreams.Kafka.Transport.SerDes.Legacy; namespace QuixStreams.Kafka.Transport.SerDes { @@ -18,7 +17,7 @@ namespace QuixStreams.Kafka.Transport.SerDes /// public class KafkaMessageMerger { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private long bufferCounter = 0; private readonly ConcurrentDictionary pendingMessages = new ConcurrentDictionary(); // Messages that are queued up diff --git a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMergerHelper.cs b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMergerHelper.cs index 047ebaba..34237bbd 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMergerHelper.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMergerHelper.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using QuixStreams.Kafka.Transport.SerDes.Legacy; diff --git a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageSplitter.cs b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageSplitter.cs index f23882be..f10a999d 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageSplitter.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageSplitter.cs @@ -50,7 +50,7 @@ public class KafkaMessageSplitter : IKafkaMessageSplitter /// private static int MovingWarnAboveSize = 0; - private static readonly ILogger logger = QuixStreams.Logging.CreateLogger(typeof(KafkaMessageSplitter)); + private static readonly ILogger logger = Logging.CreateLogger(typeof(KafkaMessageSplitter)); /// /// The expected size of the the details to describe the split info diff --git a/src/QuixStreams.Kafka.Transport/SerDes/Legacy/MessageValue/TransportPackageValueCodecBinary.cs b/src/QuixStreams.Kafka.Transport/SerDes/Legacy/MessageValue/TransportPackageValueCodecBinary.cs index b8ba2546..7923c3ff 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/Legacy/MessageValue/TransportPackageValueCodecBinary.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/Legacy/MessageValue/TransportPackageValueCodecBinary.cs @@ -1,5 +1,4 @@ -using System; -using System.IO; +using System.IO; using System.Runtime.Serialization; using QuixStreams.Kafka.Transport.SerDes.Codecs; using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs; diff --git a/src/QuixStreams.Kafka.Transport/SerDes/PackageDeserializer.cs b/src/QuixStreams.Kafka.Transport/SerDes/PackageDeserializer.cs index 9b476bbf..9abd6d1f 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/PackageDeserializer.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/PackageDeserializer.cs @@ -1,5 +1,4 @@ -using System; -using System.Linq; +using System.Linq; using System.Runtime.Serialization; using QuixStreams.Kafka.Transport.SerDes.Codecs; using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs; diff --git a/src/QuixStreams.Kafka.Transport/SerDes/PackageSerializer.cs b/src/QuixStreams.Kafka.Transport/SerDes/PackageSerializer.cs index 510e5b78..589a144d 100644 --- a/src/QuixStreams.Kafka.Transport/SerDes/PackageSerializer.cs +++ b/src/QuixStreams.Kafka.Transport/SerDes/PackageSerializer.cs @@ -1,5 +1,4 @@ -using System.Collections.Generic; -using System.Linq; +using System.Linq; using System.Runtime.Serialization; using QuixStreams.Kafka.Transport.SerDes.Codecs; using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs; diff --git a/src/QuixStreams.Kafka/KafkaConsumer.cs b/src/QuixStreams.Kafka/KafkaConsumer.cs index abeea5d0..96947d28 100644 --- a/src/QuixStreams.Kafka/KafkaConsumer.cs +++ b/src/QuixStreams.Kafka/KafkaConsumer.cs @@ -15,7 +15,7 @@ namespace QuixStreams.Kafka /// public class KafkaConsumer : IKafkaConsumer { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private bool disableKafkaLogsByConnectWorkaround = false; // if enabled, no actual kafka logs should be shown private readonly ConsumerConfig config; diff --git a/src/QuixStreams.Kafka/KafkaProducer.cs b/src/QuixStreams.Kafka/KafkaProducer.cs index 7ea2ae01..9136b0a7 100644 --- a/src/QuixStreams.Kafka/KafkaProducer.cs +++ b/src/QuixStreams.Kafka/KafkaProducer.cs @@ -22,7 +22,7 @@ public class KafkaProducer : IKafkaProducer private readonly object sendLock = new object(); private readonly object openLock = new object(); - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private IDictionary brokerStates = new Dictionary(); private bool checkBrokerStateBeforeSend = false; private bool logOnNextBrokerStateUp = false; diff --git a/src/QuixStreams.ManyStreamTest/StreamingTest.cs b/src/QuixStreams.ManyStreamTest/StreamingTest.cs index b8744ccd..ac00f794 100644 --- a/src/QuixStreams.ManyStreamTest/StreamingTest.cs +++ b/src/QuixStreams.ManyStreamTest/StreamingTest.cs @@ -2,6 +2,7 @@ using System.Threading; using Microsoft.Extensions.Logging; using QuixStreams.Streaming; +using QuixStreams.Streaming.Models; namespace QuixStreams.ManyStreamTest { @@ -40,7 +41,7 @@ public void Run(CancellationToken ct) while (!ct.IsCancellationRequested) { var stream = topicProducer.CreateStream(); - var data = new QuixStreams.Streaming.Models.TimeseriesData(); + var data = new TimeseriesData(); data.AddTimestampNanoseconds(10).AddValue("test", DateTime.UtcNow.ToBinary()); stream.Timeseries.Buffer.Publish(data); stream.Events.AddTimestampNanoseconds(10).AddValue("test1", "val1"); diff --git a/src/QuixStreams.PerformanceTest/PerformanceTest.cs b/src/QuixStreams.PerformanceTest/PerformanceTest.cs index 3dfe8fda..9e297a49 100644 --- a/src/QuixStreams.PerformanceTest/PerformanceTest.cs +++ b/src/QuixStreams.PerformanceTest/PerformanceTest.cs @@ -1,6 +1,5 @@ using System; using System.Threading; -using QuixStreams.Streaming.UnitTests; using QuixStreams.Streaming.UnitTests.Helpers; using QuixStreams.Telemetry.Models; diff --git a/src/QuixStreams.PerformanceTest/Program.cs b/src/QuixStreams.PerformanceTest/Program.cs index 1d56de7c..8082f524 100644 --- a/src/QuixStreams.PerformanceTest/Program.cs +++ b/src/QuixStreams.PerformanceTest/Program.cs @@ -1,7 +1,6 @@ using System; using System.Threading; using QuixStreams.Kafka.Transport.SerDes; -using QuixStreams.Kafka.Transport.SerDes.Legacy.MessageValue; using QuixStreams.Streaming.Utils; using QuixStreams.Telemetry.Models; diff --git a/src/QuixStreams.Speedtest/StreamingTest.cs b/src/QuixStreams.Speedtest/StreamingTest.cs index a7101cf1..33dc7234 100644 --- a/src/QuixStreams.Speedtest/StreamingTest.cs +++ b/src/QuixStreams.Speedtest/StreamingTest.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Threading; using QuixStreams.Streaming; +using QuixStreams.Streaming.Models; namespace QuixStreams.Speedtest { @@ -60,7 +61,7 @@ public void Run(CancellationToken ct) while (!ct.IsCancellationRequested) { - var data = new QuixStreams.Streaming.Models.TimeseriesData(); + var data = new TimeseriesData(); data.AddTimestampNanoseconds(10).AddValue(parameterName, DateTime.UtcNow.ToBinary()); stream.Timeseries.Buffer.Publish(data); } diff --git a/src/QuixStreams.State.ParallelWriteTest/Program.cs b/src/QuixStreams.State.ParallelWriteTest/Program.cs index c554d771..561d975a 100644 --- a/src/QuixStreams.State.ParallelWriteTest/Program.cs +++ b/src/QuixStreams.State.ParallelWriteTest/Program.cs @@ -36,7 +36,7 @@ private static void RunParallelWriteTest(IStateStorage storage) int counter = 0; - var threads = new List(); + var threads = new List(); for (var i = 0; i < 15; ++i) { var thread = new Thread(() => diff --git a/src/QuixStreams.State.UnitTests/RocksDbStorageShould.cs b/src/QuixStreams.State.UnitTests/RocksDbStorageShould.cs index c0a6d42c..5fb2833c 100644 --- a/src/QuixStreams.State.UnitTests/RocksDbStorageShould.cs +++ b/src/QuixStreams.State.UnitTests/RocksDbStorageShould.cs @@ -1,10 +1,10 @@ -using FluentAssertions; -using Xunit; -using QuixStreams.State.Storage; -using System.Threading.Tasks; using System; using System.IO; +using System.Threading.Tasks; +using FluentAssertions; +using QuixStreams.State.Storage; using RocksDbSharp; +using Xunit; namespace QuixStreams.State.UnitTests { @@ -263,7 +263,7 @@ public void Dispose() { // Cleanup storage.Dispose(); - System.IO.Directory.Delete(this.dbDirectory, true); + Directory.Delete(this.dbDirectory, true); } } } \ No newline at end of file diff --git a/src/QuixStreams.State.UnitTests/Serializers/ByteValueSerializerShould.cs b/src/QuixStreams.State.UnitTests/Serializers/ByteValueSerializerShould.cs index 4e4b3be3..a80eea36 100644 --- a/src/QuixStreams.State.UnitTests/Serializers/ByteValueSerializerShould.cs +++ b/src/QuixStreams.State.UnitTests/Serializers/ByteValueSerializerShould.cs @@ -1,5 +1,4 @@ -using System; -using FluentAssertions; +using FluentAssertions; using QuixStreams.State.Serializers; using Xunit; diff --git a/src/QuixStreams.State/ScalarState.cs b/src/QuixStreams.State/ScalarState.cs index 84c47e85..3c0a3e85 100644 --- a/src/QuixStreams.State/ScalarState.cs +++ b/src/QuixStreams.State/ScalarState.cs @@ -1,8 +1,4 @@ using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Newtonsoft.Json; diff --git a/src/QuixStreams.Streaming.IntegrationTests/KafkaDockerTestFixture.cs b/src/QuixStreams.Streaming.IntegrationTests/KafkaDockerTestFixture.cs index 216a3614..c24a29f5 100644 --- a/src/QuixStreams.Streaming.IntegrationTests/KafkaDockerTestFixture.cs +++ b/src/QuixStreams.Streaming.IntegrationTests/KafkaDockerTestFixture.cs @@ -1,15 +1,9 @@ using System; -using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using Confluent.Kafka.Admin; -using Ductus.FluentDocker.Builders; -using Ductus.FluentDocker.Extensions; -using Ductus.FluentDocker.Services; using QuixStreams.IntegrationTestBase; -using QuixStreams.Streaming.Configuration; using Xunit; -using Xunit.Abstractions; namespace QuixStreams.Streaming.IntegrationTests { diff --git a/src/QuixStreams.Streaming.IntegrationTests/KafkaStreamingClientIntegrationTests.cs b/src/QuixStreams.Streaming.IntegrationTests/KafkaStreamingClientIntegrationTests.cs index a2b7b101..9c5bb5e3 100644 --- a/src/QuixStreams.Streaming.IntegrationTests/KafkaStreamingClientIntegrationTests.cs +++ b/src/QuixStreams.Streaming.IntegrationTests/KafkaStreamingClientIntegrationTests.cs @@ -7,11 +7,10 @@ using System.Threading.Tasks; using Confluent.Kafka; using FluentAssertions; -using FluentAssertions.Extensions; using Quix.TestBase.Extensions; using QuixStreams.Kafka; -using QuixStreams.Telemetry; using QuixStreams.Streaming.Models; +using QuixStreams.Telemetry; using QuixStreams.Telemetry.Models; using QuixStreams.Telemetry.Models.Utility; using RocksDbSharp; @@ -34,7 +33,7 @@ public KafkaStreamingClientIntegrationTests(ITestOutputHelper output, KafkaDocke { this.output = output; this.kafkaDockerTestFixture = kafkaDockerTestFixture; - QuixStreams.Logging.Factory = output.CreateLoggerFactory(); + Logging.Factory = output.CreateLoggerFactory(); client = new KafkaStreamingClient(kafkaDockerTestFixture.BrokerList, null); output.WriteLine($"Created client with brokerlist '{kafkaDockerTestFixture.BrokerList}'"); } diff --git a/src/QuixStreams.Streaming.IntegrationTests/StreamingRawIntegrationTests.cs b/src/QuixStreams.Streaming.IntegrationTests/StreamingRawIntegrationTests.cs index de7fba11..ee492bd4 100644 --- a/src/QuixStreams.Streaming.IntegrationTests/StreamingRawIntegrationTests.cs +++ b/src/QuixStreams.Streaming.IntegrationTests/StreamingRawIntegrationTests.cs @@ -7,7 +7,6 @@ using FluentAssertions; using Quix.TestBase.Extensions; using QuixStreams.Kafka; -using QuixStreams; using Xunit; using Xunit.Abstractions; using AutoOffsetReset = QuixStreams.Telemetry.Kafka.AutoOffsetReset; @@ -25,7 +24,7 @@ public StreamingRawIntegrationTests(ITestOutputHelper output, KafkaDockerTestFix { this.output = output; this.kafkaDockerTestFixture = kafkaDockerTestFixture; - QuixStreams.Logging.Factory = output.CreateLoggerFactory(); + Logging.Factory = output.CreateLoggerFactory(); client = new KafkaStreamingClient(kafkaDockerTestFixture.BrokerList, null); } diff --git a/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsConsumerShould.cs b/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsConsumerShould.cs index 15290d17..2f42ebfa 100644 --- a/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsConsumerShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsConsumerShould.cs @@ -2,9 +2,12 @@ using System.Collections.Generic; using FluentAssertions; using NSubstitute; +using QuixStreams.Streaming.Models; +using QuixStreams.Streaming.Models.StreamConsumer; using QuixStreams.Streaming.UnitTests.Helpers; using QuixStreams.Telemetry.Models; using Xunit; +using EventDefinition = QuixStreams.Telemetry.Models.EventDefinition; namespace QuixStreams.Streaming.UnitTests.Models { @@ -17,8 +20,8 @@ public void Receive_EventData_ShouldRaiseExpectedOnReceiveEvents() // Arrange var streamConsumer = Substitute.For(); - var receivedData = new List(); - var eventsReader = new QuixStreams.Streaming.Models.StreamConsumer.StreamEventsConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); + var receivedData = new List(); + var eventsReader = new StreamEventsConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); eventsReader.OnDataReceived += (sender, args) => { @@ -28,7 +31,7 @@ public void Receive_EventData_ShouldRaiseExpectedOnReceiveEvents() //Act for (var i = 0; i < NumberEventsTest; i++) { - var eventData = new QuixStreams.Streaming.Models.EventData($"event{i}", 100 * i, $"test_event_value{i}") + var eventData = new EventData($"event{i}", 100 * i, $"test_event_value{i}") .AddTag($"tag{i}", $"{i}"); streamConsumer.OnEventData += Raise.Event>(streamConsumer, eventData.ConvertToEventDataRaw()); @@ -53,7 +56,7 @@ public void Receive_Definitions_ShouldUpdateDefinitionsProperly() { // Arrange var streamConsumer = Substitute.For(); - var eventsReader = new QuixStreams.Streaming.Models.StreamConsumer.StreamEventsConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); + var eventsReader = new StreamEventsConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); var eventDefinitions = new EventDefinitions { @@ -139,9 +142,9 @@ public void Receive_Definitions_ShouldUpdateDefinitionsProperly() } }; - var expectedDefinitions = new List + var expectedDefinitions = new List { - new QuixStreams.Streaming.Models.EventDefinition + new Streaming.Models.EventDefinition { Id = "Event1", Name = "Event One", @@ -150,33 +153,33 @@ public void Receive_Definitions_ShouldUpdateDefinitionsProperly() Location = "", Level = EventLevel.Critical }, - new QuixStreams.Streaming.Models.EventDefinition + new Streaming.Models.EventDefinition { Id = "event2", Location = "/some/nested/group" }, - new QuixStreams.Streaming.Models.EventDefinition + new Streaming.Models.EventDefinition { Id = "event3", Location = "/some/nested/group" }, - new QuixStreams.Streaming.Models.EventDefinition + new Streaming.Models.EventDefinition { Id = "event4", Location = "/some/nested/group" }, - new QuixStreams.Streaming.Models.EventDefinition + new Streaming.Models.EventDefinition { Id = "event5", Location = "/some/nested/group2" }, - new QuixStreams.Streaming.Models.EventDefinition + new Streaming.Models.EventDefinition { Id = "event6", Location = "/some/nested/group2" }, - new QuixStreams.Streaming.Models.EventDefinition + new Streaming.Models.EventDefinition { Id = "event7", Location = "/some/nested/group2/startswithtest" diff --git a/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsPublisherShould.cs b/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsPublisherShould.cs index e53ed56e..69863b9b 100644 --- a/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsPublisherShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/Models/StreamEventsPublisherShould.cs @@ -3,9 +3,12 @@ using System.Linq; using FluentAssertions; using NSubstitute; +using QuixStreams.Streaming.Models; +using QuixStreams.Streaming.Models.StreamProducer; using QuixStreams.Telemetry.Models; using QuixStreams.Telemetry.Models.Utility; using Xunit; +using EventDefinition = QuixStreams.Telemetry.Models.EventDefinition; namespace QuixStreams.Streaming.UnitTests.Models { @@ -21,7 +24,7 @@ public void Events_AddValue_ShouldPublishExpected() streamProducer.Publish(Arg.Do(x=> sentData.Add(x))); streamProducer.Publish(Arg.Do>(x => sentData.AddRange(x.ToList()))); - var eventsWriter = new QuixStreams.Streaming.Models.StreamProducer.StreamEventsProducer(streamProducer); + var eventsWriter = new StreamEventsProducer(streamProducer); var epoch = new DateTime(2000, 01, 01); eventsWriter.Epoch = epoch; @@ -78,7 +81,7 @@ public void Events_AddValuesWithTags_ShouldPublishExpected() streamProducer.Publish(Arg.Do(x => sentData.Add(x))); streamProducer.Publish(Arg.Do>(x => sentData.AddRange(x.ToList()))); - var eventsProducer = new QuixStreams.Streaming.Models.StreamProducer.StreamEventsProducer(streamProducer); + var eventsProducer = new StreamEventsProducer(streamProducer); var epoch = new DateTime(2000, 01, 01); eventsProducer.Epoch = epoch; @@ -156,7 +159,7 @@ public void Events_AddValueWithMixedTimestamps_ShouldPublishExpected() streamProducer.Publish(Arg.Do(x => sentData.Add(x))); streamProducer.Publish(Arg.Do>(x => sentData.AddRange(x.ToList()))); - var eventsWriter = new QuixStreams.Streaming.Models.StreamProducer.StreamEventsProducer(streamProducer); + var eventsWriter = new StreamEventsProducer(streamProducer); var epoch = new DateTime(2000, 01, 01); eventsWriter.Epoch = epoch; @@ -213,22 +216,22 @@ public void Events_WriteDirectWithEventDataInstancesAndDefaultEpoch_ShouldPublis streamProducer.Publish(Arg.Do(x => sentData.Add(x))); streamProducer.Publish(Arg.Do>(x => sentData.AddRange(x.ToList()))); - var eventsWriter = new QuixStreams.Streaming.Models.StreamProducer.StreamEventsProducer(streamProducer); + var eventsWriter = new StreamEventsProducer(streamProducer); var epoch = new DateTime(2000, 01, 01); eventsWriter.Epoch = epoch; // Act - var data1 = new QuixStreams.Streaming.Models.EventData( "test_param1", new DateTime(1999, 01, 01), "1"); - var data2 = new QuixStreams.Streaming.Models.EventData("test_param2", new DateTime(1999, 01, 01), "2"); - eventsWriter.Publish(new QuixStreams.Streaming.Models.EventData[] { data1, data2 }); + var data1 = new EventData( "test_param1", new DateTime(1999, 01, 01), "1"); + var data2 = new EventData("test_param2", new DateTime(1999, 01, 01), "2"); + eventsWriter.Publish(new EventData[] { data1, data2 }); eventsWriter.DefaultTags["default1"] = "value1"; eventsWriter.DefaultTags["default2"] = "value2"; - var data3 = new QuixStreams.Streaming.Models.EventData("test_param2", new TimeSpan(01, 02, 03), "2").AddTag("extraTag", "value1"); + var data3 = new EventData("test_param2", new TimeSpan(01, 02, 03), "2").AddTag("extraTag", "value1"); eventsWriter.Publish(data3); - var data4 = new QuixStreams.Streaming.Models.EventData("test_param3", 300, "3"); + var data4 = new EventData("test_param3", 300, "3"); eventsWriter.Publish(data4); // Assert @@ -281,12 +284,12 @@ public void Events_WriteTwice_ShouldUpdateTimestampsWithEpochOnFirstWrite() streamProducer.Publish(Arg.Do(x => sentData.Add(x))); streamProducer.Publish(Arg.Do>(x => sentData.AddRange(x.ToList()))); - var eventsWriter = new QuixStreams.Streaming.Models.StreamProducer.StreamEventsProducer(streamProducer); + var eventsWriter = new StreamEventsProducer(streamProducer); var epoch = new DateTime(2000, 01, 01); eventsWriter.Epoch = epoch; // Act - var data1 = new QuixStreams.Streaming.Models.EventData("test_param2", new TimeSpan(01, 02, 03), "2").AddTag("extraTag", "value1"); + var data1 = new EventData("test_param2", new TimeSpan(01, 02, 03), "2").AddTag("extraTag", "value1"); eventsWriter.Publish(data1); eventsWriter.Publish(data1); @@ -322,7 +325,7 @@ public void AddDefinition_WithLocation_ShouldPublishExpectedDefinitions() var streamProducer = Substitute.For(); var sentDefinitions = new List(); streamProducer.Publish(Arg.Do(x => sentDefinitions.Add(x))); - var eventsWriter = new QuixStreams.Streaming.Models.StreamProducer.StreamEventsProducer(streamProducer); + var eventsWriter = new StreamEventsProducer(streamProducer); // Act eventsWriter.DefaultLocation = ""; // root diff --git a/src/QuixStreams.Streaming.UnitTests/Models/StreamPropertiesProducerShould.cs b/src/QuixStreams.Streaming.UnitTests/Models/StreamPropertiesProducerShould.cs index b3cee255..61802f7b 100644 --- a/src/QuixStreams.Streaming.UnitTests/Models/StreamPropertiesProducerShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/Models/StreamPropertiesProducerShould.cs @@ -2,7 +2,6 @@ using System.Threading.Tasks; using NSubstitute; using Quix.TestBase.Extensions; -using QuixStreams; using QuixStreams.Streaming.Models.StreamProducer; using Xunit; using Xunit.Abstractions; @@ -25,7 +24,7 @@ public StreamPropertiesProducerShould(ITestOutputHelper outputHelper) outputHelper.WriteLine(ci.Arg(), ci.Arg()); }); - QuixStreams.Logging.Factory = outputHelper.CreateLoggerFactory(); + Logging.Factory = outputHelper.CreateLoggerFactory(); } [Fact] diff --git a/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesConsumerShould.cs b/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesConsumerShould.cs index d4bc84a6..2ea84c70 100644 --- a/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesConsumerShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesConsumerShould.cs @@ -2,9 +2,12 @@ using System.Collections.Generic; using FluentAssertions; using NSubstitute; +using QuixStreams.Streaming.Models; +using QuixStreams.Streaming.Models.StreamConsumer; using QuixStreams.Streaming.UnitTests.Helpers; using QuixStreams.Telemetry.Models; using Xunit; +using ParameterDefinition = QuixStreams.Telemetry.Models.ParameterDefinition; namespace QuixStreams.Streaming.UnitTests.Models { @@ -19,8 +22,8 @@ public void Receive_TimeseriesData_ShouldRaiseExpectedOnReceivedEvents() // Arrange var streamConsumer = Substitute.For(); - var receivedData = new List(); - var parametersReader = new QuixStreams.Streaming.Models.StreamConsumer.StreamTimeseriesConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); + var receivedData = new List(); + var parametersReader = new StreamTimeseriesConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); var buffer = parametersReader.CreateBuffer(); buffer.OnDataReleased += (sender, args) => @@ -32,7 +35,7 @@ public void Receive_TimeseriesData_ShouldRaiseExpectedOnReceivedEvents() //Act for (var i = 1; i <= NumberTimestampsTest; i++) { - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestampNanoseconds(100 * i) .AddValue($"test_numeric_param{i}", i) .AddValue($"test_string_param{i}", $"{i}") @@ -61,7 +64,7 @@ public void Receive_Definitions_ShouldUpdateDefinitionsProperly() { // Arrange var streamConsumer = Substitute.For(); - var parametersReader = new QuixStreams.Streaming.Models.StreamConsumer.StreamTimeseriesConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); + var parametersReader = new StreamTimeseriesConsumer(new TestStreamingClient().GetTopicConsumer(), streamConsumer); var parameterDefinitions = new ParameterDefinitions { @@ -150,9 +153,9 @@ public void Receive_Definitions_ShouldUpdateDefinitionsProperly() } }; - var expectedDefinitions = new List + var expectedDefinitions = new List { - new QuixStreams.Streaming.Models.ParameterDefinition + new Streaming.Models.ParameterDefinition { Id = "Param1", Name = "Parameter One", @@ -164,33 +167,33 @@ public void Receive_Definitions_ShouldUpdateDefinitionsProperly() CustomProperties = "custom prop", Location = "" }, - new QuixStreams.Streaming.Models.ParameterDefinition + new Streaming.Models.ParameterDefinition { Id = "param2", Location = "/some/nested/group" }, - new QuixStreams.Streaming.Models.ParameterDefinition + new Streaming.Models.ParameterDefinition { Id = "param3", Location = "/some/nested/group" }, - new QuixStreams.Streaming.Models.ParameterDefinition + new Streaming.Models.ParameterDefinition { Id = "param4", Location = "/some/nested/group" }, - new QuixStreams.Streaming.Models.ParameterDefinition + new Streaming.Models.ParameterDefinition { Id = "param5", Location = "/some/nested/group2" }, - new QuixStreams.Streaming.Models.ParameterDefinition + new Streaming.Models.ParameterDefinition { Id = "param6", Location = "/some/nested/group2" }, - new QuixStreams.Streaming.Models.ParameterDefinition + new Streaming.Models.ParameterDefinition { Id = "param7", Location = "/some/nested/group2/startswithtest" diff --git a/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesProducerShould.cs b/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesProducerShould.cs index 2e4a0099..7bf6f5be 100644 --- a/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesProducerShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/Models/StreamTimeseriesProducerShould.cs @@ -2,11 +2,15 @@ using System.Collections.Generic; using System.Linq; using FluentAssertions; +using FluentAssertions.Common; using FluentAssertions.Equivalency; using NSubstitute; +using QuixStreams.Streaming.Models; +using QuixStreams.Streaming.Models.StreamProducer; using QuixStreams.Telemetry.Models; using QuixStreams.Telemetry.Models.Utility; using Xunit; +using ParameterDefinition = QuixStreams.Telemetry.Models.ParameterDefinition; namespace QuixStreams.Streaming.UnitTests.Models { @@ -22,7 +26,7 @@ public void AddValue_NumericAndString_ShouldPublishExpected() var sentData = new List(); streamProducer.Publish(Arg.Do(x=> sentData.Add(x))); - var parametersProducer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var parametersProducer = new StreamTimeseriesProducer(topicProducer, streamProducer); parametersProducer.Buffer.BufferTimeout = null; parametersProducer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); @@ -71,7 +75,7 @@ public void AddValue_WithVariableTags_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x=> sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; writer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); @@ -128,14 +132,14 @@ public void AddValue_WithVariableBinaryParams_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; writer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestampNanoseconds(100) .AddValue("test_param", new byte[] {1,2,3}); timeseriesData.AddTimestampNanoseconds(200) @@ -173,14 +177,14 @@ public void AddValue_WithVariableNumericParams_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; writer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestampNanoseconds(100) .AddValue("test_param", 1); timeseriesData.AddTimestampNanoseconds(200) @@ -217,14 +221,14 @@ public void AddValue_WithVariableStringParams_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; writer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestampNanoseconds(100) .AddValue("test_param", "one"); timeseriesData.AddTimestampNanoseconds(200) @@ -260,13 +264,13 @@ public void AddValue_WithDateTime_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var TimeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var TimeseriesData = new TimeseriesData(); TimeseriesData.AddTimestamp(new DateTime( 2020, 01, 01, 1, 2, 3)) .AddValue("test_param", "one"); writer.Buffer.Publish(TimeseriesData); @@ -296,13 +300,13 @@ public void AddValue_WithTimeSpan_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestamp(new TimeSpan(2, 3, 4, 5)) .AddValue("test_param", "one"); writer.Buffer.Publish(timeseriesData); @@ -332,13 +336,13 @@ public void AddValue_WithNanoseconds_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestampNanoseconds(1231123) .AddValue("test_param", "one"); writer.Buffer.Publish(timeseriesData); @@ -368,14 +372,14 @@ public void AddValue_WithMilliseconds_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; writer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var TimeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var TimeseriesData = new TimeseriesData(); TimeseriesData.AddTimestampMilliseconds(1231123) .AddValue("test_param", "one"); writer.Buffer.Publish(TimeseriesData); @@ -405,13 +409,13 @@ public void AddValue_WithMultipleEpoch_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; writer.Buffer.PacketSize = 100; writer.Buffer.Epoch = new DateTime(2000, 01, 01); // Act - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestampNanoseconds(100) .AddValue("test_param1", 1); timeseriesData.AddTimestampNanoseconds(200) @@ -472,14 +476,14 @@ public void PublishData_ToBufferTwice_ShouldUpdateTimestampsWithEpochOnFirstWrit var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var parametersProducer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var parametersProducer = new StreamTimeseriesProducer(topicProducer, streamProducer); parametersProducer.Buffer.BufferTimeout = null; parametersProducer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); parametersProducer.Buffer.Epoch = epoch; // Act - var timeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var timeseriesData = new TimeseriesData(); timeseriesData.AddTimestampNanoseconds(1231123) .AddValue("test_param", "one"); @@ -512,12 +516,12 @@ public void PublishData_DirectlyToWriterWithEpoch_ShouldPublishExpected() var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var parameters = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var parameters = new StreamTimeseriesProducer(topicProducer, streamProducer); var epoch = new DateTime(2000, 01, 01); streamProducer.Epoch = epoch; // Act - var TimeseriesData = new QuixStreams.Streaming.Models.TimeseriesData(); + var TimeseriesData = new TimeseriesData(); TimeseriesData.AddTimestampNanoseconds(1231123) .AddValue("test_param", "one"); @@ -546,14 +550,14 @@ public void AddValue_WithTags_AndConvertTo3rdLayerAgain_ShouldResultWithoutNullT var streamProducer = Substitute.For(); var sentData = new List(); streamProducer.Publish(Arg.Do(x => sentData.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = null; writer.Buffer.PacketSize = 100; var epoch = new DateTime(2000, 01, 01); writer.Buffer.Epoch = epoch; // Act - var incomingData = new QuixStreams.Streaming.Models.TimeseriesData(); + var incomingData = new TimeseriesData(); incomingData.AddTimestampNanoseconds(100) .AddValue("test_param", 1) .AddTag("tag1", "tag1val1"); @@ -592,8 +596,8 @@ public void AddValue_WithTags_AndConvertTo3rdLayerAgain_ShouldResultWithoutNullT }); // Act - var data = new QuixStreams.Streaming.Models.TimeseriesData(timeseriesDataRaw); - data.Should().BeEquivalentTo(incomingData, options => options.Including(info => info.WhichGetterHas(FluentAssertions.Common.CSharpAccessModifier.Public))); + var data = new TimeseriesData(timeseriesDataRaw); + data.Should().BeEquivalentTo(incomingData, options => options.Including(info => info.WhichGetterHas(CSharpAccessModifier.Public))); } @@ -605,7 +609,7 @@ public void AddDefinition_WithLocation_ShouldPublishExpectedDefinitions() var streamProducer = Substitute.For(); var sentDefinitions = new List(); streamProducer.Publish(Arg.Do(x => sentDefinitions.Add(x))); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); writer.Buffer.BufferTimeout = 500000; // maybe implement disable? // Act @@ -720,7 +724,7 @@ public void AddDefinition_WithIncorrectRanges_ShouldRaiseExceptions() // Arrange var topicProducer = Substitute.For(); var streamProducer = Substitute.For(); - var writer = new QuixStreams.Streaming.Models.StreamProducer.StreamTimeseriesProducer(topicProducer, streamProducer); + var writer = new StreamTimeseriesProducer(topicProducer, streamProducer); // Act Action action1 = () => writer.AddDefinition("Param1").SetRange(10, -10); diff --git a/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesBufferShould.cs b/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesBufferShould.cs index 9c2f9a84..f94b42c2 100644 --- a/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesBufferShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesBufferShould.cs @@ -3,7 +3,6 @@ using System.Threading; using FluentAssertions; using Quix.TestBase.Extensions; -using QuixStreams; using QuixStreams.Streaming.Models; using Xunit; using Xunit.Abstractions; @@ -14,7 +13,7 @@ public class TimeseriesBufferShould { public TimeseriesBufferShould(ITestOutputHelper helper) { - QuixStreams.Logging.Factory = helper.CreateLoggerFactory(); + Logging.Factory = helper.CreateLoggerFactory(); } [Fact] @@ -34,7 +33,7 @@ public void WriteData_WithDisabledConfiguration_ShouldRaiseOnReceiveEventsStraig CustomTriggerBeforeEnqueue = null }; var buffer = new TimeseriesBuffer(bufferConfiguration); - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -65,7 +64,7 @@ public void WriteData_WithData_FlushOnDispose() CustomTriggerBeforeEnqueue = null }; var buffer = new TimeseriesBuffer(bufferConfiguration); - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -105,7 +104,7 @@ public void WriteData_WithPacketSizeConfiguration_ShouldRaiseProperOnReceiveEven if (initialConfig) bufferConfiguration.PacketSize = 2; var buffer = new TimeseriesBuffer(bufferConfiguration); if (!initialConfig) buffer.PacketSize = 2; - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -140,7 +139,7 @@ public void WriteData_WithTimeSpanMsConfiguration_ShouldRaiseProperOnReceiveEven if (initialConfig) bufferConfiguration.TimeSpanInMilliseconds = 200; var buffer = new TimeseriesBuffer(bufferConfiguration); if (!initialConfig) buffer.TimeSpanInMilliseconds = 200; - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -176,7 +175,7 @@ public void WriteData_WithTimeSpanNsConfiguration_ShouldRaiseProperOnReceiveEven if (initialConfig) bufferConfiguration.TimeSpanInNanoseconds = 200 * (long) 1e6; var buffer = new TimeseriesBuffer(bufferConfiguration); if (!initialConfig) buffer.TimeSpanInNanoseconds = 200 * (long) 1e6; - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -207,7 +206,7 @@ public void WriteData_WithTimeSpanAndBufferTimeoutConfiguration_ShouldRaisePrope CustomTriggerBeforeEnqueue = null }; var buffer = new TimeseriesBuffer(bufferConfiguration); - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -247,7 +246,7 @@ public void WriteData_WithFilterConfiguration_ShouldRaiseProperOnReceiveEvents(b if (initialConfig) bufferConfiguration.Filter = (timestamp) => timestamp.Parameters["param2"].NumericValue == 2; var buffer = new TimeseriesBuffer(bufferConfiguration); if (!initialConfig) buffer.Filter = (timestamp) => timestamp.Parameters["param2"].NumericValue == 2; - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -283,7 +282,7 @@ public void WriteData_WithBufferTimeout_ShouldRaiseProperOnReceiveEvents(bool in if (initialConfig) bufferConfiguration.BufferTimeout = 100; var buffer = new TimeseriesBuffer(bufferConfiguration); if (!initialConfig) buffer.BufferTimeout = 100; - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -319,7 +318,7 @@ public void WriteData_WithCustomTriggerConfiguration_ShouldRaiseProperOnReceiveE if (initialConfig) bufferConfiguration.CustomTrigger = (fdata) => fdata.Timestamps.Count == 2; var buffer = new TimeseriesBuffer(bufferConfiguration); if (!initialConfig) buffer.CustomTrigger = (fdata) => fdata.Timestamps.Count == 2; - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { @@ -356,7 +355,7 @@ public void WriteData_WithCustomTriggerBeforeEnqueueConfiguration_ShouldRaisePro if (initialConfig) bufferConfiguration.CustomTriggerBeforeEnqueue = timestamp => timestamp.Tags["tag2"] == "value2"; var buffer = new TimeseriesBuffer(bufferConfiguration); if (!initialConfig) buffer.CustomTriggerBeforeEnqueue = timestamp => timestamp.Tags["tag2"] == "value2"; - var receivedData = new List(); + var receivedData = new List(); buffer.OnDataReleased += (sender, args) => { diff --git a/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesDataShould.cs b/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesDataShould.cs index 7da685be..d9d5a8f9 100644 --- a/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesDataShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/Models/TimeseriesDataShould.cs @@ -3,8 +3,10 @@ using System.Linq; using System.Text; using FluentAssertions; +using FluentAssertions.Common; using FluentAssertions.Equivalency; using QuixStreams.Streaming.Models; +using QuixStreams.Telemetry.Models; using Xunit; namespace QuixStreams.Streaming.UnitTests.Models @@ -89,7 +91,7 @@ public void Copy_WithParameterFilter_OnTelemetryData_ShouldCreateInstanceAsExpec data.rawData.StringValues.Remove("param4"); data.Timestamps.RemoveAt(1); - data.Should().BeEquivalentTo(filtered, options => options.Including(info => info.WhichGetterHas(FluentAssertions.Common.CSharpAccessModifier.Public))); + data.Should().BeEquivalentTo(filtered, options => options.Including(info => info.WhichGetterHas(CSharpAccessModifier.Public))); } [Fact] @@ -168,7 +170,7 @@ public void ConvertToTelemetrysData_WithDuplicatedTimestamps_ShouldCreateInstanc public void LoadFromTimeseriesData_WithDuplicatedTimestamps_ShouldCreateInstanceAsExpected() { // Arrange - var dataDuplicatedTimestamps = new QuixStreams.Telemetry.Models.TimeseriesDataRaw() + var dataDuplicatedTimestamps = new TimeseriesDataRaw() { Epoch = 0, Timestamps = new long[] { 100, 100, 100, 200, 200 }, @@ -188,7 +190,7 @@ public void LoadFromTimeseriesData_WithDuplicatedTimestamps_ShouldCreateInstance } }; - var dataWithoutDuplicatedTimestamps = new QuixStreams.Telemetry.Models.TimeseriesDataRaw() + var dataWithoutDuplicatedTimestamps = new TimeseriesDataRaw() { Epoch = 0, Timestamps = new long[] { 100 , 200, 200 }, @@ -214,14 +216,14 @@ public void LoadFromTimeseriesData_WithDuplicatedTimestamps_ShouldCreateInstance var data2 = new TimeseriesData(dataWithoutDuplicatedTimestamps); // Assert - data1.Should().BeEquivalentTo(data2, options => options.Including(info => info.WhichGetterHas(FluentAssertions.Common.CSharpAccessModifier.Public))); + data1.Should().BeEquivalentTo(data2, options => options.Including(info => info.WhichGetterHas(CSharpAccessModifier.Public))); } [Fact] public void LoadFromTimeseriesData_WithNullValues_ShouldCreateInstanceAsExpected() { // Arrange - var dataWithNulls = new QuixStreams.Telemetry.Models.TimeseriesDataRaw() + var dataWithNulls = new TimeseriesDataRaw() { Epoch = 0, Timestamps = new long[] { 100, 200, 300, 400, 500 }, @@ -336,7 +338,7 @@ public void TimeseriesDataWithEpoch_TimeseriesDataWithoutEpoch_EqualityCompariso dataEpoch.Should().BeEquivalentTo(dataNoEpoch); } - private static QuixStreams.Streaming.Models.TimeseriesData GenerateTimeseriesData(int offset, int amount, int capacity = 0, long epoch = 0, bool includeEpoch = false) + private static TimeseriesData GenerateTimeseriesData(int offset, int amount, int capacity = 0, long epoch = 0, bool includeEpoch = false) { var tsdata = new TimeseriesData(); if (capacity > 0) diff --git a/src/QuixStreams.Streaming.UnitTests/StreamProducerShould.cs b/src/QuixStreams.Streaming.UnitTests/StreamProducerShould.cs index 7dd545dd..c6c63404 100644 --- a/src/QuixStreams.Streaming.UnitTests/StreamProducerShould.cs +++ b/src/QuixStreams.Streaming.UnitTests/StreamProducerShould.cs @@ -3,7 +3,6 @@ using System.Runtime.Serialization; using FluentAssertions; using NSubstitute; -using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Kafka.Transport.Tests.Helpers; using QuixStreams.Streaming.Models; using QuixStreams.Streaming.UnitTests.Helpers; diff --git a/src/QuixStreams.Streaming.UnitTests/StreamingClientTests.cs b/src/QuixStreams.Streaming.UnitTests/StreamingClientTests.cs index b26049e8..dae319d4 100644 --- a/src/QuixStreams.Streaming.UnitTests/StreamingClientTests.cs +++ b/src/QuixStreams.Streaming.UnitTests/StreamingClientTests.cs @@ -3,10 +3,8 @@ using System.Linq; using System.Text; using System.Threading; -using System.Threading.Tasks; using FluentAssertions; using Quix.TestBase.Extensions; -using QuixStreams; using QuixStreams.Streaming.UnitTests.Helpers; using QuixStreams.Telemetry.Models; using Xunit; @@ -18,7 +16,7 @@ public class StreamingClientTests { public StreamingClientTests(ITestOutputHelper helper) { - QuixStreams.Logging.Factory = helper.CreateLoggerFactory(); + Logging.Factory = helper.CreateLoggerFactory(); } private static TimeseriesDataRaw GenerateTimeseriesData(int offset) diff --git a/src/QuixStreams.Streaming/App.cs b/src/QuixStreams.Streaming/App.cs index 6619b3b4..82e4a96d 100644 --- a/src/QuixStreams.Streaming/App.cs +++ b/src/QuixStreams.Streaming/App.cs @@ -48,7 +48,7 @@ private enum CtrlType { /// Whether the consumer defined should be automatically subscribed to start receiving messages public static void Run(CancellationToken cancellationToken = default, Action beforeShutdown = null, bool subscribe = true) { - var logger = QuixStreams.Logging.CreateLogger(); + var logger = Logging.CreateLogger(); var waitForProcessShutdownStart = new ManualResetEventSlim(); var waitForMainExit = new ManualResetEventSlim(); Action actualBeforeShutdown = () => @@ -234,11 +234,11 @@ public static void Run(CancellationToken cancellationToken = default, Action bef /// The state storage to use for app's state manager public static void SetStateStorageType(StateStorageTypes type) { - if (App.stateStorageType != null) throw new InvalidOperationException("State storage type may only be set once"); + if (stateStorageType != null) throw new InvalidOperationException("State storage type may only be set once"); if (type == StateStorageTypes.RocksDb || type == StateStorageTypes.InMemory) { - App.stateStorageType = type; + stateStorageType = type; } else { @@ -248,8 +248,8 @@ public static void SetStateStorageType(StateStorageTypes type) public static StateStorageTypes GetStateStorageType() { - if (App.stateStorageType == null) SetStateStorageType(StateStorageTypes.RocksDb); - return App.stateStorageType.Value; + if (stateStorageType == null) SetStateStorageType(StateStorageTypes.RocksDb); + return stateStorageType.Value; } /// @@ -258,8 +258,8 @@ public static StateStorageTypes GetStateStorageType() /// The state storage path to use for states public static void SetStateStorageRootDir(string path) { - if (App.stateStorageRootDir != null) throw new InvalidOperationException("State storage root dir is already set"); - App.stateStorageRootDir = path; + if (stateStorageRootDir != null) throw new InvalidOperationException("State storage root dir is already set"); + stateStorageRootDir = path; } /// @@ -268,8 +268,8 @@ public static void SetStateStorageRootDir(string path) /// public static string GetStateStorageRootDir() { - if (App.stateStorageRootDir == null) SetStateStorageRootDir(Path.Combine(".", "state")); - return App.stateStorageRootDir; + if (stateStorageRootDir == null) SetStateStorageRootDir(Path.Combine(".", "state")); + return stateStorageRootDir; } internal static void Register(TopicConsumer topicConsumer) diff --git a/src/QuixStreams.Streaming/IStreamConsumer.cs b/src/QuixStreams.Streaming/IStreamConsumer.cs index 6a0fbca8..a967f345 100644 --- a/src/QuixStreams.Streaming/IStreamConsumer.cs +++ b/src/QuixStreams.Streaming/IStreamConsumer.cs @@ -97,7 +97,7 @@ public class PackageReceivedEventArgs /// The topic consumer associated with the event. /// The stream consumer associated with the event. /// The stream package that was received. - public PackageReceivedEventArgs(ITopicConsumer topicConsumer, IStreamConsumer consumer, QuixStreams.Telemetry.Models.StreamPackage package) + public PackageReceivedEventArgs(ITopicConsumer topicConsumer, IStreamConsumer consumer, StreamPackage package) { this.TopicConsumer = topicConsumer; this.Stream = consumer; @@ -117,7 +117,7 @@ public PackageReceivedEventArgs(ITopicConsumer topicConsumer, IStreamConsumer co /// /// Gets the stream package that was received. /// - public QuixStreams.Telemetry.Models.StreamPackage Package { get; } + public StreamPackage Package { get; } } /// diff --git a/src/QuixStreams.Streaming/IStreamConsumerInternal.cs b/src/QuixStreams.Streaming/IStreamConsumerInternal.cs index 3fe30530..dba742b7 100644 --- a/src/QuixStreams.Streaming/IStreamConsumerInternal.cs +++ b/src/QuixStreams.Streaming/IStreamConsumerInternal.cs @@ -12,26 +12,26 @@ internal interface IStreamConsumerInternal: IStreamConsumer /// /// Event raised when the Stream Properties have changed. /// - event Action OnStreamPropertiesChanged; + event Action OnStreamPropertiesChanged; /// /// Event raised when the have been changed. /// - event Action OnParameterDefinitionsChanged; + event Action OnParameterDefinitionsChanged; /// /// Event raised when the have been changed. /// - event Action OnEventDefinitionsChanged; + event Action OnEventDefinitionsChanged; /// /// Event raised when a new package of values have been received. /// - event Action OnTimeseriesData; + event Action OnTimeseriesData; /// /// Event raised when a new package of values have been received. /// - event Action OnEventData; + event Action OnEventData; } } \ No newline at end of file diff --git a/src/QuixStreams.Streaming/IStreamProducer.cs b/src/QuixStreams.Streaming/IStreamProducer.cs index 63158618..9f848e58 100644 --- a/src/QuixStreams.Streaming/IStreamProducer.cs +++ b/src/QuixStreams.Streaming/IStreamProducer.cs @@ -1,5 +1,6 @@ using System; using QuixStreams.Streaming.Models.StreamProducer; +using QuixStreams.Telemetry.Models; namespace QuixStreams.Streaming { @@ -43,7 +44,7 @@ public interface IStreamProducer : IDisposable /// Close the stream and flush the pending data to stream. /// /// Stream closing state - void Close(QuixStreams.Telemetry.Models.StreamEndType streamState = QuixStreams.Telemetry.Models.StreamEndType.Closed); + void Close(StreamEndType streamState = StreamEndType.Closed); /// /// Event raised when an exception occurred during the publishing processes diff --git a/src/QuixStreams.Streaming/IStreamProducerInternal.cs b/src/QuixStreams.Streaming/IStreamProducerInternal.cs index 0dc59c33..4f71bda7 100644 --- a/src/QuixStreams.Streaming/IStreamProducerInternal.cs +++ b/src/QuixStreams.Streaming/IStreamProducerInternal.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using QuixStreams.Telemetry.Models; namespace QuixStreams.Streaming { @@ -22,40 +23,40 @@ internal interface IStreamProducerInternal: IStreamProducer /// /// Publish a stream properties to the stream /// - void Publish(QuixStreams.Telemetry.Models.StreamProperties properties); + void Publish(StreamProperties properties); /// /// Publish a single Timeseries data package to the stream /// - void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw rawData); + void Publish(TimeseriesDataRaw rawData); /// /// Publish a set of Timeseries data packages to the stream /// - void Publish(List data); + void Publish(List data); /// /// Publish the optional Parameter definition properties describing the hierarchical grouping of parameters /// Please note, new calls will not result in merged set with previous calls. New calls supersede previously sent values. /// - void Publish(QuixStreams.Telemetry.Models.ParameterDefinitions definitions); + void Publish(ParameterDefinitions definitions); /// /// Publish a single event to the stream /// /// Event to send - void Publish(QuixStreams.Telemetry.Models.EventDataRaw eventDataRaw); + void Publish(EventDataRaw eventDataRaw); /// /// Publish a set of events to the stream /// /// Events to send - void Publish(ICollection events); + void Publish(ICollection events); /// /// Publish the optional Event definition properties describing the hierarchical grouping of events /// Please note, new calls will not result in merged set with previous calls. New calls supersede previously sent values. /// - void Publish(QuixStreams.Telemetry.Models.EventDefinitions definitions); + void Publish(EventDefinitions definitions); } } \ No newline at end of file diff --git a/src/QuixStreams.Streaming/KafkaStreamingClient.cs b/src/QuixStreams.Streaming/KafkaStreamingClient.cs index 27d1191b..eabfb944 100644 --- a/src/QuixStreams.Streaming/KafkaStreamingClient.cs +++ b/src/QuixStreams.Streaming/KafkaStreamingClient.cs @@ -11,6 +11,7 @@ using QuixStreams.Telemetry.Configuration; using QuixStreams.Telemetry.Kafka; using AutoOffsetReset = QuixStreams.Telemetry.Kafka.AutoOffsetReset; +using SaslMechanism = Confluent.Kafka.SaslMechanism; namespace QuixStreams.Streaming { @@ -99,7 +100,7 @@ public interface IKafkaStreamingClient /// public class KafkaStreamingClient : IKafkaStreamingClient { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly string brokerAddress; private readonly Dictionary brokerProperties; @@ -138,7 +139,7 @@ public KafkaStreamingClient(string brokerAddress, SecurityOptions securityOption if (securityOptions.UseSasl) { - if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out Confluent.Kafka.SaslMechanism parsed)) + if (!Enum.TryParse(securityOptions.SaslMechanism.ToString(), true, out SaslMechanism parsed)) { throw new ArgumentOutOfRangeException(nameof(securityOptions.SaslMechanism), "Unsupported sasl mechanism " + securityOptions.SaslMechanism); } diff --git a/src/QuixStreams.Streaming/Models/EventData.cs b/src/QuixStreams.Streaming/Models/EventData.cs index 3eab4ab4..6bfd7f11 100644 --- a/src/QuixStreams.Streaming/Models/EventData.cs +++ b/src/QuixStreams.Streaming/Models/EventData.cs @@ -62,7 +62,7 @@ public EventData Clone() /// Create a new Event Data instance loading data from type instance /// /// Event Data to load from - internal EventData(QuixStreams.Telemetry.Models.EventDataRaw rawData) + internal EventData(EventDataRaw rawData) { this.LoadFromEventDataRaw(rawData); } @@ -72,7 +72,7 @@ internal void SetTags(IDictionary newTags) this.Tags = newTags; } - private void LoadFromEventDataRaw(QuixStreams.Telemetry.Models.EventDataRaw rawData) + private void LoadFromEventDataRaw(EventDataRaw rawData) { this.EpochIncluded = true; this.TimestampNanoseconds = rawData.Timestamp; @@ -82,7 +82,7 @@ private void LoadFromEventDataRaw(QuixStreams.Telemetry.Models.EventDataRaw rawD } - private void CopyFrom(Streaming.Models.EventData data) + private void CopyFrom(EventData data) { this.EpochIncluded = data.EpochIncluded; this.TimestampNanoseconds = data.TimestampNanoseconds; @@ -91,9 +91,9 @@ private void CopyFrom(Streaming.Models.EventData data) this.SetTags(data.Tags.ToDictionary(kv => kv.Key, kv => kv.Value)); } - internal QuixStreams.Telemetry.Models.EventDataRaw ConvertToEventDataRaw() + internal EventDataRaw ConvertToEventDataRaw() { - return new QuixStreams.Telemetry.Models.EventDataRaw + return new EventDataRaw { Timestamp = this.TimestampNanoseconds, Id = this.Id, diff --git a/src/QuixStreams.Streaming/Models/EventDefinition.cs b/src/QuixStreams.Streaming/Models/EventDefinition.cs index df86f01b..925fc1c7 100644 --- a/src/QuixStreams.Streaming/Models/EventDefinition.cs +++ b/src/QuixStreams.Streaming/Models/EventDefinition.cs @@ -1,4 +1,6 @@ -namespace QuixStreams.Streaming.Models +using QuixStreams.Telemetry.Models; + +namespace QuixStreams.Streaming.Models { /// @@ -35,15 +37,15 @@ public class EventDefinition /// /// Gets the level of the event. Defaults to /// - public QuixStreams.Telemetry.Models.EventLevel Level { get; internal set; } = QuixStreams.Telemetry.Models.EventLevel.Information; + public EventLevel Level { get; internal set; } = EventLevel.Information; /// /// Converts the Event definition to Telemetry layer structure /// /// Telemetry layer Event definition - internal QuixStreams.Telemetry.Models.EventDefinition ConvertToTelemetryDefinition() + internal Telemetry.Models.EventDefinition ConvertToTelemetryDefinition() { - return new QuixStreams.Telemetry.Models.EventDefinition + return new Telemetry.Models.EventDefinition { Id = this.Id, Name = this.Name, diff --git a/src/QuixStreams.Streaming/Models/ParameterDefinition.cs b/src/QuixStreams.Streaming/Models/ParameterDefinition.cs index 856d7729..db1114a0 100644 --- a/src/QuixStreams.Streaming/Models/ParameterDefinition.cs +++ b/src/QuixStreams.Streaming/Models/ParameterDefinition.cs @@ -56,9 +56,9 @@ public class ParameterDefinition /// Converts the Parameter definition to Telemetry layer structure /// /// Telemetry layer Parameter definition - internal QuixStreams.Telemetry.Models.ParameterDefinition ConvertToTelemetrysDefinition() + internal Telemetry.Models.ParameterDefinition ConvertToTelemetrysDefinition() { - return new QuixStreams.Telemetry.Models.ParameterDefinition + return new Telemetry.Models.ParameterDefinition { Id = this.Id, Name = this.Name, diff --git a/src/QuixStreams.Streaming/Models/StreamConsumer/StreamEventsConsumer.cs b/src/QuixStreams.Streaming/Models/StreamConsumer/StreamEventsConsumer.cs index 4f014161..d8765ecf 100644 --- a/src/QuixStreams.Streaming/Models/StreamConsumer/StreamEventsConsumer.cs +++ b/src/QuixStreams.Streaming/Models/StreamConsumer/StreamEventsConsumer.cs @@ -28,7 +28,7 @@ internal StreamEventsConsumer(ITopicConsumer topicConsumer, IStreamConsumerInter this.streamConsumer.OnEventData += OnEventDataHandler; } - private void OnEventDataHandler(IStreamConsumer sender, QuixStreams.Telemetry.Models.EventDataRaw eventDataRaw) + private void OnEventDataHandler(IStreamConsumer sender, EventDataRaw eventDataRaw) { var data = new EventData(eventDataRaw); @@ -51,7 +51,7 @@ private void OnEventDefinitionsChangedHandler(IStreamConsumer sender, EventDefin /// public IList Definitions { get; private set; } - private void LoadFromTelemetryDefinitions(QuixStreams.Telemetry.Models.EventDefinitions definitions) + private void LoadFromTelemetryDefinitions(EventDefinitions definitions) { // Create a new list instead of modifying publicly available list to avoid threading issues like // user iterating the list then us changing it during it @@ -65,7 +65,7 @@ private void LoadFromTelemetryDefinitions(QuixStreams.Telemetry.Models.EventDefi this.Definitions = defs; } - private List ConvertEventDefinitions(List eventDefinitions, string location) + private List ConvertEventDefinitions(List eventDefinitions, string location) { var result = eventDefinitions.Select(d => new EventDefinition { @@ -80,7 +80,7 @@ private List ConvertEventDefinitions(List ConvertGroupEventDefinitions(List eventGroupDefinitions, string location) + private List ConvertGroupEventDefinitions(List eventGroupDefinitions, string location) { var result = new List(); diff --git a/src/QuixStreams.Streaming/Models/StreamConsumer/StreamTimeseriesConsumer.cs b/src/QuixStreams.Streaming/Models/StreamConsumer/StreamTimeseriesConsumer.cs index ce980d60..f5662c3b 100644 --- a/src/QuixStreams.Streaming/Models/StreamConsumer/StreamTimeseriesConsumer.cs +++ b/src/QuixStreams.Streaming/Models/StreamConsumer/StreamTimeseriesConsumer.cs @@ -71,7 +71,7 @@ public TimeseriesBufferConsumer CreateBuffer(params string[] parametersFilter) /// internal List Buffers { get; private set; } = new List(); - private void LoadFromTelemetryDefinitions(QuixStreams.Telemetry.Models.ParameterDefinitions definitions) + private void LoadFromTelemetryDefinitions(ParameterDefinitions definitions) { var defs = new List(); @@ -83,7 +83,7 @@ private void LoadFromTelemetryDefinitions(QuixStreams.Telemetry.Models.Parameter this.Definitions = defs; } - private List ConvertParameterDefinitions(List parameterDefinitions, string location) + private List ConvertParameterDefinitions(List parameterDefinitions, string location) { var result = parameterDefinitions.Select(d => new ParameterDefinition { @@ -101,7 +101,7 @@ private List ConvertParameterDefinitions(List ConvertGroupParameterDefinitions(List parameterGroupDefinitions, string location) + private List ConvertGroupParameterDefinitions(List parameterGroupDefinitions, string location) { var result = new List(); @@ -116,14 +116,14 @@ private List ConvertGroupParameterDefinitions(List /// Represents a class for consuming data from a stream in a buffered manner. @@ -38,7 +40,7 @@ public override void Dispose() /// /// The stream consumer associated with the event. /// Data received in TimeseriesDataRaw format . - private void OnTimeseriesDataEventHandler(IStreamConsumer streamConsumer, QuixStreams.Telemetry.Models.TimeseriesDataRaw timeseriesDataRaw) + private void OnTimeseriesDataEventHandler(IStreamConsumer streamConsumer, TimeseriesDataRaw timeseriesDataRaw) { this.WriteChunk(timeseriesDataRaw); } diff --git a/src/QuixStreams.Streaming/Models/StreamConsumerId.cs b/src/QuixStreams.Streaming/Models/StreamConsumerId.cs index 866c5da4..7fa2acfa 100644 --- a/src/QuixStreams.Streaming/Models/StreamConsumerId.cs +++ b/src/QuixStreams.Streaming/Models/StreamConsumerId.cs @@ -1,5 +1,3 @@ -using System; - namespace QuixStreams.Streaming.Models { public class StreamConsumerId diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/EventDataBuilder.cs b/src/QuixStreams.Streaming/Models/StreamProducer/EventDataBuilder.cs index 6244d4e0..c3888ede 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/EventDataBuilder.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/EventDataBuilder.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using QuixStreams.Telemetry.Models; namespace QuixStreams.Streaming.Models.StreamProducer { @@ -10,7 +11,7 @@ public class EventDataBuilder private readonly StreamEventsProducer streamEventsProducer; private readonly long timestampNanoseconds; - private readonly List events = new List(); + private readonly List events = new List(); private readonly Dictionary tags; /// @@ -33,7 +34,7 @@ public EventDataBuilder(StreamEventsProducer streamEventsProducer, long timestam /// public EventDataBuilder AddValue(string eventId, string value) { - var @event = new QuixStreams.Telemetry.Models.EventDataRaw() + var @event = new EventDataRaw() { Id = eventId, Timestamp = timestampNanoseconds, diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/EventDefinitionBuilder.cs b/src/QuixStreams.Streaming/Models/StreamProducer/EventDefinitionBuilder.cs index 17bdb69e..c3c4c291 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/EventDefinitionBuilder.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/EventDefinitionBuilder.cs @@ -10,7 +10,7 @@ public class EventDefinitionBuilder { private readonly StreamEventsProducer streamEventsProducer; private readonly string location; - private QuixStreams.Telemetry.Models.EventDefinition properties; + private Telemetry.Models.EventDefinition properties; /// /// Initializes a new instance of @@ -18,7 +18,7 @@ public class EventDefinitionBuilder /// Events producer owner /// Location selected for the Event definition builder /// Events definition instance managed by the builder - public EventDefinitionBuilder(StreamEventsProducer streamEventsProducer, string location, QuixStreams.Telemetry.Models.EventDefinition properties = null) + public EventDefinitionBuilder(StreamEventsProducer streamEventsProducer, string location, Telemetry.Models.EventDefinition properties = null) { this.streamEventsProducer = streamEventsProducer; this.location = location; diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/IStreamTimeseriesProducer.cs b/src/QuixStreams.Streaming/Models/StreamProducer/IStreamTimeseriesProducer.cs index a1fa7c22..11188f50 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/IStreamTimeseriesProducer.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/IStreamTimeseriesProducer.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using QuixStreams.Telemetry.Models; namespace QuixStreams.Streaming.Models.StreamProducer { @@ -30,7 +31,7 @@ public interface IStreamTimeseriesProducer : IDisposable /// Publish data in TimeseriesDataRaw format without any buffering /// /// Timeseries data to publish - void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw data); + void Publish(TimeseriesDataRaw data); /// /// Publish single timestamp to stream without any buffering diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/ParameterDefinitionBuilder.cs b/src/QuixStreams.Streaming/Models/StreamProducer/ParameterDefinitionBuilder.cs index ec8f33e1..e271b948 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/ParameterDefinitionBuilder.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/ParameterDefinitionBuilder.cs @@ -9,7 +9,7 @@ public class ParameterDefinitionBuilder { private readonly StreamTimeseriesProducer streamTimeseriesProducer; private readonly string location; - private QuixStreams.Telemetry.Models.ParameterDefinition definition; + private Telemetry.Models.ParameterDefinition definition; /// /// Initializes a new instance of @@ -17,7 +17,7 @@ public class ParameterDefinitionBuilder /// Parameters producer owner /// Location selected for the Parameter definition builder /// Parameter definition instance managed by the builder - public ParameterDefinitionBuilder(StreamTimeseriesProducer streamTimeseriesProducer, string location, QuixStreams.Telemetry.Models.ParameterDefinition definition = null) + public ParameterDefinitionBuilder(StreamTimeseriesProducer streamTimeseriesProducer, string location, Telemetry.Models.ParameterDefinition definition = null) { this.streamTimeseriesProducer = streamTimeseriesProducer; this.location = location; diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/StreamEventsProducer.cs b/src/QuixStreams.Streaming/Models/StreamProducer/StreamEventsProducer.cs index 51cdc9a8..44bea810 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/StreamEventsProducer.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/StreamEventsProducer.cs @@ -15,7 +15,7 @@ namespace QuixStreams.Streaming.Models.StreamProducer /// public class StreamEventsProducer : IStreamEventsProducer { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly IStreamProducerInternal streamProducer; private long epoch = 0; @@ -148,13 +148,13 @@ public EventDefinitionBuilder AddLocation(string location) return builder; } - internal QuixStreams.Telemetry.Models.EventDefinition CreateDefinition(string location, string eventId, string name, string description) + internal Telemetry.Models.EventDefinition CreateDefinition(string location, string eventId, string name, string description) { if (isDisposed) { throw new ObjectDisposedException(nameof(StreamEventsProducer)); } - var eventDefinition = new QuixStreams.Telemetry.Models.EventDefinition + var eventDefinition = new Telemetry.Models.EventDefinition { Id = eventId, Name = name, @@ -242,7 +242,7 @@ public void Publish(ICollection events) this.logger.Log(LogLevel.Trace, "{0} event(s) sent.", events.Count); } - internal void Publish(ICollection events) + internal void Publish(ICollection events) { if (isDisposed) { diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/StreamPropertiesProducer.cs b/src/QuixStreams.Streaming/Models/StreamProducer/StreamPropertiesProducer.cs index 450fe080..a07d0220 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/StreamPropertiesProducer.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/StreamPropertiesProducer.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Threading; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Telemetry.Models; namespace QuixStreams.Streaming.Models.StreamProducer @@ -30,7 +29,7 @@ public class StreamPropertiesProducer : IStreamPropertiesProducer private long lastHeartbeatRebroadcastTime = 0; // in milliseconds private int heartbeatRebroadcastFlushInterval = 30*1000; - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); /// public int FlushInterval @@ -230,11 +229,11 @@ List GetParents() { return this.Parents.Distinct().ToList(); } - catch (System.ArgumentException ex) + catch (ArgumentException ex) { this.logger.LogTrace(ex, "Exception while trying to get stream metadata"); } - catch (System.InvalidOperationException ex) + catch (InvalidOperationException ex) { this.logger.LogTrace(ex, "Exception while trying to get stream parents"); } @@ -252,11 +251,11 @@ Dictionary GetMetadata() { return this.Metadata.ToDictionary(kv => kv.Key, kv => kv.Value); } - catch (System.ArgumentException ex) + catch (ArgumentException ex) { this.logger.LogTrace(ex, "Exception while trying to get stream metadata"); } - catch (System.InvalidOperationException ex) + catch (InvalidOperationException ex) { this.logger.LogTrace(ex, "Exception while trying to get stream metadata"); } diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/StreamTimeseriesProducer.cs b/src/QuixStreams.Streaming/Models/StreamProducer/StreamTimeseriesProducer.cs index a47fddc1..10b15770 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/StreamTimeseriesProducer.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/StreamTimeseriesProducer.cs @@ -2,9 +2,9 @@ using System.Collections.Generic; using System.Threading; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Streaming.Exceptions; using QuixStreams.Telemetry.Managers; +using QuixStreams.Telemetry.Models; using QuixStreams.Telemetry.Models.Utility; namespace QuixStreams.Streaming.Models.StreamProducer @@ -16,7 +16,7 @@ public class StreamTimeseriesProducer : IStreamTimeseriesProducer { private readonly IStreamProducerInternal streamProducer; - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private string location; private readonly ParameterDefinitionsManager parameterDefinitionsManager = new ParameterDefinitionsManager(); @@ -71,7 +71,7 @@ public void Publish(TimeseriesData data) } /// - public void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw data) + public void Publish(TimeseriesDataRaw data) { if (isDisposed) { @@ -93,7 +93,7 @@ public void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw data) updatedTimestamps[i] = data.Timestamps[i] + epochDiff; } - QuixStreams.Telemetry.Models.TimeseriesDataRaw newData = new QuixStreams.Telemetry.Models.TimeseriesDataRaw( + TimeseriesDataRaw newData = new TimeseriesDataRaw( data.Epoch, updatedTimestamps, data.NumericValues, @@ -179,13 +179,13 @@ public ParameterDefinitionBuilder AddLocation(string location) return builder; } - internal QuixStreams.Telemetry.Models.ParameterDefinition CreateDefinition(string location, string parameterId, string name, string description) + internal Telemetry.Models.ParameterDefinition CreateDefinition(string location, string parameterId, string name, string description) { if (isDisposed) { throw new ObjectDisposedException(nameof(StreamTimeseriesProducer)); } - var parameterDefinition = new QuixStreams.Telemetry.Models.ParameterDefinition + var parameterDefinition = new Telemetry.Models.ParameterDefinition { Id = parameterId, Name = name, diff --git a/src/QuixStreams.Streaming/Models/StreamProducer/TimeseriesBufferProducer.cs b/src/QuixStreams.Streaming/Models/StreamProducer/TimeseriesBufferProducer.cs index 2946f5df..cc16d736 100644 --- a/src/QuixStreams.Streaming/Models/StreamProducer/TimeseriesBufferProducer.cs +++ b/src/QuixStreams.Streaming/Models/StreamProducer/TimeseriesBufferProducer.cs @@ -158,7 +158,7 @@ public void Publish(TimeseriesDataRaw data) } } - var newData = new QuixStreams.Telemetry.Models.TimeseriesDataRaw( + var newData = new TimeseriesDataRaw( data.Epoch, updatedTimestamps ?? data.Timestamps, data.NumericValues, diff --git a/src/QuixStreams.Streaming/Models/TimeseriesBuffer.cs b/src/QuixStreams.Streaming/Models/TimeseriesBuffer.cs index 9f52aa98..b28c1414 100644 --- a/src/QuixStreams.Streaming/Models/TimeseriesBuffer.cs +++ b/src/QuixStreams.Streaming/Models/TimeseriesBuffer.cs @@ -1,12 +1,8 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Net; using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Streaming.Models.StreamConsumer; using QuixStreams.Telemetry.Models; @@ -18,7 +14,7 @@ namespace QuixStreams.Streaming.Models public class TimeseriesBuffer: IDisposable { private bool isDisposed = false; - private ILogger logger = QuixStreams.Logging.CreateLogger(typeof(TimeseriesDataRaw)); + private ILogger logger = Logging.CreateLogger(typeof(TimeseriesDataRaw)); // Configuration of the buffer private int? bufferTimeout = null; @@ -246,7 +242,7 @@ public Func CustomTrigger /// Writes a chunck of data into the buffer /// /// Data in format - protected internal void WriteChunk(QuixStreams.Telemetry.Models.TimeseriesDataRaw timeseriesDataRaw) + protected internal void WriteChunk(TimeseriesDataRaw timeseriesDataRaw) { if (isDisposed) { @@ -436,7 +432,7 @@ internal void FlushData(bool force) this.InvokeOnRawReceived(this, new TimeseriesDataRawReadEventArgs(null, null, newPdrw)); if (this.OnDataReleased == null) return; - var data = new Streaming.Models.TimeseriesData(newPdrw, this.parametersFilter, false, false); + var data = new TimeseriesData(newPdrw, this.parametersFilter, false, false); this.InvokeOnReceive(this, new TimeseriesDataReadEventArgs(null, null, data)); } @@ -480,7 +476,7 @@ private TimeseriesDataRaw FilterDataFrameByFilterFunction(TimeseriesDataRaw data { var filteredRows = new List(); - var timeseriesData = new Streaming.Models.TimeseriesData(data, null, false, false); + var timeseriesData = new TimeseriesData(data, null, false, false); // Indexes of elements which ran over the filter for (var i = 0; i < timeseriesData.Timestamps.Count; i++) diff --git a/src/QuixStreams.Streaming/Models/TimeseriesData.cs b/src/QuixStreams.Streaming/Models/TimeseriesData.cs index 6b705f74..eb84a0c5 100644 --- a/src/QuixStreams.Streaming/Models/TimeseriesData.cs +++ b/src/QuixStreams.Streaming/Models/TimeseriesData.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Linq; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Streaming.Utils; using QuixStreams.Telemetry.Models; using QuixStreams.Telemetry.Models.Utility; @@ -14,8 +13,8 @@ namespace QuixStreams.Streaming.Models /// public class TimeseriesData { - private static Lazy logger = new Lazy(() => QuixStreams.Logging.CreateLogger()); - internal QuixStreams.Telemetry.Models.TimeseriesDataRaw rawData; + private static Lazy logger = new Lazy(() => Logging.CreateLogger()); + internal TimeseriesDataRaw rawData; internal Dictionary parameterList; internal List timestampsList; @@ -44,7 +43,7 @@ public TimeseriesData(int capacity = 10) /// List of parameters to filter /// Merge duplicated timestamps /// Clean timestamps without values - public TimeseriesData(QuixStreams.Telemetry.Models.TimeseriesDataRaw rawData, string[] parametersFilter = null, bool merge = true, bool clean = true) + public TimeseriesData(TimeseriesDataRaw rawData, string[] parametersFilter = null, bool merge = true, bool clean = true) { this.rawData = rawData; this.epochsIncluded = new bool[rawData.Timestamps.Count()]; @@ -78,7 +77,7 @@ public TimeseriesData Clone(params string[] parametersFilter) return data; } - private void CloneFrom(Streaming.Models.TimeseriesData data, string[] parametersFilter = null) + private void CloneFrom(TimeseriesData data, string[] parametersFilter = null) { this.rawData = data.rawData.Clone(); this.epochsIncluded = (bool[])data.epochsIncluded.Clone(); @@ -285,7 +284,7 @@ private void ResizeRawData(int newSize) } } - internal QuixStreams.Telemetry.Models.TimeseriesDataRaw ConvertToTimeseriesDataRaw(bool merge = true, bool clean = true) + internal TimeseriesDataRaw ConvertToTimeseriesDataRaw(bool merge = true, bool clean = true) { if (merge) { diff --git a/src/QuixStreams.Streaming/QuixStreamingClient.cs b/src/QuixStreams.Streaming/QuixStreamingClient.cs index 743b8e42..745d8a10 100644 --- a/src/QuixStreams.Streaming/QuixStreamingClient.cs +++ b/src/QuixStreams.Streaming/QuixStreamingClient.cs @@ -206,7 +206,7 @@ public interface IQuixStreamingClientAsync /// public class QuixStreamingClient : IQuixStreamingClient { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly IDictionary brokerProperties; private readonly string token; private readonly string workspaceId; @@ -521,7 +521,7 @@ public async Task GetTopicProducerAsync(string topicIdOrName, St }; // Hacky workaround to an issue that Kafka client can't be left with no GroupId, but it still uses it for ACL checks. - QuixStreams.Kafka.ConsumerConfiguration.ConsumerGroupIdWhenNotSet = ws.WorkspaceId + "-" + Guid.NewGuid().ToString("N").Substring(0, 10); + ConsumerConfiguration.ConsumerGroupIdWhenNotSet = ws.WorkspaceId + "-" + Guid.NewGuid().ToString("N").Substring(0, 10); return (null, newCommitOptions); } @@ -910,7 +910,7 @@ private async Task SendRequestToApi(HttpMethod method, Uri var cid = String.Empty; try { - var error = JsonConvert.DeserializeObject(responseContent, jsonSerializerSettings); + var error = JsonConvert.DeserializeObject(responseContent, jsonSerializerSettings); msg = error.Message; cid = error.CorrelationId; } diff --git a/src/QuixStreams.Streaming/Raw/RawTopicConsumer.cs b/src/QuixStreams.Streaming/Raw/RawTopicConsumer.cs index 204a04bb..68b58104 100644 --- a/src/QuixStreams.Streaming/Raw/RawTopicConsumer.cs +++ b/src/QuixStreams.Streaming/Raw/RawTopicConsumer.cs @@ -4,8 +4,6 @@ using Confluent.Kafka; using Microsoft.Extensions.Logging; using QuixStreams.Kafka; -using QuixStreams; -using QuixStreams.Kafka.Transport; using QuixStreams.Telemetry.Kafka; using AutoOffsetReset = QuixStreams.Telemetry.Kafka.AutoOffsetReset; @@ -100,7 +98,7 @@ public void Subscribe() { if (connectionStarted) { - var logger = QuixStreams.Logging.CreateLogger(); + var logger = Logging.CreateLogger(); logger.LogWarning("Attempted to subscribe to topic {0} more than once.", this.topicName); return; } diff --git a/src/QuixStreams.Streaming/States/StreamDictionaryState.cs b/src/QuixStreams.Streaming/States/StreamDictionaryState.cs index fe6d0009..bab087b7 100644 --- a/src/QuixStreams.Streaming/States/StreamDictionaryState.cs +++ b/src/QuixStreams.Streaming/States/StreamDictionaryState.cs @@ -15,7 +15,7 @@ public class StreamDictionaryState : IStreamState, IDictionary /// The underlying state storage for this StreamState, responsible for managing the actual key-value pairs. /// - private readonly State.DictionaryState dictionaryState; + private readonly DictionaryState dictionaryState; /// /// Raised immediately before a flush operation is performed. @@ -34,7 +34,7 @@ public class StreamDictionaryState : IStreamState, IDictionaryThe logger factory to use internal StreamDictionaryState(IStateStorage storage, ILoggerFactory loggerFactory) { - this.dictionaryState = new State.DictionaryState(storage, loggerFactory); + this.dictionaryState = new DictionaryState(storage, loggerFactory); } /// @@ -207,7 +207,7 @@ public class StreamDictionaryState : IStreamState, IDictionary /// /// The underlying state storage for this StreamState, responsible for managing the actual key-value pairs. /// - private readonly State.DictionaryState dictionaryState; + private readonly DictionaryState dictionaryState; /// /// Returns whether the cache keys are case-sensitive /// @@ -236,7 +236,7 @@ public class StreamDictionaryState : IStreamState, IDictionary /// The logger factory to use internal StreamDictionaryState(IStateStorage storage, StreamStateDefaultValueDelegate defaultValueFactory, ILoggerFactory loggerFactory) { - this.dictionaryState = new State.DictionaryState(storage, loggerFactory); + this.dictionaryState = new DictionaryState(storage, loggerFactory); this.defaultValueFactory = defaultValueFactory ?? (s => throw new KeyNotFoundException("The specified key was not found and there was no default value factory set.")); } diff --git a/src/QuixStreams.Streaming/States/StreamScalarState.cs b/src/QuixStreams.Streaming/States/StreamScalarState.cs index 7b5e8f21..62a2fae4 100644 --- a/src/QuixStreams.Streaming/States/StreamScalarState.cs +++ b/src/QuixStreams.Streaming/States/StreamScalarState.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using Microsoft.Extensions.Logging; using QuixStreams.State; using QuixStreams.State.Storage; @@ -14,7 +13,7 @@ public class StreamScalarState : IStreamState /// /// The underlying state storage for this StreamState, responsible for managing the actual value. /// - private readonly State.ScalarState scalarState; + private readonly ScalarState scalarState; /// /// Raised immediately before a flush operation is performed. @@ -33,7 +32,7 @@ public class StreamScalarState : IStreamState /// The logger factory to use internal StreamScalarState(IStateStorage storage, ILoggerFactory loggerFactory) { - this.scalarState = new State.ScalarState(storage, loggerFactory); + this.scalarState = new ScalarState(storage, loggerFactory); } /// @@ -86,7 +85,7 @@ public class StreamScalarState : IStreamState /// /// The underlying state storage for this StreamState, responsible for managing the actual value. /// - private readonly State.ScalarState scalarState; + private readonly ScalarState scalarState; /// /// A function that returns a default value of type T when the value has not been set yet. @@ -111,7 +110,7 @@ public class StreamScalarState : IStreamState /// The logger factory to use internal StreamScalarState(IStateStorage storage, StreamStateScalarDefaultValueDelegate defaultValueFactory, ILoggerFactory loggerFactory) { - this.scalarState = new State.ScalarState(storage, loggerFactory); + this.scalarState = new ScalarState(storage, loggerFactory); this.defaultValueFactory = defaultValueFactory ?? (() => default(T)); } diff --git a/src/QuixStreams.Streaming/StreamConsumer.cs b/src/QuixStreams.Streaming/StreamConsumer.cs index 57dc514a..8eb72b64 100644 --- a/src/QuixStreams.Streaming/StreamConsumer.cs +++ b/src/QuixStreams.Streaming/StreamConsumer.cs @@ -90,32 +90,32 @@ public StreamStateManager GetStateManager() } /// - public virtual event Action OnStreamPropertiesChanged; + public virtual event Action OnStreamPropertiesChanged; /// - public virtual event Action OnParameterDefinitionsChanged; + public virtual event Action OnParameterDefinitionsChanged; /// - public virtual event Action OnTimeseriesData; + public virtual event Action OnTimeseriesData; /// - public virtual event Action OnEventData; + public virtual event Action OnEventData; /// - public virtual event Action OnEventDefinitionsChanged; + public virtual event Action OnEventDefinitionsChanged; private void InitializeStreaming() { // Modifiers // this.AddComponent(SimpleModifier) - this.Subscribe(OnStreamPropertiesReceived); - this.Subscribe(OnTimeseriesDataReceived); - this.Subscribe(OnParameterDefinitionsReceived); - this.Subscribe(OnEventDataReceived); - this.Subscribe(OnEventDataReceived); - this.Subscribe(OnEventDefinitionsReceived); - this.Subscribe(OnStreamEndReceived); + this.Subscribe(OnStreamPropertiesReceived); + this.Subscribe(OnTimeseriesDataReceived); + this.Subscribe(OnParameterDefinitionsReceived); + this.Subscribe(OnEventDataReceived); + this.Subscribe(OnEventDataReceived); + this.Subscribe(OnEventDefinitionsReceived); + this.Subscribe(OnStreamEndReceived); this.Subscribe(OnStreamPackageReceived); this.OnClosed += () => @@ -124,7 +124,7 @@ private void InitializeStreaming() }; } - private void OnStreamPackageReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.StreamPackage package) + private void OnStreamPackageReceived(IStreamPipeline streamPipeline, StreamPackage package) { if (package.Type == typeof(byte[])) { @@ -144,31 +144,31 @@ private void OnStreamPackageReceived(IStreamPipeline streamPipeline, QuixStreams this.OnPackageReceived?.Invoke(this, new PackageReceivedEventArgs(this.topicConsumer, this, package)); } - private void OnStreamPropertiesReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.StreamProperties obj) + private void OnStreamPropertiesReceived(IStreamPipeline streamPipeline, StreamProperties obj) { this.logger.LogTrace("StreamConsumer: OnStreamPropertiesReceived"); this.OnStreamPropertiesChanged?.Invoke(this, obj); } - private void OnTimeseriesDataReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.TimeseriesDataRaw obj) + private void OnTimeseriesDataReceived(IStreamPipeline streamPipeline, TimeseriesDataRaw obj) { this.logger.LogTrace("StreamConsumer: OnTimeseriesDataReceived. Data packet of size = {0}", obj.Timestamps.Length); this.OnTimeseriesData?.Invoke(this, obj); } - private void OnParameterDefinitionsReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.ParameterDefinitions obj) + private void OnParameterDefinitionsReceived(IStreamPipeline streamPipeline, ParameterDefinitions obj) { this.logger.LogTrace("StreamConsumer: OnParameterDefinitionsReceived"); this.OnParameterDefinitionsChanged?.Invoke(this, obj); } - private void OnEventDataReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.EventDataRaw @event) + private void OnEventDataReceived(IStreamPipeline streamPipeline, EventDataRaw @event) { this.logger.LogTrace("StreamConsumer: OnEventDataReceived"); this.OnEventData?.Invoke(this, @event); } - private void OnEventDataReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.EventDataRaw[] events) + private void OnEventDataReceived(IStreamPipeline streamPipeline, EventDataRaw[] events) { this.logger.LogTrace("StreamConsumer: OnEventDataReceived"); for (var index = 0; index < events.Length; index++) @@ -178,13 +178,13 @@ private void OnEventDataReceived(IStreamPipeline streamPipeline, QuixStreams.Tel } } - private void OnEventDefinitionsReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.EventDefinitions obj) + private void OnEventDefinitionsReceived(IStreamPipeline streamPipeline, EventDefinitions obj) { this.logger.LogTrace("StreamConsumer: OnEventDefinitionsReceived"); this.OnEventDefinitionsChanged?.Invoke(this, obj); } - private void OnStreamEndReceived(IStreamPipeline streamPipeline, QuixStreams.Telemetry.Models.StreamEnd obj) + private void OnStreamEndReceived(IStreamPipeline streamPipeline, StreamEnd obj) { RaiseStreamClosed(obj.StreamEndType); } diff --git a/src/QuixStreams.Streaming/StreamProducer.cs b/src/QuixStreams.Streaming/StreamProducer.cs index 2ced54fd..325b55e9 100644 --- a/src/QuixStreams.Streaming/StreamProducer.cs +++ b/src/QuixStreams.Streaming/StreamProducer.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using QuixStreams; +using Newtonsoft.Json; using QuixStreams.Streaming.Exceptions; using QuixStreams.Streaming.Models.StreamProducer; using QuixStreams.Telemetry; @@ -21,7 +21,7 @@ namespace QuixStreams.Streaming internal class StreamProducer: StreamPipeline, IStreamProducerInternal { public event Action OnBeforeSend; - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly StreamPropertiesProducer streamPropertiesProducer; private readonly StreamTimeseriesProducer streamTimeseriesProducer; private readonly StreamEventsProducer streamEventsProducer; @@ -95,14 +95,14 @@ public DateTime Epoch public IStreamEventsProducer Events => streamEventsProducer; /// - public void Publish(QuixStreams.Telemetry.Models.StreamProperties properties) + public void Publish(StreamProperties properties) { CheckIfClosed(); this.Send(properties); } /// - public void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw rawData) + public void Publish(TimeseriesDataRaw rawData) { CheckIfClosed(); var send = this.Send(rawData); @@ -120,7 +120,7 @@ public void Publish(QuixStreams.Telemetry.Models.TimeseriesDataRaw rawData) } /// - public void Publish(List data) + public void Publish(List data) { CheckIfClosed(); foreach(var d in data) @@ -130,11 +130,11 @@ public void Publish(List data) } /// - public void Publish(QuixStreams.Telemetry.Models.ParameterDefinitions definitions) + public void Publish(ParameterDefinitions definitions) { CheckIfClosed(); definitions.Validate(); - var hash = Newtonsoft.Json.JsonConvert.SerializeObject(definitions).GetHashCode(); + var hash = JsonConvert.SerializeObject(definitions).GetHashCode(); if (this.lastParameterDefinitionHash == hash) return; this.lastParameterDefinitionHash = hash; var send = this.Send(definitions); @@ -153,7 +153,7 @@ public void Publish(QuixStreams.Telemetry.Models.ParameterDefinitions definition } /// - public void Publish(QuixStreams.Telemetry.Models.EventDataRaw eventDataRaw) + public void Publish(EventDataRaw eventDataRaw) { CheckIfClosed(); if (eventDataRaw == null) throw new ArgumentNullException(nameof(eventDataRaw)); @@ -162,7 +162,7 @@ public void Publish(QuixStreams.Telemetry.Models.EventDataRaw eventDataRaw) } /// - public void Publish(ICollection events) + public void Publish(ICollection events) { CheckIfClosed(); if (events == null) throw new ArgumentNullException(nameof(events)); @@ -182,11 +182,11 @@ public void Publish(ICollection event } /// - public void Publish(QuixStreams.Telemetry.Models.EventDefinitions definitions) + public void Publish(EventDefinitions definitions) { CheckIfClosed(); definitions.Validate(); - var hash = Newtonsoft.Json.JsonConvert.SerializeObject(definitions).GetHashCode(); + var hash = JsonConvert.SerializeObject(definitions).GetHashCode(); if (this.lastEventDefinitionHash == hash) return; this.lastEventDefinitionHash = hash; var send = this.Send(definitions); @@ -234,7 +234,7 @@ public void Flush() } /// - public void Close(QuixStreams.Telemetry.Models.StreamEndType streamState = QuixStreams.Telemetry.Models.StreamEndType.Closed) + public void Close(StreamEndType streamState = StreamEndType.Closed) { lock (this.closeLock) { diff --git a/src/QuixStreams.Streaming/TopicConsumer.cs b/src/QuixStreams.Streaming/TopicConsumer.cs index 46732a6c..70360316 100644 --- a/src/QuixStreams.Streaming/TopicConsumer.cs +++ b/src/QuixStreams.Streaming/TopicConsumer.cs @@ -1,8 +1,6 @@ using System; -using System.Collections.Concurrent; using System.Linq; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Streaming.Models; using QuixStreams.Streaming.States; using QuixStreams.Telemetry; @@ -15,7 +13,7 @@ namespace QuixStreams.Streaming /// public class TopicConsumer : ITopicConsumer { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly TelemetryKafkaConsumer telemetryKafkaConsumer; private bool isDisposed = false; private readonly object stateLock = new object(); diff --git a/src/QuixStreams.Streaming/Utils/CodecSettings.cs b/src/QuixStreams.Streaming/Utils/CodecSettings.cs index 7c52189c..e94086cc 100644 --- a/src/QuixStreams.Streaming/Utils/CodecSettings.cs +++ b/src/QuixStreams.Streaming/Utils/CodecSettings.cs @@ -1,6 +1,6 @@ using System; using Microsoft.Extensions.Logging; -using QuixStreams; +using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Kafka.Transport.SerDes.Legacy.MessageValue; using QuixStreams.Telemetry.Models; @@ -24,12 +24,12 @@ public static class CodecSettings /// /// The logger for the class /// - private static Lazy logger = new Lazy(() => QuixStreams.Logging.CreateLogger(typeof(CodecSettings))); + private static Lazy logger = new Lazy(() => Logging.CreateLogger(typeof(CodecSettings))); static CodecSettings() { // Set the Json codec type as the default - CodecSettings.SetGlobalCodecType(CodecType.Json); + SetGlobalCodecType(CodecType.Json); } /// @@ -43,15 +43,15 @@ public static void SetGlobalCodecType(CodecType codecType) if (codecType == CodecType.Protobuf) { - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.LegacyValueCodecType = TransportPackageValueCodecType.Binary; + PackageSerializationSettings.LegacyValueCodecType = TransportPackageValueCodecType.Binary; } else { - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.LegacyValueCodecType = TransportPackageValueCodecType.Json; + PackageSerializationSettings.LegacyValueCodecType = TransportPackageValueCodecType.Json; } CurrentCodec = codecType; codecSet = true; - logger.Value.LogDebug("Codecs are configured to publish using {0} with {1} package codec.", codecType, QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.LegacyValueCodecType); + logger.Value.LogDebug("Codecs are configured to publish using {0} with {1} package codec.", codecType, PackageSerializationSettings.LegacyValueCodecType); } } } \ No newline at end of file diff --git a/src/QuixStreams.Telemetry.Samples/ConsoleStreamWriter.cs b/src/QuixStreams.Telemetry.Samples/ConsoleStreamWriter.cs index b192771b..7f39930f 100644 --- a/src/QuixStreams.Telemetry.Samples/ConsoleStreamWriter.cs +++ b/src/QuixStreams.Telemetry.Samples/ConsoleStreamWriter.cs @@ -9,7 +9,7 @@ namespace QuixStreams.Telemetry.Samples /// public class ConsoleStreamWriter : StreamComponent { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); public ConsoleStreamWriter() { diff --git a/src/QuixStreams.Telemetry.Samples/Program.cs b/src/QuixStreams.Telemetry.Samples/Program.cs index c5fd7649..729f668d 100644 --- a/src/QuixStreams.Telemetry.Samples/Program.cs +++ b/src/QuixStreams.Telemetry.Samples/Program.cs @@ -1,7 +1,6 @@ using System; using System.Threading; using QuixStreams.Kafka; - using QuixStreams.Kafka.Transport; using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Kafka.Transport.SerDes.Legacy.MessageValue; using QuixStreams.Telemetry.Kafka; diff --git a/src/QuixStreams.Telemetry.UnitTests/KafkaConsumerProducerShould.cs b/src/QuixStreams.Telemetry.UnitTests/KafkaConsumerProducerShould.cs index 060fde1c..80790771 100644 --- a/src/QuixStreams.Telemetry.UnitTests/KafkaConsumerProducerShould.cs +++ b/src/QuixStreams.Telemetry.UnitTests/KafkaConsumerProducerShould.cs @@ -10,6 +10,7 @@ using QuixStreams.Telemetry.UnitTests.Helpers; using Xunit; using Xunit.Abstractions; +using CodecRegistry = QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry; namespace QuixStreams.Telemetry.UnitTests { @@ -17,7 +18,7 @@ public class KafkaConsumerProducerShould { public KafkaConsumerProducerShould(ITestOutputHelper helper) { - QuixStreams.Logging.Factory = helper.CreateLoggerFactory(); + Logging.Factory = helper.CreateLoggerFactory(); } [Fact] @@ -175,16 +176,16 @@ public async Task KafkaConsumerProducer_WithErrorsOnSend_ShouldRaiseExceptions() private static void RegisterTestCodecs() { - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RegisterCodec(typeof(StreamEnd).Name, new DefaultJsonCodec()); - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RegisterCodec(new ModelKey(typeof(TestModel1)), new DefaultJsonCodec()); - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RegisterCodec(new ModelKey(typeof(TestModel2)), new DefaultJsonCodec()); + CodecRegistry.RegisterCodec(typeof(StreamEnd).Name, new DefaultJsonCodec()); + CodecRegistry.RegisterCodec(new ModelKey(typeof(TestModel1)), new DefaultJsonCodec()); + CodecRegistry.RegisterCodec(new ModelKey(typeof(TestModel2)), new DefaultJsonCodec()); } private static void UnregisterTestCodecs() { - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.ClearCodecs(typeof(StreamEnd).Name); - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.ClearCodecs(new ModelKey(typeof(TestModel1))); - QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.ClearCodecs(new ModelKey(typeof(TestModel2))); + CodecRegistry.ClearCodecs(typeof(StreamEnd).Name); + CodecRegistry.ClearCodecs(new ModelKey(typeof(TestModel1))); + CodecRegistry.ClearCodecs(new ModelKey(typeof(TestModel2))); } } diff --git a/src/QuixStreams.Telemetry.UnitTests/Models/Telemetry/CodecRegistryShould.cs b/src/QuixStreams.Telemetry.UnitTests/Models/Telemetry/CodecRegistryShould.cs index 79df7daa..a56ec005 100644 --- a/src/QuixStreams.Telemetry.UnitTests/Models/Telemetry/CodecRegistryShould.cs +++ b/src/QuixStreams.Telemetry.UnitTests/Models/Telemetry/CodecRegistryShould.cs @@ -6,7 +6,7 @@ using QuixStreams.Telemetry.Models.Codecs; using QuixStreams.Telemetry.Models.Telemetry.Parameters.Codecs; using Xunit; -using CodecRegistry = QuixStreams.Telemetry.Models.CodecRegistry; +using CodecRegistry = QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry; namespace QuixStreams.Telemetry.UnitTests.Models.Telemetry { @@ -14,7 +14,7 @@ public class CodecRegistryShould { private void ValidateForDefaultJsonCodec() { - var codecs = QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RetrieveCodecs(new ModelKey(typeof(T).Name)); + var codecs = CodecRegistry.RetrieveCodecs(new ModelKey(typeof(T).Name)); var writeCodec = codecs.FirstOrDefault(); writeCodec.Should().NotBeNull(); writeCodec.GetType().IsAssignableFrom(typeof(DefaultJsonCodec)).Should().BeTrue($"expecting DefaultJsonCodec<{typeof(T).Name}>"); @@ -24,10 +24,10 @@ private void ValidateForDefaultJsonCodec() public void Register_JsonEvents_ShouldRegisterAsExpected() { // Act - CodecRegistry.Register(CodecType.Json); + QuixStreams.Telemetry.Models.CodecRegistry.Register(CodecType.Json); // Assert - var codecs = QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RetrieveCodecs("EventData[]"); + var codecs = CodecRegistry.RetrieveCodecs("EventData[]"); var writeCodec = codecs.FirstOrDefault(); writeCodec.Should().NotBeNull(); writeCodec.GetType().IsAssignableFrom(typeof(DefaultJsonCodec)).Should().BeTrue($"expecting DefaultJsonCodec"); @@ -37,10 +37,10 @@ public void Register_JsonEvents_ShouldRegisterAsExpected() public void Register_CompactJsonForBetterPerformance_ShouldRegisterAsExpected() { // Act - CodecRegistry.Register(CodecType.CompactJsonForBetterPerformance); + QuixStreams.Telemetry.Models.CodecRegistry.Register(CodecType.CompactJsonForBetterPerformance); // Assert - var codecs = QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RetrieveCodecs(new ModelKey("TimeseriesData")); + var codecs = CodecRegistry.RetrieveCodecs(new ModelKey("TimeseriesData")); codecs.Count().Should().Be(3, $"{string.Join(", ", codecs.Select(y=> y.Id))} should contain 3"); codecs.Should().Contain(x => x is TimeseriesDataReadableCodec); // for reading codecs.Should().Contain(x => x is DefaultJsonCodec); // for reading @@ -52,10 +52,10 @@ public void Register_CompactJsonForBetterPerformance_ShouldRegisterAsExpected() public void Register_JsonTimeseriesData_ShouldRegisterAsExpected() { // Act - CodecRegistry.Register(CodecType.Json); + QuixStreams.Telemetry.Models.CodecRegistry.Register(CodecType.Json); // Assert - var codecs = QuixStreams.Kafka.Transport.SerDes.Codecs.CodecRegistry.RetrieveCodecs(new ModelKey("TimeseriesData")); + var codecs = CodecRegistry.RetrieveCodecs(new ModelKey("TimeseriesData")); codecs.Count().Should().Be(3); codecs.Should().Contain(x => x is TimeseriesDataReadableCodec); // for reading codecs.Should().Contain(x => x is DefaultJsonCodec); // for reading @@ -67,7 +67,7 @@ public void Register_JsonTimeseriesData_ShouldRegisterAsExpected() public void Register_JsonStreamProperties_ShouldRegisterAsExpected() { // Act - CodecRegistry.Register(CodecType.Json); + QuixStreams.Telemetry.Models.CodecRegistry.Register(CodecType.Json); // Assert ValidateForDefaultJsonCodec(); @@ -77,7 +77,7 @@ public void Register_JsonStreamProperties_ShouldRegisterAsExpected() public void Register_JsonParameterDefinitions_ShouldRegisterAsExpected() { // Act - CodecRegistry.Register(CodecType.Json); + QuixStreams.Telemetry.Models.CodecRegistry.Register(CodecType.Json); // Assert ValidateForDefaultJsonCodec(); @@ -87,7 +87,7 @@ public void Register_JsonParameterDefinitions_ShouldRegisterAsExpected() public void Register_JsonEventDefinitions_ShouldRegisterAsExpected() { // Act - CodecRegistry.Register(CodecType.Json); + QuixStreams.Telemetry.Models.CodecRegistry.Register(CodecType.Json); // Assert ValidateForDefaultJsonCodec(); @@ -98,7 +98,7 @@ public void Register_JsonEventDefinitions_ShouldRegisterAsExpected() public void Register_JsonStreamEnd_ShouldRegisterAsExpected() { // Act - CodecRegistry.Register(CodecType.Json); + QuixStreams.Telemetry.Models.CodecRegistry.Register(CodecType.Json); // Assert ValidateForDefaultJsonCodec(); diff --git a/src/QuixStreams.Telemetry.UnitTests/StreamPipelineFactoryShould.cs b/src/QuixStreams.Telemetry.UnitTests/StreamPipelineFactoryShould.cs index 33e7617a..67042e5e 100644 --- a/src/QuixStreams.Telemetry.UnitTests/StreamPipelineFactoryShould.cs +++ b/src/QuixStreams.Telemetry.UnitTests/StreamPipelineFactoryShould.cs @@ -2,11 +2,9 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Net; using FluentAssertions; using NSubstitute; using Quix.TestBase.Extensions; -using QuixStreams; using QuixStreams.Kafka.Transport; using QuixStreams.Telemetry.Models; using Xunit; @@ -18,7 +16,7 @@ public class StreamPipelineFactoryShould { public StreamPipelineFactoryShould(ITestOutputHelper outputHelper) { - QuixStreams.Logging.Factory = outputHelper.CreateLoggerFactory(); + Logging.Factory = outputHelper.CreateLoggerFactory(); } [Fact] diff --git a/src/QuixStreams.Telemetry/Core/IOComponentConnection.cs b/src/QuixStreams.Telemetry/Core/IOComponentConnection.cs index 1878bcd9..96720837 100644 --- a/src/QuixStreams.Telemetry/Core/IOComponentConnection.cs +++ b/src/QuixStreams.Telemetry/Core/IOComponentConnection.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Telemetry.Models; namespace QuixStreams.Telemetry @@ -12,7 +11,7 @@ namespace QuixStreams.Telemetry /// public sealed class IOComponentConnection : IIOComponentConnection { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly List> packageSubscriptions = new List>(); private readonly Dictionary>> modelSubscriptions = new Dictionary>>(); diff --git a/src/QuixStreams.Telemetry/Core/StreamPipeline.cs b/src/QuixStreams.Telemetry/Core/StreamPipeline.cs index 55852285..8056c62e 100644 --- a/src/QuixStreams.Telemetry/Core/StreamPipeline.cs +++ b/src/QuixStreams.Telemetry/Core/StreamPipeline.cs @@ -14,7 +14,7 @@ namespace QuixStreams.Telemetry /// public class StreamPipeline : IStreamPipeline, IDisposable { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly CancellationToken cancellationToken; private readonly List componentsList = new List(); diff --git a/src/QuixStreams.Telemetry/Core/StreamPipelineFactory.cs b/src/QuixStreams.Telemetry/Core/StreamPipelineFactory.cs index d5085fd8..da962635 100644 --- a/src/QuixStreams.Telemetry/Core/StreamPipelineFactory.cs +++ b/src/QuixStreams.Telemetry/Core/StreamPipelineFactory.cs @@ -1,15 +1,12 @@ using System; -using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using QuixStreams.Kafka; -using QuixStreams; using QuixStreams.Kafka.Transport; using QuixStreams.Telemetry.Models; - namespace QuixStreams.Telemetry { /// @@ -18,7 +15,7 @@ namespace QuixStreams.Telemetry /// internal class StreamPipelineFactory { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly object openCloseLock = new object(); private bool isOpen; private IKafkaTransportConsumer kafkaTransportConsumer; diff --git a/src/QuixStreams.Telemetry/Interfaces/IStreamPipeline.cs b/src/QuixStreams.Telemetry/Interfaces/IStreamPipeline.cs index a671de97..a3f4fe25 100644 --- a/src/QuixStreams.Telemetry/Interfaces/IStreamPipeline.cs +++ b/src/QuixStreams.Telemetry/Interfaces/IStreamPipeline.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Threading.Tasks; using QuixStreams.Telemetry.Models; diff --git a/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs b/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs index 2ccc6767..031548d7 100644 --- a/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs +++ b/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaConsumer.cs @@ -2,7 +2,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Transactions; using Confluent.Kafka; using Microsoft.Extensions.Logging; using QuixStreams.Kafka; diff --git a/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaProducer.cs b/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaProducer.cs index 1231d5d6..675e40b3 100644 --- a/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaProducer.cs +++ b/src/QuixStreams.Telemetry/Kafka/TelemetryKafkaProducer.cs @@ -13,7 +13,7 @@ namespace QuixStreams.Telemetry.Kafka /// public class TelemetryKafkaProducer : StreamComponent, IDisposable { - private readonly ILogger logger = QuixStreams.Logging.CreateLogger(); + private readonly ILogger logger = Logging.CreateLogger(); private readonly IKafkaTransportProducer kafkaTransportProducer; @@ -34,7 +34,7 @@ public class TelemetryKafkaProducer : StreamComponent, IDisposable /// Stream Id to use to generate the new Stream on Kafka. If not specified, it generates a new Guid. public TelemetryKafkaProducer(IKafkaProducer producer, string streamId = null) { - this.kafkaTransportProducer = new QuixStreams.Kafka.Transport.KafkaTransportProducer(producer); + this.kafkaTransportProducer = new KafkaTransportProducer(producer); this.InitializeStreaming(streamId); } diff --git a/src/QuixStreams.Telemetry/Models/Telemetry/StreamProperties/StreamProperties.cs b/src/QuixStreams.Telemetry/Models/Telemetry/StreamProperties/StreamProperties.cs index 5b261fe5..a4f2cbb4 100644 --- a/src/QuixStreams.Telemetry/Models/Telemetry/StreamProperties/StreamProperties.cs +++ b/src/QuixStreams.Telemetry/Models/Telemetry/StreamProperties/StreamProperties.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Linq; namespace QuixStreams.Telemetry.Models { diff --git a/src/QuixStreams.Tester/Configuration.cs b/src/QuixStreams.Tester/Configuration.cs index a41c124a..2a2248f9 100644 --- a/src/QuixStreams.Tester/Configuration.cs +++ b/src/QuixStreams.Tester/Configuration.cs @@ -1,7 +1,6 @@ using System; using Microsoft.Extensions.Configuration; using QuixStreams.Streaming.Configuration; -using Enum = System.Enum; namespace QuixStreams.Tester { @@ -30,7 +29,7 @@ static Configuration() throw new Exception("Failed to parse Mode. Must be either Producer or Consumer"); } - Configuration.Mode = mode; + Mode = mode; Console.WriteLine($"In {mode} setting."); ProducerConfig = new ProducerConfig(); diff --git a/src/QuixStreams.Tester/Program.cs b/src/QuixStreams.Tester/Program.cs index 4a46ed75..2d60bae0 100644 --- a/src/QuixStreams.Tester/Program.cs +++ b/src/QuixStreams.Tester/Program.cs @@ -179,7 +179,7 @@ private static void GenerateEventData(IStreamProducer[] streams, CancellationTok var builder = stream.Events.AddTimestamp(DateTime.UtcNow); - builder.AddValue("an_event", Newtonsoft.Json.JsonConvert.SerializeObject(obj)); + builder.AddValue("an_event", JsonConvert.SerializeObject(obj)); if (random.Next(0, 2) == 1) builder.AddTag("Random_Tag", $"tag{random.Next(0, 10)}"); builder.Publish(); @@ -284,7 +284,7 @@ private static void Consume(KafkaStreamingClient client, CancellationToken cance if (Configuration.ConsumerConfig.PrintTimeseries) { Console.WriteLine($"Received new timeseries data for {consumer.StreamId}"); - var asJson = Newtonsoft.Json.JsonConvert.SerializeObject(args.Data, Formatting.Indented); + var asJson = JsonConvert.SerializeObject(args.Data, Formatting.Indented); Console.WriteLine(asJson); } }; @@ -295,7 +295,7 @@ private static void Consume(KafkaStreamingClient client, CancellationToken cance if (Configuration.ConsumerConfig.PrintEvents) { Console.WriteLine($"Received new event data for {consumer.StreamId}"); - var asJson = Newtonsoft.Json.JsonConvert.SerializeObject(args.Data, Formatting.Indented); + var asJson = JsonConvert.SerializeObject(args.Data, Formatting.Indented); Console.WriteLine(asJson); } }; diff --git a/src/QuixStreams.ThroughputTest/StreamingTest.cs b/src/QuixStreams.ThroughputTest/StreamingTest.cs index 34e249d3..878d4a1a 100644 --- a/src/QuixStreams.ThroughputTest/StreamingTest.cs +++ b/src/QuixStreams.ThroughputTest/StreamingTest.cs @@ -1,13 +1,4 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; -using System.Threading; -using QuixStreams.Streaming; -using QuixStreams.Telemetry.Models; - -namespace QuixStreams.ThroughputTest +namespace QuixStreams.ThroughputTest { /* public class StreamingTest diff --git a/src/QuixStreams.ThroughputTest/StreamingTestRaw.cs b/src/QuixStreams.ThroughputTest/StreamingTestRaw.cs index 5f3ade3f..24ba9dd8 100644 --- a/src/QuixStreams.ThroughputTest/StreamingTestRaw.cs +++ b/src/QuixStreams.ThroughputTest/StreamingTestRaw.cs @@ -8,10 +8,10 @@ using FluentAssertions.Extensions; using MathNet.Numerics.Statistics; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Streaming.UnitTests.Helpers; using QuixStreams.Telemetry.Models; +using Timer = System.Timers.Timer; namespace QuixStreams.ThroughputTest { @@ -20,17 +20,17 @@ public class StreamingTestRaw public const string TestName = "Baseline"; public void Run(CancellationToken ct, bool useBuffer = false) { - var currentProcess = System.Diagnostics.Process.GetCurrentProcess(); + var currentProcess = Process.GetCurrentProcess(); // usage stuff - QuixStreams.Logging.UpdateFactory(LogLevel.Debug); - QuixStreams.Kafka.Transport.SerDes.PackageSerializationSettings.Mode = PackageSerializationMode.Header; + Logging.UpdateFactory(LogLevel.Debug); + PackageSerializationSettings.Mode = PackageSerializationMode.Header; var client = new TestStreamingClient(CodecType.Protobuf); var topicConsumer = client.GetTopicConsumer(); var topicProducer = client.GetTopicProducer(); - var timer = new System.Timers.Timer() + var timer = new Timer() { Interval = 1000, Enabled = false, AutoReset = false };