From 63cbcabc963fe09ffad02a8f92b5d815da87ee06 Mon Sep 17 00:00:00 2001 From: Yury Pliner Date: Wed, 23 Jul 2014 23:26:19 +0600 Subject: [PATCH 1/2] An attempt to #12 --- src/TestHarness/Program.cs | 21 +-- src/kafka-net-client/JsonConsumer.cs | 6 +- src/kafka-net-client/JsonProducer.cs | 5 +- src/kafka-net/BrokerRouter.cs | 35 ++-- src/kafka-net/Configuration/Bus.cs | 37 ++++ src/kafka-net/Configuration/BusFactory.cs | 16 ++ .../Configuration/DefaultContainer.cs | 37 ++++ .../DefaultServiceRegistrator.cs | 16 ++ src/kafka-net/Configuration/IContainer.cs | 6 + .../Configuration/IServiceProvider.cs | 7 + .../Configuration/IServiceRegistrator.cs | 10 ++ .../ServiceImplementationNotFound.cs | 8 + src/kafka-net/Consumer.cs | 28 +-- src/kafka-net/ConsumerFactory.cs | 21 +++ .../Default/DefaultKafkaConnectionFactory.cs | 15 +- src/kafka-net/Interfaces/IBrokerRouter.cs | 2 - src/kafka-net/Interfaces/IBus.cs | 14 ++ src/kafka-net/Interfaces/IConsumer.cs | 12 ++ src/kafka-net/Interfaces/IConsumerFactory.cs | 7 + .../Interfaces/IKafkaConnectionFactory.cs | 8 +- src/kafka-net/Interfaces/IKafkaOptions.cs | 12 ++ .../Interfaces/IPartitionSelector.cs | 4 +- src/kafka-net/Interfaces/IProducer.cs | 12 ++ src/kafka-net/KafkaConnection.cs | 6 +- src/kafka-net/Model/ConsumerOptions.cs | 12 +- src/kafka-net/Model/KafkaOptions.cs | 40 +---- src/kafka-net/Producer.cs | 10 +- src/kafka-net/kafka-net.csproj | 18 +- src/kafka-tests/Fakes/BrokerRouterProxy.cs | 13 +- .../Integration/GzipProducerConsumerTests.cs | 17 +- .../KafkaConnectionIntegrationTests.cs | 4 +- .../Integration/OffsetManagementTests.cs | 18 +- .../ProducerConsumerIntegrationTests.cs | 46 +++-- src/kafka-tests/Unit/BrokerRouterTests.cs | 21 +-- src/kafka-tests/Unit/ConsumerTests.cs | 41 ++--- src/kafka-tests/Unit/KafkaConnectionTests.cs | 12 +- src/kafka-tests/Unit/ProducerTests.cs | 159 ++++++++++-------- 37 files changed, 492 insertions(+), 264 deletions(-) create mode 100644 src/kafka-net/Configuration/Bus.cs create mode 100644 src/kafka-net/Configuration/BusFactory.cs create mode 100644 src/kafka-net/Configuration/DefaultContainer.cs create mode 100644 src/kafka-net/Configuration/DefaultServiceRegistrator.cs create mode 100644 src/kafka-net/Configuration/IContainer.cs create mode 100644 src/kafka-net/Configuration/IServiceProvider.cs create mode 100644 src/kafka-net/Configuration/IServiceRegistrator.cs create mode 100644 src/kafka-net/Configuration/ServiceImplementationNotFound.cs create mode 100644 src/kafka-net/ConsumerFactory.cs create mode 100644 src/kafka-net/Interfaces/IBus.cs create mode 100644 src/kafka-net/Interfaces/IConsumer.cs create mode 100644 src/kafka-net/Interfaces/IConsumerFactory.cs create mode 100644 src/kafka-net/Interfaces/IKafkaOptions.cs create mode 100644 src/kafka-net/Interfaces/IProducer.cs diff --git a/src/TestHarness/Program.cs b/src/TestHarness/Program.cs index 2dd3e84f..440f6612 100644 --- a/src/TestHarness/Program.cs +++ b/src/TestHarness/Program.cs @@ -1,9 +1,8 @@ using System; using System.Threading.Tasks; -using KafkaNet; +using KafkaNet.Configuration; using KafkaNet.Model; using KafkaNet.Protocol; -using System.Collections.Generic; namespace TestHarness { @@ -11,17 +10,15 @@ class Program { static void Main(string[] args) { - var options = new KafkaOptions(new Uri("http://CSDKAFKA01:9092"), new Uri("http://CSDKAFKA02:9092")) + var bus = BusFactory.Create(new KafkaOptions { - Log = new ConsoleLog() - }; - var router = new BrokerRouter(options); - var client = new Producer(router); + Hosts = new[] {new Uri("http://CSDKAFKA01:9092"), new Uri("http://CSDKAFKA02:9092")} + }, x => { }); + Task.Factory.StartNew(() => { - var consumer = new Consumer(new ConsumerOptions("TestHarness", router)); - foreach (var data in consumer.Consume()) + foreach (var data in bus.Consume("TestHarness")) { Console.WriteLine("Response: P{0},O{1} : {2}", data.Meta.PartitionId, data.Meta.Offset, data.Value); } @@ -33,13 +30,11 @@ static void Main(string[] args) { var message = Console.ReadLine(); if (message == "quit") break; - client.SendMessageAsync("TestHarness", new[] {new Message {Value = message}}); + bus.SendMessageAsync("TestHarness", new[] {new Message {Value = message}}); } - using (client) - using (router) + using (bus) { - } } } diff --git a/src/kafka-net-client/JsonConsumer.cs b/src/kafka-net-client/JsonConsumer.cs index 07efb02d..ab329a51 100644 --- a/src/kafka-net-client/JsonConsumer.cs +++ b/src/kafka-net-client/JsonConsumer.cs @@ -9,13 +9,11 @@ namespace KafkaNet.Client { public class JsonConsumer : IDisposable { - private readonly ConsumerOptions _options; private readonly Consumer _consumer; - public JsonConsumer(ConsumerOptions options) + public JsonConsumer(IBrokerRouter brokerRouter, IKafkaLog log, ConsumerOptions options) { - _options = options; - _consumer = new Consumer(options); + _consumer = new Consumer(brokerRouter, log, options); } public IEnumerable> Consume() diff --git a/src/kafka-net-client/JsonProducer.cs b/src/kafka-net-client/JsonProducer.cs index 666fec03..067da55a 100644 --- a/src/kafka-net-client/JsonProducer.cs +++ b/src/kafka-net-client/JsonProducer.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using KafkaNet.Configuration; using KafkaNet.Protocol; using Newtonsoft.Json; @@ -11,9 +12,9 @@ public class JsonProducer : IDisposable { private readonly Producer _producer; - public JsonProducer(IBrokerRouter brokerRouter) + public JsonProducer(IBrokerRouter brokerRouter, IKafkaOptions options) { - _producer = new Producer(brokerRouter); + _producer = new Producer(brokerRouter, options); } public Task> Publish(string topic, IEnumerable messages, Int16 acks = 1, int timeoutMS = 1000) where T : class diff --git a/src/kafka-net/BrokerRouter.cs b/src/kafka-net/BrokerRouter.cs index 544b02b0..f2f2c2c7 100644 --- a/src/kafka-net/BrokerRouter.cs +++ b/src/kafka-net/BrokerRouter.cs @@ -2,7 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using KafkaNet.Model; +using KafkaNet.Configuration; using KafkaNet.Protocol; namespace KafkaNet @@ -20,17 +20,23 @@ namespace KafkaNet public class BrokerRouter : IBrokerRouter { private readonly object _threadLock = new object(); - private readonly KafkaOptions _kafkaOptions; + private readonly IKafkaOptions options; + private readonly IKafkaLog log; + private readonly IPartitionSelector partitionSelector; + private readonly IKafkaConnectionFactory connectionFactory; private readonly ConcurrentDictionary _brokerConnectionIndex = new ConcurrentDictionary(); private readonly ConcurrentDictionary _topicIndex = new ConcurrentDictionary(); private readonly List _defaultConnections = new List(); - public BrokerRouter(KafkaOptions kafkaOptions) + public BrokerRouter(IKafkaOptions options, IKafkaLog log, IPartitionSelector partitionSelector, IKafkaConnectionFactory connectionFactory) { - _kafkaOptions = kafkaOptions; + this.options = options; + this.log = log; + this.partitionSelector = partitionSelector; + this.connectionFactory = connectionFactory; _defaultConnections - .AddRange(kafkaOptions.KafkaServerUri.Distinct() - .Select(uri => _kafkaOptions.KafkaConnectionFactory.Create(uri, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log))); + .AddRange(options.Hosts.Distinct() + .Select(uri => connectionFactory.Create(uri, options.Timeout))); } /// @@ -76,7 +82,7 @@ public BrokerRoute SelectBrokerRoute(string topic, string key = null) if (cachedTopic == null) throw new InvalidTopicMetadataException(string.Format("The Metadata is invalid as it returned no data for the given topic:{0}", topic)); - var partition = _kafkaOptions.PartitionSelector.Select(cachedTopic, key); + var partition = partitionSelector.Select(cachedTopic, key); return GetCachedRoute(cachedTopic.Name, partition); } @@ -119,7 +125,7 @@ public void RefreshTopicMetadata(params string[] topics) { lock (_threadLock) { - _kafkaOptions.Log.DebugFormat("BrokerRouter: Refreshing metadata for topics: {0}", string.Join(",", topics)); + log.DebugFormat("BrokerRouter: Refreshing metadata for topics: {0}", string.Join(",", topics)); //use the initial default connections to retrieve metadata if (_defaultConnections.Count > 0) @@ -216,14 +222,14 @@ private void CycleConnectionsForTopicMetadata(IEnumerable conn } catch (Exception ex) { - _kafkaOptions.Log.WarnFormat("Failed to contact Kafka server={0}. Trying next default server. Exception={1}", conn.KafkaUri, ex); + log.WarnFormat("Failed to contact Kafka server={0}. Trying next default server. Exception={1}", conn.KafkaUri, ex); } } throw new ServerUnreachableException( string.Format( "Unable to query for metadata from any of the default Kafka servers. At least one provided server must be available. Server list: {0}", - string.Join(", ", _kafkaOptions.KafkaServerUri.Select(x => x.ToString())))); + string.Join(", ", options.Hosts.Select(x => x.ToString())))); } private void UpdateInternalMetadataCache(MetadataResponse metadata) @@ -232,16 +238,13 @@ private void UpdateInternalMetadataCache(MetadataResponse metadata) { var localBroker = broker; _brokerConnectionIndex.AddOrUpdate(broker.BrokerId, - i => - { - return _kafkaOptions.KafkaConnectionFactory.Create(localBroker.Address, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log); - }, + i => connectionFactory.Create(localBroker.Address, options.Timeout), (i, connection) => { //if a connection changes for a broker close old connection and create a new one if (connection.KafkaUri == localBroker.Address) return connection; - _kafkaOptions.Log.WarnFormat("Broker:{0} Uri changed from:{1} to {2}", localBroker.BrokerId, connection.KafkaUri, localBroker.Address); - using (connection) { return _kafkaOptions.KafkaConnectionFactory.Create(localBroker.Address, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log); } + log.WarnFormat("Broker:{0} Uri changed from:{1} to {2}", localBroker.BrokerId, connection.KafkaUri, localBroker.Address); + using (connection) { return connectionFactory.Create(localBroker.Address, options.Timeout); } }); } diff --git a/src/kafka-net/Configuration/Bus.cs b/src/kafka-net/Configuration/Bus.cs new file mode 100644 index 00000000..4316ca74 --- /dev/null +++ b/src/kafka-net/Configuration/Bus.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using KafkaNet.Protocol; + +namespace KafkaNet.Configuration +{ + public class Bus : IBus + { + private readonly IConsumerFactory _consumerFactory; + private readonly IProducer _producer; + + public Bus(IProducer producer, IConsumerFactory consumerFactory) + { + _producer = producer; + _consumerFactory = consumerFactory; + } + + public Task> SendMessageAsync(string topic, IEnumerable messages, short acks = 1, int timeoutMS = 1000, MessageCodec codec = MessageCodec.CodecNone) + { + return _producer.SendMessageAsync(topic, messages, acks, timeoutMS, codec); + } + + public IEnumerable Consume(string topic, CancellationToken? token = null) + { + var consumer = _consumerFactory.GetConsumer(topic); + return consumer.Consume(token); + } + + public void Dispose() + { + _producer.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/BusFactory.cs b/src/kafka-net/Configuration/BusFactory.cs new file mode 100644 index 00000000..be75b585 --- /dev/null +++ b/src/kafka-net/Configuration/BusFactory.cs @@ -0,0 +1,16 @@ +using System; + +namespace KafkaNet.Configuration +{ + public static class BusFactory + { + public static IBus Create(IKafkaOptions settings, Action configure) + { + var container = new DefaultContainer(); + DefaultServiceRegistrator.Register(container); + container.Register(settings); + configure(container); + return container.Resolve(); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/DefaultContainer.cs b/src/kafka-net/Configuration/DefaultContainer.cs new file mode 100644 index 00000000..52fa02ca --- /dev/null +++ b/src/kafka-net/Configuration/DefaultContainer.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; + +namespace KafkaNet.Configuration +{ + public class DefaultContainer : IContainer + { + private readonly Dictionary _implementations = new Dictionary(); + private readonly Dictionary _factories = new Dictionary(); + + public T Resolve() where T: class + { + object implementation; + if (_implementations.TryGetValue(typeof (T), out implementation)) + return (T) implementation; + object factory; + if (_factories.TryGetValue(typeof(T), out factory)) + { + var serviceFactory = (Func)factory; + var serviceImplementation = serviceFactory(this); + _implementations.Add(typeof (T), serviceImplementation); + return serviceImplementation; + } + throw new ServiceImplementationNotFound(); + } + + public void Register(T implementation) where T : class + { + _implementations.Add(typeof (T), implementation); + } + + public void Register(Func factory) where T : class + { + _factories.Add(typeof(T), factory); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/DefaultServiceRegistrator.cs b/src/kafka-net/Configuration/DefaultServiceRegistrator.cs new file mode 100644 index 00000000..99c45bb2 --- /dev/null +++ b/src/kafka-net/Configuration/DefaultServiceRegistrator.cs @@ -0,0 +1,16 @@ +namespace KafkaNet.Configuration +{ + public static class DefaultServiceRegistrator + { + public static void Register(IServiceRegistrator registrator) + { + registrator.Register(new DefaultTraceLog()); + registrator.Register(new DefaultPartitionSelector()); + registrator.Register(x => new DefaultKafkaConnectionFactory(x.Resolve())); + registrator.Register(x => new BrokerRouter(x.Resolve(), x.Resolve(), x.Resolve(), x.Resolve())); + registrator.Register(x => new Producer(x.Resolve(), x.Resolve())); + registrator.Register(x => new ConsumerFactory(x.Resolve(), x.Resolve())); + registrator.Register(x => new Bus(x.Resolve(), x.Resolve())); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/IContainer.cs b/src/kafka-net/Configuration/IContainer.cs new file mode 100644 index 00000000..0e8c54f5 --- /dev/null +++ b/src/kafka-net/Configuration/IContainer.cs @@ -0,0 +1,6 @@ +namespace KafkaNet.Configuration +{ + public interface IContainer : IServiceProvider, IServiceRegistrator + { + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/IServiceProvider.cs b/src/kafka-net/Configuration/IServiceProvider.cs new file mode 100644 index 00000000..c43e1a1e --- /dev/null +++ b/src/kafka-net/Configuration/IServiceProvider.cs @@ -0,0 +1,7 @@ +namespace KafkaNet.Configuration +{ + public interface IServiceProvider + { + T Resolve() where T : class; + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/IServiceRegistrator.cs b/src/kafka-net/Configuration/IServiceRegistrator.cs new file mode 100644 index 00000000..ae6c2e24 --- /dev/null +++ b/src/kafka-net/Configuration/IServiceRegistrator.cs @@ -0,0 +1,10 @@ +using System; + +namespace KafkaNet.Configuration +{ + public interface IServiceRegistrator + { + void Register(T implementation) where T : class; + void Register(Func factory) where T : class; + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/ServiceImplementationNotFound.cs b/src/kafka-net/Configuration/ServiceImplementationNotFound.cs new file mode 100644 index 00000000..3fb24aeb --- /dev/null +++ b/src/kafka-net/Configuration/ServiceImplementationNotFound.cs @@ -0,0 +1,8 @@ +using System; + +namespace KafkaNet.Configuration +{ + public class ServiceImplementationNotFound : Exception + { + } +} \ No newline at end of file diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 19b69c46..aed38a76 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -16,8 +16,10 @@ namespace KafkaNet /// TODO: provide automatic offset saving when the feature is available in 0.8.2 /// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI /// - public class Consumer : IMetadataQueries, IDisposable + public class Consumer : IMetadataQueries, IConsumer { + private readonly IBrokerRouter _brokerRouter; + private readonly IKafkaLog _log; private readonly ConsumerOptions _options; private readonly BlockingCollection _fetchResponseQueue; private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); @@ -29,11 +31,13 @@ public class Consumer : IMetadataQueries, IDisposable private int _ensureOneThread; private Topic _topic; - public Consumer(ConsumerOptions options, params OffsetPosition[] positions) + public Consumer(IBrokerRouter brokerRouter, IKafkaLog log, ConsumerOptions options, params OffsetPosition[] positions) { + _brokerRouter = brokerRouter; + _log = log; _options = options; _fetchResponseQueue = new BlockingCollection(_options.ConsumerBufferSize); - _metadataQueries = new MetadataQueries(_options.Router); + _metadataQueries = new MetadataQueries(brokerRouter); //this timer will periodically look for new partitions and automatically add them to the consuming queue //using the same whitelist logic @@ -56,7 +60,7 @@ public Consumer(ConsumerOptions options, params OffsetPosition[] positions) /// Blocking enumberable of messages from Kafka. public IEnumerable Consume(CancellationToken? cancellationToken = null) { - _options.Log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic); + _log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic); _topicPartitionQueryTimer.Begin(); return _fetchResponseQueue.GetConsumingEnumerable(cancellationToken ?? new CancellationToken(false)); @@ -91,8 +95,8 @@ private void RefreshTopicPartitions() { if (Interlocked.Increment(ref _ensureOneThread) == 1) { - _options.Log.DebugFormat("Consumer: Refreshing partitions for topic: {0}", _options.Topic); - var topic = _options.Router.GetTopicMetadata(_options.Topic); + _log.DebugFormat("Consumer: Refreshing partitions for topic: {0}", _options.Topic); + var topic = _brokerRouter.GetTopicMetadata(_options.Topic); if (topic.Count <= 0) throw new ApplicationException(string.Format("Unable to get metadata for topic:{0}.", _options.Topic)); _topic = topic.First(); @@ -111,7 +115,7 @@ private void RefreshTopicPartitions() } catch (Exception ex) { - _options.Log.ErrorFormat("Exception occured trying to setup consumer for topic:{0}. Exception={1}", _options.Topic, ex); + _log.ErrorFormat("Exception occured trying to setup consumer for topic:{0}. Exception={1}", _options.Topic, ex); } finally { @@ -125,7 +129,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) { try { - _options.Log.DebugFormat("Consumer: Creating polling task for topic: {0} on parition: {1}", topic, partitionId); + _log.DebugFormat("Consumer: Creating polling task for topic: {0} on parition: {1}", topic, partitionId); while (_disposeToken.IsCancellationRequested == false) { try @@ -151,7 +155,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) }; //make request and post to queue - var route = _options.Router.SelectBrokerRoute(topic, partitionId); + var route = _brokerRouter.SelectBrokerRoute(topic, partitionId); var responses = route.Connection.SendAsync(fetchRequest).Result; if (responses.Count > 0) @@ -179,13 +183,13 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) } catch (Exception ex) { - _options.Log.ErrorFormat("Exception occured while polling topic:{0} partition:{1}. Polling will continue. Exception={2}", topic, partitionId, ex); + _log.ErrorFormat("Exception occured while polling topic:{0} partition:{1}. Polling will continue. Exception={2}", topic, partitionId, ex); } } } finally { - _options.Log.DebugFormat("Consumer: Disabling polling task for topic: {0} on parition: {1}", topic, partitionId); + _log.DebugFormat("Consumer: Disabling polling task for topic: {0} on parition: {1}", topic, partitionId); Task tempTask; _partitionPollingIndex.TryRemove(partitionId, out tempTask); } @@ -204,7 +208,7 @@ public Task> GetTopicOffsetAsync(string topic, int maxOffse public void Dispose() { - _options.Log.DebugFormat("Consumer: Disposing..."); + _log.DebugFormat("Consumer: Disposing..."); _disposeToken.Cancel(); using (_topicPartitionQueryTimer) using (_metadataQueries) diff --git a/src/kafka-net/ConsumerFactory.cs b/src/kafka-net/ConsumerFactory.cs new file mode 100644 index 00000000..b0abb009 --- /dev/null +++ b/src/kafka-net/ConsumerFactory.cs @@ -0,0 +1,21 @@ +using KafkaNet.Model; + +namespace KafkaNet +{ + public class ConsumerFactory : IConsumerFactory + { + private readonly IBrokerRouter _brokerRouter; + private readonly IKafkaLog _log; + + public ConsumerFactory(IBrokerRouter brokerRouter, IKafkaLog log) + { + _brokerRouter = brokerRouter; + _log = log; + } + + public IConsumer GetConsumer(string topic) + { + return new Consumer(_brokerRouter, _log, new ConsumerOptions(topic)); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs index c55bd162..9dd894df 100644 --- a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs +++ b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs @@ -1,16 +1,19 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace KafkaNet { public class DefaultKafkaConnectionFactory : IKafkaConnectionFactory { - public IKafkaConnection Create(Uri kafkaAddress, int responseTimeoutMs, IKafkaLog log) + private readonly IKafkaLog kafkaLog; + + public DefaultKafkaConnectionFactory(IKafkaLog kafkaLog) { - return new KafkaConnection(new KafkaTcpSocket(log, kafkaAddress), responseTimeoutMs, log); + this.kafkaLog = kafkaLog; + } + + public IKafkaConnection Create(Uri kafkaAddress, TimeSpan timeout) + { + return new KafkaConnection(new KafkaTcpSocket(kafkaLog, kafkaAddress), (int)timeout.TotalMilliseconds, kafkaLog); } } } diff --git a/src/kafka-net/Interfaces/IBrokerRouter.cs b/src/kafka-net/Interfaces/IBrokerRouter.cs index 6225232d..3908963a 100644 --- a/src/kafka-net/Interfaces/IBrokerRouter.cs +++ b/src/kafka-net/Interfaces/IBrokerRouter.cs @@ -1,7 +1,5 @@ using System; using System.Collections.Generic; -using System.Threading.Tasks; -using KafkaNet.Model; using KafkaNet.Protocol; namespace KafkaNet diff --git a/src/kafka-net/Interfaces/IBus.cs b/src/kafka-net/Interfaces/IBus.cs new file mode 100644 index 00000000..23b757ff --- /dev/null +++ b/src/kafka-net/Interfaces/IBus.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using KafkaNet.Protocol; + +namespace KafkaNet +{ + public interface IBus : IDisposable + { + Task> SendMessageAsync(string topic, IEnumerable messages, Int16 acks = 1, int timeoutMS = 1000, MessageCodec codec = MessageCodec.CodecNone); + IEnumerable Consume(string topic, CancellationToken? token = null); + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IConsumer.cs b/src/kafka-net/Interfaces/IConsumer.cs new file mode 100644 index 00000000..a7c6de22 --- /dev/null +++ b/src/kafka-net/Interfaces/IConsumer.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using KafkaNet.Protocol; + +namespace KafkaNet +{ + public interface IConsumer : IDisposable + { + IEnumerable Consume(CancellationToken? cancellationToken = null); + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IConsumerFactory.cs b/src/kafka-net/Interfaces/IConsumerFactory.cs new file mode 100644 index 00000000..0c64759a --- /dev/null +++ b/src/kafka-net/Interfaces/IConsumerFactory.cs @@ -0,0 +1,7 @@ +namespace KafkaNet +{ + public interface IConsumerFactory + { + IConsumer GetConsumer(string topic); + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs index 47ab8a61..5fd04e5f 100644 --- a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs +++ b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs @@ -1,13 +1,9 @@ -using KafkaNet.Protocol; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System; namespace KafkaNet { public interface IKafkaConnectionFactory { - IKafkaConnection Create(Uri kafkaAddress, int responseTimeoutMs, IKafkaLog log); + IKafkaConnection Create(Uri kafkaAddress, TimeSpan timeout); } } diff --git a/src/kafka-net/Interfaces/IKafkaOptions.cs b/src/kafka-net/Interfaces/IKafkaOptions.cs new file mode 100644 index 00000000..665ec37a --- /dev/null +++ b/src/kafka-net/Interfaces/IKafkaOptions.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; + +namespace KafkaNet +{ + public interface IKafkaOptions + { + IEnumerable Hosts { get; } + TimeSpan Timeout { get; } + int QueueSize { get; } + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IPartitionSelector.cs b/src/kafka-net/Interfaces/IPartitionSelector.cs index bb877a95..5582d6c9 100644 --- a/src/kafka-net/Interfaces/IPartitionSelector.cs +++ b/src/kafka-net/Interfaces/IPartitionSelector.cs @@ -1,6 +1,4 @@ -using System.Collections.Generic; -using KafkaNet.Model; -using KafkaNet.Protocol; +using KafkaNet.Protocol; namespace KafkaNet { diff --git a/src/kafka-net/Interfaces/IProducer.cs b/src/kafka-net/Interfaces/IProducer.cs new file mode 100644 index 00000000..48ae64b8 --- /dev/null +++ b/src/kafka-net/Interfaces/IProducer.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using KafkaNet.Protocol; + +namespace KafkaNet +{ + public interface IProducer : IDisposable + { + Task> SendMessageAsync(string topic, IEnumerable messages, Int16 acks = 1, int timeoutMS = 1000, MessageCodec codec = MessageCodec.CodecNone); + } +} \ No newline at end of file diff --git a/src/kafka-net/KafkaConnection.cs b/src/kafka-net/KafkaConnection.cs index 76cad887..c3aae670 100644 --- a/src/kafka-net/KafkaConnection.cs +++ b/src/kafka-net/KafkaConnection.cs @@ -20,8 +20,6 @@ namespace KafkaNet /// public class KafkaConnection : IKafkaConnection { - private const int DefaultResponseTimeoutMs = 30000; - private readonly ConcurrentDictionary _requestIndex = new ConcurrentDictionary(); private readonly IScheduledTimer _responseTimeoutTimer; private readonly int _responseTimeoutMS; @@ -40,10 +38,10 @@ public class KafkaConnection : IKafkaConnection /// Logging interface used to record any log messages created by the connection. /// The kafka socket initialized to the kafka server. /// The amount of time to wait for a message response to be received after sending message to Kafka. - public KafkaConnection(IKafkaTcpSocket client, int responseTimeoutMs = DefaultResponseTimeoutMs, IKafkaLog log = null) + public KafkaConnection(IKafkaTcpSocket client, int responseTimeoutMs, IKafkaLog log) { _client = client; - _log = log ?? new DefaultTraceLog(); + _log = log; _responseTimeoutMS = responseTimeoutMs; _responseTimeoutTimer = new ScheduledTimer() .Do(ResponseTimeoutCheck) diff --git a/src/kafka-net/Model/ConsumerOptions.cs b/src/kafka-net/Model/ConsumerOptions.cs index d097822f..7b415878 100644 --- a/src/kafka-net/Model/ConsumerOptions.cs +++ b/src/kafka-net/Model/ConsumerOptions.cs @@ -17,14 +17,6 @@ public class ConsumerOptions /// public List PartitionWhitelist { get; set; } /// - /// Log object to record operational messages. - /// - public IKafkaLog Log { get; set; } - /// - /// The broker router used to provide connection to each partition server. - /// - public IBrokerRouter Router { get; set; } - /// /// The time in milliseconds between queries to look for any new partitions being created. /// public int TopicPartitionQueryTimeMs { get; set; } @@ -37,12 +29,10 @@ public class ConsumerOptions /// public int BackoffInterval { get; set; } - public ConsumerOptions(string topic, IBrokerRouter router) + public ConsumerOptions(string topic) { Topic = topic; - Router = router; PartitionWhitelist = new List(); - Log = new DefaultTraceLog(); TopicPartitionQueryTimeMs = (int)TimeSpan.FromMinutes(15).TotalMilliseconds; ConsumerBufferSize = DefaultMaxConsumerBufferSize; BackoffInterval = DefaultBackoffIntervalMS; diff --git a/src/kafka-net/Model/KafkaOptions.cs b/src/kafka-net/Model/KafkaOptions.cs index 7b5b178f..3201f09b 100644 --- a/src/kafka-net/Model/KafkaOptions.cs +++ b/src/kafka-net/Model/KafkaOptions.cs @@ -1,41 +1,19 @@ using System; using System.Collections.Generic; -using System.Linq; +using KafkaNet.Configuration; namespace KafkaNet.Model { - public class KafkaOptions + public class KafkaOptions : IKafkaOptions { - private const int DefaultResponseTimeout = 30000; - - /// - /// List of Uri connections to kafka servers. The are used to query for metadata from Kafka. More than one is recommended. - /// - public List KafkaServerUri { get; set; } - /// - /// Provides a factory for creating new kafka connections. - /// - public IKafkaConnectionFactory KafkaConnectionFactory { get; set; } - /// - /// Selector function for routing messages to partitions. Default is key/hash and round robin. - /// - public IPartitionSelector PartitionSelector { get; set; } - /// - /// Timeout length in milliseconds waiting for a response from kafka. - /// - public int ResponseTimeoutMs { get; set; } - /// - /// Log object to record operational messages. - /// - public IKafkaLog Log { get; set; } - - public KafkaOptions(params Uri[] kafkaServerUri) + public KafkaOptions() { - KafkaServerUri = kafkaServerUri.ToList(); - KafkaConnectionFactory = new DefaultKafkaConnectionFactory(); - PartitionSelector = new DefaultPartitionSelector(); - Log = new DefaultTraceLog(); - ResponseTimeoutMs = DefaultResponseTimeout; + Timeout = TimeSpan.FromSeconds(30); + QueueSize = int.MaxValue; } + + public IEnumerable Hosts { get; set; } + public TimeSpan Timeout { get; set; } + public int QueueSize { get; set; } } } diff --git a/src/kafka-net/Producer.cs b/src/kafka-net/Producer.cs index 3e0fabff..d3f15990 100644 --- a/src/kafka-net/Producer.cs +++ b/src/kafka-net/Producer.cs @@ -1,9 +1,8 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using KafkaNet.Model; +using KafkaNet.Configuration; using KafkaNet.Protocol; using System.Threading; @@ -13,7 +12,7 @@ namespace KafkaNet /// /// Provides a simplified high level API for producing messages on a topic. /// - public class Producer : IMetadataQueries, IDisposable + public class Producer : IMetadataQueries, IProducer { private readonly IBrokerRouter _router; private readonly SemaphoreSlim _sendSemaphore; @@ -30,6 +29,7 @@ public class Producer : IMetadataQueries, IDisposable /// /// The router used to direct produced messages to the correct partition. /// The maximum async calls allowed before blocking new requests. -1 indicates unlimited. + /// /// /// The maximumAsyncQueue parameter provides a mechanism for blocking an async request return if the amount of requests queue is /// over a certain limit. This is usefull if a client is trying to push a large stream of documents through the producer and @@ -39,11 +39,11 @@ public class Producer : IMetadataQueries, IDisposable /// messages sitting in the async queue then a message may spend its entire timeout cycle waiting in this queue and never getting /// attempted to send to Kafka before a timeout exception is thrown. /// - public Producer(IBrokerRouter brokerRouter, int maximumAsyncQueue = -1) + public Producer(IBrokerRouter brokerRouter, IKafkaOptions options) { _router = brokerRouter; _metadataQueries = new MetadataQueries(_router); - _maximumAsyncQueue = maximumAsyncQueue == -1 ? int.MaxValue : maximumAsyncQueue; + _maximumAsyncQueue = options.QueueSize; _sendSemaphore = new SemaphoreSlim(_maximumAsyncQueue, _maximumAsyncQueue); } diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj index a0a5dc6a..c66a01db 100644 --- a/src/kafka-net/kafka-net.csproj +++ b/src/kafka-net/kafka-net.csproj @@ -43,9 +43,23 @@ + + + + + + + + + + + + + + @@ -85,9 +99,7 @@ - - - + + \ No newline at end of file diff --git a/src/kafka-net.windsor/packages.config b/src/kafka-net.windsor/packages.config new file mode 100644 index 00000000..010612fa --- /dev/null +++ b/src/kafka-net.windsor/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/kafka-net/Configuration/Bus.cs b/src/kafka-net/Configuration/Bus.cs index 4316ca74..2eadc7ab 100644 --- a/src/kafka-net/Configuration/Bus.cs +++ b/src/kafka-net/Configuration/Bus.cs @@ -1,6 +1,4 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using KafkaNet.Protocol; diff --git a/src/kafka-net/Configuration/BusFactory.cs b/src/kafka-net/Configuration/BusFactory.cs index be75b585..1092fdae 100644 --- a/src/kafka-net/Configuration/BusFactory.cs +++ b/src/kafka-net/Configuration/BusFactory.cs @@ -2,14 +2,24 @@ namespace KafkaNet.Configuration { - public static class BusFactory + public class BusFactory { - public static IBus Create(IKafkaOptions settings, Action configure) + private readonly IContainer container; + + public BusFactory(IContainer container) + { + this.container = container; + } + + public BusFactory() : this(new DefaultContainer()) + { + } + + public IBus Create(IKafkaOptions settings, Action configure) { - var container = new DefaultContainer(); - DefaultServiceRegistrator.Register(container); - container.Register(settings); configure(container); + container.Register(_ => settings); + DefaultServiceRegistrator.Register(container); return container.Resolve(); } } diff --git a/src/kafka-net/Configuration/DefaultContainer.cs b/src/kafka-net/Configuration/DefaultContainer.cs index 52fa02ca..1911e2b4 100644 --- a/src/kafka-net/Configuration/DefaultContainer.cs +++ b/src/kafka-net/Configuration/DefaultContainer.cs @@ -5,33 +5,30 @@ namespace KafkaNet.Configuration { public class DefaultContainer : IContainer { - private readonly Dictionary _implementations = new Dictionary(); private readonly Dictionary _factories = new Dictionary(); + private readonly Dictionary _instances = new Dictionary(); public T Resolve() where T: class { - object implementation; - if (_implementations.TryGetValue(typeof (T), out implementation)) - return (T) implementation; + object instance; + if (_instances.TryGetValue(typeof (T), out instance)) + return (T) instance; object factory; - if (_factories.TryGetValue(typeof(T), out factory)) + if (_factories.TryGetValue(typeof (T), out factory)) { - var serviceFactory = (Func)factory; - var serviceImplementation = serviceFactory(this); - _implementations.Add(typeof (T), serviceImplementation); - return serviceImplementation; + var newInstance = ((Func) factory)(this); + _instances.Add(typeof(T), newInstance); + return newInstance; } - throw new ServiceImplementationNotFound(); + throw new ServiceNotFound(string.Format("No service of type {0} has been registered", typeof(T).Name)); } - public void Register(T implementation) where T : class - { - _implementations.Add(typeof (T), implementation); - } - - public void Register(Func factory) where T : class + public IServiceRegistrator Register(Func factory) where T : class { + if(_factories.ContainsKey(typeof(T))) + return this; _factories.Add(typeof(T), factory); + return this; } } } \ No newline at end of file diff --git a/src/kafka-net/Configuration/DefaultServiceRegistrator.cs b/src/kafka-net/Configuration/DefaultServiceRegistrator.cs index 99c45bb2..a8084004 100644 --- a/src/kafka-net/Configuration/DefaultServiceRegistrator.cs +++ b/src/kafka-net/Configuration/DefaultServiceRegistrator.cs @@ -4,13 +4,13 @@ public static class DefaultServiceRegistrator { public static void Register(IServiceRegistrator registrator) { - registrator.Register(new DefaultTraceLog()); - registrator.Register(new DefaultPartitionSelector()); - registrator.Register(x => new DefaultKafkaConnectionFactory(x.Resolve())); - registrator.Register(x => new BrokerRouter(x.Resolve(), x.Resolve(), x.Resolve(), x.Resolve())); - registrator.Register(x => new Producer(x.Resolve(), x.Resolve())); - registrator.Register(x => new ConsumerFactory(x.Resolve(), x.Resolve())); - registrator.Register(x => new Bus(x.Resolve(), x.Resolve())); + registrator.Register(_ => new DefaultTraceLog()) + .Register(_ => new DefaultPartitionSelector()) + .Register(x => new DefaultKafkaConnectionFactory(x.Resolve())) + .Register(x => new BrokerRouter(x.Resolve(), x.Resolve(), x.Resolve(), x.Resolve())) + .Register(x => new Producer(x.Resolve(), x.Resolve())) + .Register(x => new ConsumerFactory(x.Resolve(), x.Resolve())) + .Register(x => new Bus(x.Resolve(), x.Resolve())); } } } \ No newline at end of file diff --git a/src/kafka-net/Configuration/IServiceRegistrator.cs b/src/kafka-net/Configuration/IServiceRegistrator.cs index ae6c2e24..cea39368 100644 --- a/src/kafka-net/Configuration/IServiceRegistrator.cs +++ b/src/kafka-net/Configuration/IServiceRegistrator.cs @@ -4,7 +4,6 @@ namespace KafkaNet.Configuration { public interface IServiceRegistrator { - void Register(T implementation) where T : class; - void Register(Func factory) where T : class; + IServiceRegistrator Register(Func factory) where T : class; } } \ No newline at end of file diff --git a/src/kafka-net/Configuration/ServiceImplementationNotFound.cs b/src/kafka-net/Configuration/ServiceImplementationNotFound.cs deleted file mode 100644 index 3fb24aeb..00000000 --- a/src/kafka-net/Configuration/ServiceImplementationNotFound.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace KafkaNet.Configuration -{ - public class ServiceImplementationNotFound : Exception - { - } -} \ No newline at end of file diff --git a/src/kafka-net/Configuration/ServiceNotFound.cs b/src/kafka-net/Configuration/ServiceNotFound.cs new file mode 100644 index 00000000..e9e8774e --- /dev/null +++ b/src/kafka-net/Configuration/ServiceNotFound.cs @@ -0,0 +1,16 @@ +using System; + +namespace KafkaNet.Configuration +{ + public class ServiceNotFound : Exception + { + public ServiceNotFound(string message, Exception exception) : base(message, exception) + { + } + + public ServiceNotFound(string message) + : base(message) + { + } + } +} \ No newline at end of file diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj index c66a01db..6571b99e 100644 --- a/src/kafka-net/kafka-net.csproj +++ b/src/kafka-net/kafka-net.csproj @@ -55,7 +55,7 @@ - + diff --git a/src/kafka-tests/Configuration/AnotherService.cs b/src/kafka-tests/Configuration/AnotherService.cs new file mode 100644 index 00000000..b3fa468a --- /dev/null +++ b/src/kafka-tests/Configuration/AnotherService.cs @@ -0,0 +1,6 @@ +namespace kafka_tests.Configuration +{ + public class AnotherService : IService + { + } +} \ No newline at end of file diff --git a/src/kafka-tests/Configuration/DefaultContainerTest.cs b/src/kafka-tests/Configuration/DefaultContainerTest.cs new file mode 100644 index 00000000..6e80de9e --- /dev/null +++ b/src/kafka-tests/Configuration/DefaultContainerTest.cs @@ -0,0 +1,42 @@ +using KafkaNet.Configuration; +using NUnit.Framework; + +namespace kafka_tests.Configuration +{ + [TestFixture] + public class DefaultContainerTest + { + private DefaultContainer _defaultContainer; + + [SetUp] + public void SetUp() + { + _defaultContainer = new DefaultContainer(); + } + + [Test] + public void FirstRegistrationShouldWin() + { + var oneService = new OneService(); + var anotherService = new AnotherService(); + _defaultContainer.Register(_ => oneService); + _defaultContainer.Register(_ => anotherService); + Assert.AreEqual(oneService, _defaultContainer.Resolve()); + } + + [Test] + [ExpectedException(typeof (ServiceNotFound))] + public void ShouldThrowExceptionIfServiceNotRegistered() + { + _defaultContainer.Resolve(); + } + + [Test] + public void ShouldRememberCreatedInstance() + { + _defaultContainer.Register(_ => new OneService()); + var instance = _defaultContainer.Resolve(); + Assert.AreSame(instance, _defaultContainer.Resolve()); + } + } +} \ No newline at end of file diff --git a/src/kafka-tests/Configuration/IService.cs b/src/kafka-tests/Configuration/IService.cs new file mode 100644 index 00000000..32ca517a --- /dev/null +++ b/src/kafka-tests/Configuration/IService.cs @@ -0,0 +1,6 @@ +namespace kafka_tests.Configuration +{ + public interface IService + { + } +} \ No newline at end of file diff --git a/src/kafka-tests/Configuration/OneService.cs b/src/kafka-tests/Configuration/OneService.cs new file mode 100644 index 00000000..7fec1c1c --- /dev/null +++ b/src/kafka-tests/Configuration/OneService.cs @@ -0,0 +1,6 @@ +namespace kafka_tests.Configuration +{ + public class OneService : IService + { + } +} \ No newline at end of file diff --git a/src/kafka-tests/Configuration/WindsorAdapterTest.cs b/src/kafka-tests/Configuration/WindsorAdapterTest.cs new file mode 100644 index 00000000..70a6cd3d --- /dev/null +++ b/src/kafka-tests/Configuration/WindsorAdapterTest.cs @@ -0,0 +1,44 @@ +using Castle.Windsor; +using KafkaNet.Configuration; +using KafkaNet.Windsor; +using NUnit.Framework; + +namespace kafka_tests.Configuration +{ + [TestFixture] + public class WindsorAdapterTest + { + private WindsorAdapter _defaultContainer; + + [SetUp] + public void SetUp() + { + _defaultContainer = new WindsorAdapter(new WindsorContainer()); + } + + [Test] + public void FirstRegistrationShouldWin() + { + var oneService = new OneService(); + var anotherService = new AnotherService(); + _defaultContainer.Register(_ => oneService); + _defaultContainer.Register(_ => anotherService); + Assert.AreEqual(oneService, _defaultContainer.Resolve()); + } + + [Test] + [ExpectedException(typeof (ServiceNotFound))] + public void ShouldThrowExceptionIfServiceNotRegistered() + { + _defaultContainer.Resolve(); + } + + [Test] + public void ShouldRememberCreatedInstance() + { + _defaultContainer.Register(_ => new OneService()); + var instance = _defaultContainer.Resolve(); + Assert.AreSame(instance, _defaultContainer.Resolve()); + } + } +} \ No newline at end of file diff --git a/src/kafka-tests/kafka-tests.csproj b/src/kafka-tests/kafka-tests.csproj index 7feec390..75b63040 100644 --- a/src/kafka-tests/kafka-tests.csproj +++ b/src/kafka-tests/kafka-tests.csproj @@ -32,6 +32,12 @@ 4 + + ..\packages\Castle.Core.3.3.0\lib\net45\Castle.Core.dll + + + ..\packages\Castle.Windsor.3.3.0\lib\net45\Castle.Windsor.dll + ..\packages\Moq.4.0.10827\lib\NET40\Moq.dll @@ -57,6 +63,11 @@ + + + + + @@ -86,6 +97,10 @@ + + {29ed0088-40f5-452f-95d4-9f8cd33156d3} + kafka-net.windsor + {1343eb68-55cb-4452-8386-24a9989de1c0} kafka-net diff --git a/src/kafka-tests/packages.config b/src/kafka-tests/packages.config index 3a923378..054c8768 100644 --- a/src/kafka-tests/packages.config +++ b/src/kafka-tests/packages.config @@ -1,5 +1,7 @@  + +