diff --git a/src/TestHarness/Program.cs b/src/TestHarness/Program.cs index 2dd3e84f..228a8c1a 100644 --- a/src/TestHarness/Program.cs +++ b/src/TestHarness/Program.cs @@ -1,9 +1,8 @@ using System; using System.Threading.Tasks; -using KafkaNet; +using KafkaNet.Configuration; using KafkaNet.Model; using KafkaNet.Protocol; -using System.Collections.Generic; namespace TestHarness { @@ -11,17 +10,15 @@ class Program { static void Main(string[] args) { - var options = new KafkaOptions(new Uri("http://CSDKAFKA01:9092"), new Uri("http://CSDKAFKA02:9092")) + var bus = new BusFactory().Create(new KafkaOptions { - Log = new ConsoleLog() - }; - var router = new BrokerRouter(options); - var client = new Producer(router); + Hosts = new[] {new Uri("http://CSDKAFKA01:9092"), new Uri("http://CSDKAFKA02:9092")} + }, x => { }); + Task.Factory.StartNew(() => { - var consumer = new Consumer(new ConsumerOptions("TestHarness", router)); - foreach (var data in consumer.Consume()) + foreach (var data in bus.Consume("TestHarness")) { Console.WriteLine("Response: P{0},O{1} : {2}", data.Meta.PartitionId, data.Meta.Offset, data.Value); } @@ -33,13 +30,11 @@ static void Main(string[] args) { var message = Console.ReadLine(); if (message == "quit") break; - client.SendMessageAsync("TestHarness", new[] {new Message {Value = message}}); + bus.SendMessageAsync("TestHarness", new[] {new Message {Value = message}}); } - using (client) - using (router) + using (bus) { - } } } diff --git a/src/kafka-net-client/JsonConsumer.cs b/src/kafka-net-client/JsonConsumer.cs index 07efb02d..ab329a51 100644 --- a/src/kafka-net-client/JsonConsumer.cs +++ b/src/kafka-net-client/JsonConsumer.cs @@ -9,13 +9,11 @@ namespace KafkaNet.Client { public class JsonConsumer : IDisposable { - private readonly ConsumerOptions _options; private readonly Consumer _consumer; - public JsonConsumer(ConsumerOptions options) + public JsonConsumer(IBrokerRouter brokerRouter, IKafkaLog log, ConsumerOptions options) { - _options = options; - _consumer = new Consumer(options); + _consumer = new Consumer(brokerRouter, log, options); } public IEnumerable> Consume() diff --git a/src/kafka-net-client/JsonProducer.cs b/src/kafka-net-client/JsonProducer.cs index 666fec03..067da55a 100644 --- a/src/kafka-net-client/JsonProducer.cs +++ b/src/kafka-net-client/JsonProducer.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using KafkaNet.Configuration; using KafkaNet.Protocol; using Newtonsoft.Json; @@ -11,9 +12,9 @@ public class JsonProducer : IDisposable { private readonly Producer _producer; - public JsonProducer(IBrokerRouter brokerRouter) + public JsonProducer(IBrokerRouter brokerRouter, IKafkaOptions options) { - _producer = new Producer(brokerRouter); + _producer = new Producer(brokerRouter, options); } public Task> Publish(string topic, IEnumerable messages, Int16 acks = 1, int timeoutMS = 1000) where T : class diff --git a/src/kafka-net.sln b/src/kafka-net.sln index e73809e7..835f34ee 100644 --- a/src/kafka-net.sln +++ b/src/kafka-net.sln @@ -1,8 +1,6 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 2013 -VisualStudioVersion = 12.0.30110.0 -MinimumVisualStudioVersion = 10.0.40219.1 +# Visual Studio 2012 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net", "kafka-net\kafka-net.csproj", "{1343EB68-55CB-4452-8386-24A9989DE1C0}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-tests", "kafka-tests\kafka-tests.csproj", "{D80AE407-BB81-4C11-BFDC-5DD463F8B1BF}" @@ -28,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution ..\version = ..\version EndProjectSection EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "kafka-net.windsor", "kafka-net.windsor\kafka-net.windsor.csproj", "{29ED0088-40F5-452F-95D4-9F8CD33156D3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -50,6 +50,10 @@ Global {FF7E1490-7453-47B0-A12E-64FED927825A}.Debug|Any CPU.Build.0 = Debug|Any CPU {FF7E1490-7453-47B0-A12E-64FED927825A}.Release|Any CPU.ActiveCfg = Release|Any CPU {FF7E1490-7453-47B0-A12E-64FED927825A}.Release|Any CPU.Build.0 = Release|Any CPU + {29ED0088-40F5-452F-95D4-9F8CD33156D3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {29ED0088-40F5-452F-95D4-9F8CD33156D3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {29ED0088-40F5-452F-95D4-9F8CD33156D3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {29ED0088-40F5-452F-95D4-9F8CD33156D3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/kafka-net.windsor/Properties/AssemblyInfo.cs b/src/kafka-net.windsor/Properties/AssemblyInfo.cs new file mode 100644 index 00000000..d161c1db --- /dev/null +++ b/src/kafka-net.windsor/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("kafka-net.windsor")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("kafka-net.windsor")] +[assembly: AssemblyCopyright("Copyright © 2014")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("fdefc18c-e9e3-4150-aecf-bbe1da358dae")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/kafka-net.windsor/WindsorAdapter.cs b/src/kafka-net.windsor/WindsorAdapter.cs new file mode 100644 index 00000000..fb1f7941 --- /dev/null +++ b/src/kafka-net.windsor/WindsorAdapter.cs @@ -0,0 +1,40 @@ +using Castle.MicroKernel.Registration; +using Castle.Windsor; +using KafkaNet.Configuration; + +namespace KafkaNet.Windsor +{ + public class WindsorAdapter : IContainer + { + private readonly IWindsorContainer _container; + + public WindsorAdapter(IWindsorContainer container) + { + this._container = container; + } + + public T Resolve() where T : class + { + try + { + return _container.Resolve(); + } + catch (Castle.MicroKernel.ComponentNotFoundException exception) + { + throw new ServiceNotFound(string.Format("No service of type {0} has been registered", typeof(T).Name), exception); + } + } + + public IServiceRegistrator Register(System.Func factory) where T : class + { + if (!_container.Kernel.HasComponent(typeof(T))) + { + _container.Register( + Component.For().UsingFactoryMethod(() => factory(this)).LifeStyle.Singleton + ); + } + return this; + + } + } +} \ No newline at end of file diff --git a/src/kafka-net.windsor/kafka-net.windsor.csproj b/src/kafka-net.windsor/kafka-net.windsor.csproj new file mode 100644 index 00000000..bc764b53 --- /dev/null +++ b/src/kafka-net.windsor/kafka-net.windsor.csproj @@ -0,0 +1,82 @@ + + + + + Debug + AnyCPU + {29ED0088-40F5-452F-95D4-9F8CD33156D3} + Library + Properties + KafkaNet.Windsor + kafka-net.windsor + v4.5 + 512 + ..\ + true + + + AnyCPU + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + AnyCPU + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + ..\packages\Castle.Core.3.3.0\lib\net45\Castle.Core.dll + + + ..\packages\Castle.Windsor.3.3.0\lib\net45\Castle.Windsor.dll + + + + + + + + + + + + + + + + + + + {1343eb68-55cb-4452-8386-24a9989de1c0} + kafka-net + + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + \ No newline at end of file diff --git a/src/kafka-net.windsor/packages.config b/src/kafka-net.windsor/packages.config new file mode 100644 index 00000000..010612fa --- /dev/null +++ b/src/kafka-net.windsor/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/kafka-net/BrokerRouter.cs b/src/kafka-net/BrokerRouter.cs index 544b02b0..f2f2c2c7 100644 --- a/src/kafka-net/BrokerRouter.cs +++ b/src/kafka-net/BrokerRouter.cs @@ -2,7 +2,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using KafkaNet.Model; +using KafkaNet.Configuration; using KafkaNet.Protocol; namespace KafkaNet @@ -20,17 +20,23 @@ namespace KafkaNet public class BrokerRouter : IBrokerRouter { private readonly object _threadLock = new object(); - private readonly KafkaOptions _kafkaOptions; + private readonly IKafkaOptions options; + private readonly IKafkaLog log; + private readonly IPartitionSelector partitionSelector; + private readonly IKafkaConnectionFactory connectionFactory; private readonly ConcurrentDictionary _brokerConnectionIndex = new ConcurrentDictionary(); private readonly ConcurrentDictionary _topicIndex = new ConcurrentDictionary(); private readonly List _defaultConnections = new List(); - public BrokerRouter(KafkaOptions kafkaOptions) + public BrokerRouter(IKafkaOptions options, IKafkaLog log, IPartitionSelector partitionSelector, IKafkaConnectionFactory connectionFactory) { - _kafkaOptions = kafkaOptions; + this.options = options; + this.log = log; + this.partitionSelector = partitionSelector; + this.connectionFactory = connectionFactory; _defaultConnections - .AddRange(kafkaOptions.KafkaServerUri.Distinct() - .Select(uri => _kafkaOptions.KafkaConnectionFactory.Create(uri, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log))); + .AddRange(options.Hosts.Distinct() + .Select(uri => connectionFactory.Create(uri, options.Timeout))); } /// @@ -76,7 +82,7 @@ public BrokerRoute SelectBrokerRoute(string topic, string key = null) if (cachedTopic == null) throw new InvalidTopicMetadataException(string.Format("The Metadata is invalid as it returned no data for the given topic:{0}", topic)); - var partition = _kafkaOptions.PartitionSelector.Select(cachedTopic, key); + var partition = partitionSelector.Select(cachedTopic, key); return GetCachedRoute(cachedTopic.Name, partition); } @@ -119,7 +125,7 @@ public void RefreshTopicMetadata(params string[] topics) { lock (_threadLock) { - _kafkaOptions.Log.DebugFormat("BrokerRouter: Refreshing metadata for topics: {0}", string.Join(",", topics)); + log.DebugFormat("BrokerRouter: Refreshing metadata for topics: {0}", string.Join(",", topics)); //use the initial default connections to retrieve metadata if (_defaultConnections.Count > 0) @@ -216,14 +222,14 @@ private void CycleConnectionsForTopicMetadata(IEnumerable conn } catch (Exception ex) { - _kafkaOptions.Log.WarnFormat("Failed to contact Kafka server={0}. Trying next default server. Exception={1}", conn.KafkaUri, ex); + log.WarnFormat("Failed to contact Kafka server={0}. Trying next default server. Exception={1}", conn.KafkaUri, ex); } } throw new ServerUnreachableException( string.Format( "Unable to query for metadata from any of the default Kafka servers. At least one provided server must be available. Server list: {0}", - string.Join(", ", _kafkaOptions.KafkaServerUri.Select(x => x.ToString())))); + string.Join(", ", options.Hosts.Select(x => x.ToString())))); } private void UpdateInternalMetadataCache(MetadataResponse metadata) @@ -232,16 +238,13 @@ private void UpdateInternalMetadataCache(MetadataResponse metadata) { var localBroker = broker; _brokerConnectionIndex.AddOrUpdate(broker.BrokerId, - i => - { - return _kafkaOptions.KafkaConnectionFactory.Create(localBroker.Address, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log); - }, + i => connectionFactory.Create(localBroker.Address, options.Timeout), (i, connection) => { //if a connection changes for a broker close old connection and create a new one if (connection.KafkaUri == localBroker.Address) return connection; - _kafkaOptions.Log.WarnFormat("Broker:{0} Uri changed from:{1} to {2}", localBroker.BrokerId, connection.KafkaUri, localBroker.Address); - using (connection) { return _kafkaOptions.KafkaConnectionFactory.Create(localBroker.Address, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log); } + log.WarnFormat("Broker:{0} Uri changed from:{1} to {2}", localBroker.BrokerId, connection.KafkaUri, localBroker.Address); + using (connection) { return connectionFactory.Create(localBroker.Address, options.Timeout); } }); } diff --git a/src/kafka-net/Configuration/Bus.cs b/src/kafka-net/Configuration/Bus.cs new file mode 100644 index 00000000..2eadc7ab --- /dev/null +++ b/src/kafka-net/Configuration/Bus.cs @@ -0,0 +1,35 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using KafkaNet.Protocol; + +namespace KafkaNet.Configuration +{ + public class Bus : IBus + { + private readonly IConsumerFactory _consumerFactory; + private readonly IProducer _producer; + + public Bus(IProducer producer, IConsumerFactory consumerFactory) + { + _producer = producer; + _consumerFactory = consumerFactory; + } + + public Task> SendMessageAsync(string topic, IEnumerable messages, short acks = 1, int timeoutMS = 1000, MessageCodec codec = MessageCodec.CodecNone) + { + return _producer.SendMessageAsync(topic, messages, acks, timeoutMS, codec); + } + + public IEnumerable Consume(string topic, CancellationToken? token = null) + { + var consumer = _consumerFactory.GetConsumer(topic); + return consumer.Consume(token); + } + + public void Dispose() + { + _producer.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/BusFactory.cs b/src/kafka-net/Configuration/BusFactory.cs new file mode 100644 index 00000000..1092fdae --- /dev/null +++ b/src/kafka-net/Configuration/BusFactory.cs @@ -0,0 +1,26 @@ +using System; + +namespace KafkaNet.Configuration +{ + public class BusFactory + { + private readonly IContainer container; + + public BusFactory(IContainer container) + { + this.container = container; + } + + public BusFactory() : this(new DefaultContainer()) + { + } + + public IBus Create(IKafkaOptions settings, Action configure) + { + configure(container); + container.Register(_ => settings); + DefaultServiceRegistrator.Register(container); + return container.Resolve(); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/DefaultContainer.cs b/src/kafka-net/Configuration/DefaultContainer.cs new file mode 100644 index 00000000..1911e2b4 --- /dev/null +++ b/src/kafka-net/Configuration/DefaultContainer.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; + +namespace KafkaNet.Configuration +{ + public class DefaultContainer : IContainer + { + private readonly Dictionary _factories = new Dictionary(); + private readonly Dictionary _instances = new Dictionary(); + + public T Resolve() where T: class + { + object instance; + if (_instances.TryGetValue(typeof (T), out instance)) + return (T) instance; + object factory; + if (_factories.TryGetValue(typeof (T), out factory)) + { + var newInstance = ((Func) factory)(this); + _instances.Add(typeof(T), newInstance); + return newInstance; + } + throw new ServiceNotFound(string.Format("No service of type {0} has been registered", typeof(T).Name)); + } + + public IServiceRegistrator Register(Func factory) where T : class + { + if(_factories.ContainsKey(typeof(T))) + return this; + _factories.Add(typeof(T), factory); + return this; + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/DefaultServiceRegistrator.cs b/src/kafka-net/Configuration/DefaultServiceRegistrator.cs new file mode 100644 index 00000000..a8084004 --- /dev/null +++ b/src/kafka-net/Configuration/DefaultServiceRegistrator.cs @@ -0,0 +1,16 @@ +namespace KafkaNet.Configuration +{ + public static class DefaultServiceRegistrator + { + public static void Register(IServiceRegistrator registrator) + { + registrator.Register(_ => new DefaultTraceLog()) + .Register(_ => new DefaultPartitionSelector()) + .Register(x => new DefaultKafkaConnectionFactory(x.Resolve())) + .Register(x => new BrokerRouter(x.Resolve(), x.Resolve(), x.Resolve(), x.Resolve())) + .Register(x => new Producer(x.Resolve(), x.Resolve())) + .Register(x => new ConsumerFactory(x.Resolve(), x.Resolve())) + .Register(x => new Bus(x.Resolve(), x.Resolve())); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/IContainer.cs b/src/kafka-net/Configuration/IContainer.cs new file mode 100644 index 00000000..0e8c54f5 --- /dev/null +++ b/src/kafka-net/Configuration/IContainer.cs @@ -0,0 +1,6 @@ +namespace KafkaNet.Configuration +{ + public interface IContainer : IServiceProvider, IServiceRegistrator + { + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/IServiceProvider.cs b/src/kafka-net/Configuration/IServiceProvider.cs new file mode 100644 index 00000000..c43e1a1e --- /dev/null +++ b/src/kafka-net/Configuration/IServiceProvider.cs @@ -0,0 +1,7 @@ +namespace KafkaNet.Configuration +{ + public interface IServiceProvider + { + T Resolve() where T : class; + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/IServiceRegistrator.cs b/src/kafka-net/Configuration/IServiceRegistrator.cs new file mode 100644 index 00000000..cea39368 --- /dev/null +++ b/src/kafka-net/Configuration/IServiceRegistrator.cs @@ -0,0 +1,9 @@ +using System; + +namespace KafkaNet.Configuration +{ + public interface IServiceRegistrator + { + IServiceRegistrator Register(Func factory) where T : class; + } +} \ No newline at end of file diff --git a/src/kafka-net/Configuration/ServiceNotFound.cs b/src/kafka-net/Configuration/ServiceNotFound.cs new file mode 100644 index 00000000..e9e8774e --- /dev/null +++ b/src/kafka-net/Configuration/ServiceNotFound.cs @@ -0,0 +1,16 @@ +using System; + +namespace KafkaNet.Configuration +{ + public class ServiceNotFound : Exception + { + public ServiceNotFound(string message, Exception exception) : base(message, exception) + { + } + + public ServiceNotFound(string message) + : base(message) + { + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 19b69c46..aed38a76 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -16,8 +16,10 @@ namespace KafkaNet /// TODO: provide automatic offset saving when the feature is available in 0.8.2 /// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI /// - public class Consumer : IMetadataQueries, IDisposable + public class Consumer : IMetadataQueries, IConsumer { + private readonly IBrokerRouter _brokerRouter; + private readonly IKafkaLog _log; private readonly ConsumerOptions _options; private readonly BlockingCollection _fetchResponseQueue; private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); @@ -29,11 +31,13 @@ public class Consumer : IMetadataQueries, IDisposable private int _ensureOneThread; private Topic _topic; - public Consumer(ConsumerOptions options, params OffsetPosition[] positions) + public Consumer(IBrokerRouter brokerRouter, IKafkaLog log, ConsumerOptions options, params OffsetPosition[] positions) { + _brokerRouter = brokerRouter; + _log = log; _options = options; _fetchResponseQueue = new BlockingCollection(_options.ConsumerBufferSize); - _metadataQueries = new MetadataQueries(_options.Router); + _metadataQueries = new MetadataQueries(brokerRouter); //this timer will periodically look for new partitions and automatically add them to the consuming queue //using the same whitelist logic @@ -56,7 +60,7 @@ public Consumer(ConsumerOptions options, params OffsetPosition[] positions) /// Blocking enumberable of messages from Kafka. public IEnumerable Consume(CancellationToken? cancellationToken = null) { - _options.Log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic); + _log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic); _topicPartitionQueryTimer.Begin(); return _fetchResponseQueue.GetConsumingEnumerable(cancellationToken ?? new CancellationToken(false)); @@ -91,8 +95,8 @@ private void RefreshTopicPartitions() { if (Interlocked.Increment(ref _ensureOneThread) == 1) { - _options.Log.DebugFormat("Consumer: Refreshing partitions for topic: {0}", _options.Topic); - var topic = _options.Router.GetTopicMetadata(_options.Topic); + _log.DebugFormat("Consumer: Refreshing partitions for topic: {0}", _options.Topic); + var topic = _brokerRouter.GetTopicMetadata(_options.Topic); if (topic.Count <= 0) throw new ApplicationException(string.Format("Unable to get metadata for topic:{0}.", _options.Topic)); _topic = topic.First(); @@ -111,7 +115,7 @@ private void RefreshTopicPartitions() } catch (Exception ex) { - _options.Log.ErrorFormat("Exception occured trying to setup consumer for topic:{0}. Exception={1}", _options.Topic, ex); + _log.ErrorFormat("Exception occured trying to setup consumer for topic:{0}. Exception={1}", _options.Topic, ex); } finally { @@ -125,7 +129,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) { try { - _options.Log.DebugFormat("Consumer: Creating polling task for topic: {0} on parition: {1}", topic, partitionId); + _log.DebugFormat("Consumer: Creating polling task for topic: {0} on parition: {1}", topic, partitionId); while (_disposeToken.IsCancellationRequested == false) { try @@ -151,7 +155,7 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) }; //make request and post to queue - var route = _options.Router.SelectBrokerRoute(topic, partitionId); + var route = _brokerRouter.SelectBrokerRoute(topic, partitionId); var responses = route.Connection.SendAsync(fetchRequest).Result; if (responses.Count > 0) @@ -179,13 +183,13 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) } catch (Exception ex) { - _options.Log.ErrorFormat("Exception occured while polling topic:{0} partition:{1}. Polling will continue. Exception={2}", topic, partitionId, ex); + _log.ErrorFormat("Exception occured while polling topic:{0} partition:{1}. Polling will continue. Exception={2}", topic, partitionId, ex); } } } finally { - _options.Log.DebugFormat("Consumer: Disabling polling task for topic: {0} on parition: {1}", topic, partitionId); + _log.DebugFormat("Consumer: Disabling polling task for topic: {0} on parition: {1}", topic, partitionId); Task tempTask; _partitionPollingIndex.TryRemove(partitionId, out tempTask); } @@ -204,7 +208,7 @@ public Task> GetTopicOffsetAsync(string topic, int maxOffse public void Dispose() { - _options.Log.DebugFormat("Consumer: Disposing..."); + _log.DebugFormat("Consumer: Disposing..."); _disposeToken.Cancel(); using (_topicPartitionQueryTimer) using (_metadataQueries) diff --git a/src/kafka-net/ConsumerFactory.cs b/src/kafka-net/ConsumerFactory.cs new file mode 100644 index 00000000..b0abb009 --- /dev/null +++ b/src/kafka-net/ConsumerFactory.cs @@ -0,0 +1,21 @@ +using KafkaNet.Model; + +namespace KafkaNet +{ + public class ConsumerFactory : IConsumerFactory + { + private readonly IBrokerRouter _brokerRouter; + private readonly IKafkaLog _log; + + public ConsumerFactory(IBrokerRouter brokerRouter, IKafkaLog log) + { + _brokerRouter = brokerRouter; + _log = log; + } + + public IConsumer GetConsumer(string topic) + { + return new Consumer(_brokerRouter, _log, new ConsumerOptions(topic)); + } + } +} \ No newline at end of file diff --git a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs index c55bd162..9dd894df 100644 --- a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs +++ b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs @@ -1,16 +1,19 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace KafkaNet { public class DefaultKafkaConnectionFactory : IKafkaConnectionFactory { - public IKafkaConnection Create(Uri kafkaAddress, int responseTimeoutMs, IKafkaLog log) + private readonly IKafkaLog kafkaLog; + + public DefaultKafkaConnectionFactory(IKafkaLog kafkaLog) { - return new KafkaConnection(new KafkaTcpSocket(log, kafkaAddress), responseTimeoutMs, log); + this.kafkaLog = kafkaLog; + } + + public IKafkaConnection Create(Uri kafkaAddress, TimeSpan timeout) + { + return new KafkaConnection(new KafkaTcpSocket(kafkaLog, kafkaAddress), (int)timeout.TotalMilliseconds, kafkaLog); } } } diff --git a/src/kafka-net/Interfaces/IBrokerRouter.cs b/src/kafka-net/Interfaces/IBrokerRouter.cs index 6225232d..3908963a 100644 --- a/src/kafka-net/Interfaces/IBrokerRouter.cs +++ b/src/kafka-net/Interfaces/IBrokerRouter.cs @@ -1,7 +1,5 @@ using System; using System.Collections.Generic; -using System.Threading.Tasks; -using KafkaNet.Model; using KafkaNet.Protocol; namespace KafkaNet diff --git a/src/kafka-net/Interfaces/IBus.cs b/src/kafka-net/Interfaces/IBus.cs new file mode 100644 index 00000000..23b757ff --- /dev/null +++ b/src/kafka-net/Interfaces/IBus.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using KafkaNet.Protocol; + +namespace KafkaNet +{ + public interface IBus : IDisposable + { + Task> SendMessageAsync(string topic, IEnumerable messages, Int16 acks = 1, int timeoutMS = 1000, MessageCodec codec = MessageCodec.CodecNone); + IEnumerable Consume(string topic, CancellationToken? token = null); + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IConsumer.cs b/src/kafka-net/Interfaces/IConsumer.cs new file mode 100644 index 00000000..a7c6de22 --- /dev/null +++ b/src/kafka-net/Interfaces/IConsumer.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using KafkaNet.Protocol; + +namespace KafkaNet +{ + public interface IConsumer : IDisposable + { + IEnumerable Consume(CancellationToken? cancellationToken = null); + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IConsumerFactory.cs b/src/kafka-net/Interfaces/IConsumerFactory.cs new file mode 100644 index 00000000..0c64759a --- /dev/null +++ b/src/kafka-net/Interfaces/IConsumerFactory.cs @@ -0,0 +1,7 @@ +namespace KafkaNet +{ + public interface IConsumerFactory + { + IConsumer GetConsumer(string topic); + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs index 47ab8a61..5fd04e5f 100644 --- a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs +++ b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs @@ -1,13 +1,9 @@ -using KafkaNet.Protocol; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System; namespace KafkaNet { public interface IKafkaConnectionFactory { - IKafkaConnection Create(Uri kafkaAddress, int responseTimeoutMs, IKafkaLog log); + IKafkaConnection Create(Uri kafkaAddress, TimeSpan timeout); } } diff --git a/src/kafka-net/Interfaces/IKafkaOptions.cs b/src/kafka-net/Interfaces/IKafkaOptions.cs new file mode 100644 index 00000000..665ec37a --- /dev/null +++ b/src/kafka-net/Interfaces/IKafkaOptions.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; + +namespace KafkaNet +{ + public interface IKafkaOptions + { + IEnumerable Hosts { get; } + TimeSpan Timeout { get; } + int QueueSize { get; } + } +} \ No newline at end of file diff --git a/src/kafka-net/Interfaces/IPartitionSelector.cs b/src/kafka-net/Interfaces/IPartitionSelector.cs index bb877a95..5582d6c9 100644 --- a/src/kafka-net/Interfaces/IPartitionSelector.cs +++ b/src/kafka-net/Interfaces/IPartitionSelector.cs @@ -1,6 +1,4 @@ -using System.Collections.Generic; -using KafkaNet.Model; -using KafkaNet.Protocol; +using KafkaNet.Protocol; namespace KafkaNet { diff --git a/src/kafka-net/Interfaces/IProducer.cs b/src/kafka-net/Interfaces/IProducer.cs new file mode 100644 index 00000000..48ae64b8 --- /dev/null +++ b/src/kafka-net/Interfaces/IProducer.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using KafkaNet.Protocol; + +namespace KafkaNet +{ + public interface IProducer : IDisposable + { + Task> SendMessageAsync(string topic, IEnumerable messages, Int16 acks = 1, int timeoutMS = 1000, MessageCodec codec = MessageCodec.CodecNone); + } +} \ No newline at end of file diff --git a/src/kafka-net/KafkaConnection.cs b/src/kafka-net/KafkaConnection.cs index 76cad887..c3aae670 100644 --- a/src/kafka-net/KafkaConnection.cs +++ b/src/kafka-net/KafkaConnection.cs @@ -20,8 +20,6 @@ namespace KafkaNet /// public class KafkaConnection : IKafkaConnection { - private const int DefaultResponseTimeoutMs = 30000; - private readonly ConcurrentDictionary _requestIndex = new ConcurrentDictionary(); private readonly IScheduledTimer _responseTimeoutTimer; private readonly int _responseTimeoutMS; @@ -40,10 +38,10 @@ public class KafkaConnection : IKafkaConnection /// Logging interface used to record any log messages created by the connection. /// The kafka socket initialized to the kafka server. /// The amount of time to wait for a message response to be received after sending message to Kafka. - public KafkaConnection(IKafkaTcpSocket client, int responseTimeoutMs = DefaultResponseTimeoutMs, IKafkaLog log = null) + public KafkaConnection(IKafkaTcpSocket client, int responseTimeoutMs, IKafkaLog log) { _client = client; - _log = log ?? new DefaultTraceLog(); + _log = log; _responseTimeoutMS = responseTimeoutMs; _responseTimeoutTimer = new ScheduledTimer() .Do(ResponseTimeoutCheck) diff --git a/src/kafka-net/Model/ConsumerOptions.cs b/src/kafka-net/Model/ConsumerOptions.cs index d097822f..7b415878 100644 --- a/src/kafka-net/Model/ConsumerOptions.cs +++ b/src/kafka-net/Model/ConsumerOptions.cs @@ -17,14 +17,6 @@ public class ConsumerOptions /// public List PartitionWhitelist { get; set; } /// - /// Log object to record operational messages. - /// - public IKafkaLog Log { get; set; } - /// - /// The broker router used to provide connection to each partition server. - /// - public IBrokerRouter Router { get; set; } - /// /// The time in milliseconds between queries to look for any new partitions being created. /// public int TopicPartitionQueryTimeMs { get; set; } @@ -37,12 +29,10 @@ public class ConsumerOptions /// public int BackoffInterval { get; set; } - public ConsumerOptions(string topic, IBrokerRouter router) + public ConsumerOptions(string topic) { Topic = topic; - Router = router; PartitionWhitelist = new List(); - Log = new DefaultTraceLog(); TopicPartitionQueryTimeMs = (int)TimeSpan.FromMinutes(15).TotalMilliseconds; ConsumerBufferSize = DefaultMaxConsumerBufferSize; BackoffInterval = DefaultBackoffIntervalMS; diff --git a/src/kafka-net/Model/KafkaOptions.cs b/src/kafka-net/Model/KafkaOptions.cs index 7b5b178f..3201f09b 100644 --- a/src/kafka-net/Model/KafkaOptions.cs +++ b/src/kafka-net/Model/KafkaOptions.cs @@ -1,41 +1,19 @@ using System; using System.Collections.Generic; -using System.Linq; +using KafkaNet.Configuration; namespace KafkaNet.Model { - public class KafkaOptions + public class KafkaOptions : IKafkaOptions { - private const int DefaultResponseTimeout = 30000; - - /// - /// List of Uri connections to kafka servers. The are used to query for metadata from Kafka. More than one is recommended. - /// - public List KafkaServerUri { get; set; } - /// - /// Provides a factory for creating new kafka connections. - /// - public IKafkaConnectionFactory KafkaConnectionFactory { get; set; } - /// - /// Selector function for routing messages to partitions. Default is key/hash and round robin. - /// - public IPartitionSelector PartitionSelector { get; set; } - /// - /// Timeout length in milliseconds waiting for a response from kafka. - /// - public int ResponseTimeoutMs { get; set; } - /// - /// Log object to record operational messages. - /// - public IKafkaLog Log { get; set; } - - public KafkaOptions(params Uri[] kafkaServerUri) + public KafkaOptions() { - KafkaServerUri = kafkaServerUri.ToList(); - KafkaConnectionFactory = new DefaultKafkaConnectionFactory(); - PartitionSelector = new DefaultPartitionSelector(); - Log = new DefaultTraceLog(); - ResponseTimeoutMs = DefaultResponseTimeout; + Timeout = TimeSpan.FromSeconds(30); + QueueSize = int.MaxValue; } + + public IEnumerable Hosts { get; set; } + public TimeSpan Timeout { get; set; } + public int QueueSize { get; set; } } } diff --git a/src/kafka-net/Producer.cs b/src/kafka-net/Producer.cs index 3e0fabff..d3f15990 100644 --- a/src/kafka-net/Producer.cs +++ b/src/kafka-net/Producer.cs @@ -1,9 +1,8 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using KafkaNet.Model; +using KafkaNet.Configuration; using KafkaNet.Protocol; using System.Threading; @@ -13,7 +12,7 @@ namespace KafkaNet /// /// Provides a simplified high level API for producing messages on a topic. /// - public class Producer : IMetadataQueries, IDisposable + public class Producer : IMetadataQueries, IProducer { private readonly IBrokerRouter _router; private readonly SemaphoreSlim _sendSemaphore; @@ -30,6 +29,7 @@ public class Producer : IMetadataQueries, IDisposable /// /// The router used to direct produced messages to the correct partition. /// The maximum async calls allowed before blocking new requests. -1 indicates unlimited. + /// /// /// The maximumAsyncQueue parameter provides a mechanism for blocking an async request return if the amount of requests queue is /// over a certain limit. This is usefull if a client is trying to push a large stream of documents through the producer and @@ -39,11 +39,11 @@ public class Producer : IMetadataQueries, IDisposable /// messages sitting in the async queue then a message may spend its entire timeout cycle waiting in this queue and never getting /// attempted to send to Kafka before a timeout exception is thrown. /// - public Producer(IBrokerRouter brokerRouter, int maximumAsyncQueue = -1) + public Producer(IBrokerRouter brokerRouter, IKafkaOptions options) { _router = brokerRouter; _metadataQueries = new MetadataQueries(_router); - _maximumAsyncQueue = maximumAsyncQueue == -1 ? int.MaxValue : maximumAsyncQueue; + _maximumAsyncQueue = options.QueueSize; _sendSemaphore = new SemaphoreSlim(_maximumAsyncQueue, _maximumAsyncQueue); } diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj index a0a5dc6a..6571b99e 100644 --- a/src/kafka-net/kafka-net.csproj +++ b/src/kafka-net/kafka-net.csproj @@ -43,9 +43,23 @@ + + + + + + + + + + + + + + @@ -85,9 +99,7 @@ - - - +