
Support version 0 (zookeeper) and version 1 (kafka) protocol for commit offsets. #48

Open: wants to merge 15 commits into base: master

Changes from all commits
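For background on the title: in the Kafka protocol, an OffsetCommitRequest with api version 0 stores the committed offset in ZooKeeper, while api version 1 stores it in Kafka itself and adds a consumer group generation id, a consumer id, and a per-partition timestamp to the request. The sketch below illustrates only that field-level difference; the encoder type and names are hypothetical, not code from this PR:

```csharp
using System.IO;
using System.Text;

// Sketch only: field-level difference between OffsetCommitRequest v0
// (offsets stored in zookeeper) and v1 (offsets stored in kafka).
// NB: Kafka is big-endian on the wire while BinaryWriter is little-endian;
// a real encoder must swap byte order. Omitted to keep the sketch short.
static class OffsetCommitSketch
{
    public static void Encode(BinaryWriter w, short apiVersion, string group,
        int generationId, string consumerId,
        string topic, int partition, long offset, long timestamp, string metadata)
    {
        WriteString(w, group);
        if (apiVersion >= 1)
        {
            w.Write(generationId);          // v1+: consumer group generation id
            WriteString(w, consumerId);     // v1+: consumer id
        }
        w.Write(1);                         // topic array: one entry
        WriteString(w, topic);
        w.Write(1);                         // partition array: one entry
        w.Write(partition);
        w.Write(offset);
        if (apiVersion == 1)
        {
            w.Write(timestamp);             // v1 only: commit timestamp
        }
        WriteString(w, metadata);
    }

    private static void WriteString(BinaryWriter w, string s)
    {
        var bytes = Encoding.UTF8.GetBytes(s);
        w.Write((short)bytes.Length);       // Kafka strings are int16 length-prefixed
        w.Write(bytes);
    }
}
```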
5 changes: 5 additions & 0 deletions .gitignore
@@ -1,3 +1,8 @@
src/coverage
src/results.xml
src/TestResult.xml


#################
## Eclipse
#################
97 changes: 45 additions & 52 deletions README.md
@@ -1,88 +1,80 @@
-kafka-net
+simple-kafka-net
=========

-Native C# client for Apache Kafka.
+Native C# client for Apache Kafka derived from [jroland/kafka-net].

License
-----------
-Copyright 2014, James Roland under Apache License, V2.0. See LICENSE file.
+Original kafka-net Copyright 2014, James Roland
+Modified version Copyright 2015, Nick Randell

+Apache License, V2.0. See LICENSE file.

Summary
-----------
-This project is a .NET implementation of the [Apache Kafka] protocol. The wire protocol portion is based on the [kafka-python] library written by [David Arthur] and the general class layout attempts to follow a similar pattern as his project. To that end, this project builds up from the low level KafkaConnection object for handling async requests to/from the kafka server, all the way up to higher level Producer/Consumer classes.
+This project is a .NET implementation of the [Apache Kafka] protocol. The wire protocol portion is based on the [kafka-python] library written by [David Arthur], and the general class layout attempts to follow a similar pattern as his project.

+It is very much a work in progress but is designed to be asynchronous from the ground up, providing a thin wrapper over the protocol and building up a robust broker manager with simple producer and consumer classes. Even though it is asynchronous, no threads are involved unless the runtime uses them. This does mean that it is not yet possible to have multiple messages in flight at the same time, but that will change.

+The protocol encoding and decoding has also been modified to work as efficiently as possible without copying data around.

+One of the aims of this fork is to give the client much more control over which partitions to consume, as this allows larger solutions to scale by having consumers running on different servers.

+Testing makes use of docker to spin up test clusters, giving control over different scenarios.

+The current version 0.1 is not very robust, but generally works ok for single brokers.


Examples
-----------
##### Producer
```sh
-var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
-var router = new BrokerRouter(options);
-var client = new Producer(router);
-
-client.SendMessageAsync("TestHarness", new[] { new Message("hello world") }).Wait();
-
-using (client) { }
+using (var broker = new KafkaBroker(new Uri("http://SERVER1:9092")))
+{
+    var producer = KafkaProducer.Create(broker, new StringSerializer());
+    await producer.SendAsync(KeyedMessage.Create("Test Topic", "Test"), CancellationToken.None);
+}
```
##### Consumer
```sh
-var options = new KafkaOptions(new Uri("http://SERVER1:9092"), new Uri("http://SERVER2:9092"));
-var router = new BrokerRouter(options);
-var consumer = new Consumer(new ConsumerOptions("TestHarness", new BrokerRouter(options)));
-
-//Consume returns a blocking IEnumerable (ie: never ending stream)
-foreach (var message in consumer.Consume())
-{
-    Console.WriteLine("Response: P{0},O{1} : {2}",
-        message.Meta.PartitionId, message.Meta.Offset, message.Value);
-}
+using (var broker = new KafkaBroker(new Uri("http://SERVER1:9092")))
+{
+    var consumer = KafkaConsumer.Create(topic, broker, new StringSerializer(),
+        new TopicSelector { Topic = "Test Topic", Partition = 0, Offset = 0 });
+    var result = await consumer.ReceiveAsync(CancellationToken.None);
+    foreach (var message in result)
+    {
+        Console.WriteLine("Received {0}", message.Value);
+    }
+}
```

-##### TestHarness
-The TestHarness project is a simple example console application that will read messages from a kafka server and write them to the screen. It will also take anything typed in the console and send it as a message to the kafka servers.
-Simply modify the kafka server Uri in the code to point to a functioning test server.
+The topic selector is used to determine which topics and offsets to use.

+Things left to do
------------

+- [ ] Make use of correlation id to allow multiple messages in flight at the same time (see the sketch after this README diff)
+- [ ] Compression

Pieces of the Puzzle
-----------
##### Protocol
The protocol has been divided up into concrete classes for each request/response pair. Each class knows how to encode and decode itself into/from their appropriate Kafka protocol byte array. One benefit of this is that it allows for a nice generic send method on the KafkaConnection.

-##### KafkaConnection
-Provides async methods on a persistent connection to a kafka broker (server). The send method uses the TcpClient send async function and the read stream has a dedicated thread which uses the correlation Id to match send responses to the correct request.
+This has been kept almost identical to the original version, but the encoding/decoding is now done with a preallocated buffer.

-##### BrokerRouter
-Provides metadata-based routing of messages to the correct Kafka partition. This class also manages the multiple KafkaConnections for each Kafka server returned by the broker section in the metadata response. Routing logic is provided by the IPartitionSelector.
+##### KafkaConnection
+Provides async methods on a persistent connection to a kafka broker (server). Sending a message and receiving a response is carried out within a lock, as only one message is in flight at any time. This will change!!

-##### IPartitionSelector
-Provides the logic for routing which partition the BrokerRouter should choose. The default selector is the DefaultPartitionSelector, which uses round-robin partition selection if the key property on the message is null, and a mod/hash of the key value if present.
+##### KafkaBrokers
+Provides management of a group of brokers, maintaining a connection to each of the valid brokers.

##### Producer
-Provides a higher level class which uses the combination of the BrokerRouter and KafkaConnection to send batches of messages to a Kafka broker.
+Provides a higher level class which uses the combination of the KafkaBrokers and KafkaConnection to send messages. There is no queuing or batching of messages internally; that would be the work of a higher level producer.

##### Consumer
-Provides a higher level class which will consume messages from a whitelist of partitions from a single topic. The consumption mechanism is a blocking IEnumerable of messages. If no whitelist is provided then all partitions will be consumed, creating one KafkaConnection for each partition leader.
-
-Status
-----------
-[![Build status](https://ci.appveyor.com/api/projects/status/3tg02biqn5q8uijy)](https://ci.appveyor.com/project/Jroland/kafka-net)
-
-The current version of this project is a functioning "work in progress" as it was only started in early February. The wire protocol is complete except for Offset Commits to the servers (as there is a bug in 0.8.0 which prevents testing of this feature). The library is functioning in that there is a complete Producer and Consumer class, thus messages can pass to and from a Kafka server.
-
-##### The major items that need work are:
-* Better handling of options for providing customization of internal behaviour of the base API (right now the classes pass around option parameters).
-* General structure of the classes is not finalized and breaking changes will occur.
-* Only Gzip compression is implemented; snappy is on the todo list.
-* Currently only works with .NET Framework 4.5 as it uses the await keyword.
-* Test coverage.
-* Documentation.
-
-Comment
----------
-This is a pet project for me and is not currently backed by a need for a Kafka server client, which means the client is only currently being tested against a small set of Kafka test servers and not against any server that has any real data load.
+Provides the ability to receive messages from brokers.



@@ -92,3 +84,4 @@
[kafka-python]:https://github.com/mumrah/kafka-python
[Apache Kafka]:http://kafka.apache.org
[David Arthur]:https://github.com/mumrah
[jroland/kafka-net]:https://github.com/jroland/kafka-net
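The first item in the README's new "Things left to do" list deserves a sketch: every Kafka response echoes the correlation id of its request, so a connection can keep a table of pending requests and complete them from a single read loop instead of serializing each exchange behind a lock. A minimal sketch, with hypothetical names rather than this PR's API:

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: pending requests keyed by correlation id, allowing several
// requests to be in flight on one connection at the same time.
internal class PendingRequestTable
{
    private int nextCorrelationId;
    private readonly ConcurrentDictionary<int, TaskCompletionSource<byte[]>> pending =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();

    // Called before sending: reserves a correlation id to encode into the request.
    public int Register(out Task<byte[]> response)
    {
        var id = Interlocked.Increment(ref nextCorrelationId);
        var tcs = new TaskCompletionSource<byte[]>();
        pending[id] = tcs;
        response = tcs.Task;
        return id;
    }

    // Called by the single read loop when a response frame arrives.
    public void Complete(int correlationId, byte[] payload)
    {
        TaskCompletionSource<byte[]> tcs;
        if (pending.TryRemove(correlationId, out tcs))
        {
            tcs.SetResult(payload);
        }
    }
}
```

The sender would write the reserved correlation id into the request header and await the returned task; the read loop calls Complete for each decoded response.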
74 changes: 5 additions & 69 deletions RELEASE_NOTES.md
@@ -1,75 +1,11 @@
-kafka-net Release Notes
+simple-kafka-net Release Notes
=========

-Version 0.9.0.14
+Version 0.1
-------
-Fix memory leak in NagleBlockingCollection.
-Timeout does not reset when new data is added.
-Fix thread contention when producer has many threads loading data into its buffer.
-Fix many deadlock scenarios on cancelling and disposing.
-More unit tests around threading.
-
-Version 0.9.0.1
-------

-#### Feature: Nagle Producer
-The producer class has been significantly updated to use a message batching technique similar to the [nagle algorithm].

-The producer accepts messages and groups them together into a single batched transmission. The total number of messages to batch before sending, and the maximum amount of time to wait for the max batch size, are configurable. Tuning these two parameters, along with gzip compression, can increase the driver throughput by orders of magnitude.

-```sh
-var producer = new Producer(new BrokerRouter(options))
-{
-    BatchSize = 100,
-    BatchDelayTime = TimeSpan.FromMilliseconds(100)
-};
-
-// BatchSize - The producer will wait until it receives 100 messages, group them together into one request and send.
-// BatchDelayTime - If the producer has not received 100 messages within 100 milliseconds, the producer will send what it has received.
-```

-#### Feature: Memory management
-The producer now has better options for managing how much memory it consumes when it starts to get backed up.

-There are now two parameters on the producer constructor:
-MaximumAsyncRequests
-MaximumMessageBuffer

-These two parameters prevent the producer from exceeding a maximum of resources used, in terms of network and memory.

-##### MaximumMessageBuffer
-This parameter represents the maximum number of messages the producer will hold in its buffer at any one time. This includes all in-flight messages and those buffered by the batching mechanism. This maximum will be hit if more messages arrive at the producer than it can send to Kafka. When the maximum is reached, the producer will block on any new messages until space is available.

-##### MaximumAsyncRequests
-This parameter represents the maximum number of queued-up async TCP commands allowed in flight at any one time. This can occur when the batch size is too low and the producer creates a high number of transmission requests out to the Kafka brokers. Having thousands of queued-up async messages can adversely affect memory and increase timeout errors.

-```sh
-var producer = new Producer(new BrokerRouter(options), maximumAsyncRequests: 30, maximumMessageBuffer: 1000);
-
-// maximum outbound async requests will be limited to 30
-// maximum amount of messages in the producer at any one time will be limited to 1000
-```

-#### Issues/Features Summary
-* Fix some integration tests to run on any Kafka configuration. More need conversion.
-* Redesign of TcpKafkaSockets
-    * Performance improvements
-    * Remove several deadlock scenarios
-    * Remove several race conditions
-* Nagle producer
-    * Memory management
-    * Significant performance improvement
-* Add MaximumReconnectionTimeout
-    * Put a maximum amount of time to wait when backing off
-* Update documentation in code
-* Update/extend unit tests
+Initial version taken from the original [jroland/kafka-net].
+Very basic functionality, but appears to be good enough to work with a simple broker.



-[nagle algorithm]:http://en.wikipedia.org/wiki/Nagle%27s_algorithm
+[jroland/kafka-net]:https://github.com/jroland/kafka-net
23 changes: 23 additions & 0 deletions build.bat
@@ -0,0 +1,23 @@
@echo Off
set config=%1
if "%config%" == "" (
set config=Release
)

set version=0.1.0
if not "%PackageVersion%" == "" (
set version=%PackageVersion%
)

set nuget=
if "%nuget%" == "" (
set nuget=nuget
)

%WINDIR%\Microsoft.NET\Framework\v4.0.30319\msbuild src\simple-kafka-net.sln /p:Configuration="%config%" /m /v:M /fl /flp:LogFile=msbuild.log;Verbosity=diag /nr:false

mkdir Build
mkdir Build\lib
mkdir Build\lib\net45

%nuget% pack "src\simple-kafka-net.nuspec" -NoPackageAnalysis -verbosity detailed -o Build -Version %version% -p Configuration="%config%"
6 changes: 6 additions & 0 deletions src/.nuget/packages.config
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
  <package id="NUnit.Console" version="3.0.0-beta-1" />
  <package id="OpenCover" version="4.5.3723" />
  <package id="ReportGenerator" version="2.1.4.0" />
</packages>
43 changes: 43 additions & 0 deletions src/SimpleKafka/BackoffHandler.cs
@@ -0,0 +1,43 @@
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace SimpleKafka
{
    internal class BackoffHandler
    {
        private static readonly Random generator = new Random();

        private readonly int maxRetries;
        private readonly int basePeriodMs;
        private readonly int jitterMs;
        private int backoffCount;

        public BackoffHandler(int maxRetries = 10, int basePeriodMs = 300, int jitterMs = 50)
        {
            this.maxRetries = maxRetries;
            this.basePeriodMs = basePeriodMs;
            this.jitterMs = jitterMs;
        }

        public async Task<bool> BackoffIfAllowedAsync(CancellationToken token)
        {
            if (++backoffCount >= maxRetries)
            {
                return false;
            }
            else
            {
                Log.Verbose("Backoff {attempt} out of {maxRetries}", backoffCount, maxRetries);
                int delay;
                lock (generator)
                {
                    // System.Random is not thread-safe; guard the shared instance
                    delay = generator.Next(basePeriodMs - jitterMs, basePeriodMs + jitterMs);
                }
                await Task.Delay(delay, token).ConfigureAwait(false);
                return true;
            }
        }
    }
}
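Given the contract above (BackoffIfAllowedAsync waits and returns true while retries remain, and returns false once the budget is exhausted), a caller simply loops on it. A minimal usage sketch; the operation delegate is a hypothetical stand-in:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace SimpleKafka
{
    // Sketch only: retry an operation until it succeeds or the
    // BackoffHandler exhausts its retry budget.
    internal static class BackoffUsageSketch
    {
        public static async Task<bool> TryWithBackoffAsync(
            Func<Task<bool>> operation, CancellationToken token)
        {
            var backoff = new BackoffHandler(maxRetries: 5);
            while (!await operation().ConfigureAwait(false))
            {
                if (!await backoff.BackoffIfAllowedAsync(token).ConfigureAwait(false))
                {
                    return false;   // retry budget exhausted
                }
            }
            return true;            // operation eventually succeeded
        }
    }
}
```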
59 changes: 59 additions & 0 deletions src/SimpleKafka/Common/Crc32Provider.cs
@@ -0,0 +1,59 @@
// Copyright (c) Damien Guard. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
// Originally published at http://damieng.com/blog/2006/08/08/calculating_crc32_in_c_and_net

using System;

namespace SimpleKafka.Common
{
    /// <summary>
    /// This code was originally from the copyrighted code listed above but was modified significantly
    /// as the original code was not thread safe and did not match what was required of this driver. This
    /// class now provides a static implementation which will do the simple CRC calculation required by Kafka servers.
    /// </summary>
    public static class Crc32Provider
    {
        public const UInt32 DefaultPolynomial = 0xedb88320u;
        public const UInt32 DefaultSeed = 0xffffffffu;
        private static readonly UInt32[] PolynomialTable;

        static Crc32Provider()
        {
            PolynomialTable = InitializeTable(DefaultPolynomial);
        }

        public static UInt32 Compute(byte[] buffer, int offset, int length)
        {
            return ~CalculateHash(buffer, offset, length);
        }

        private static UInt32[] InitializeTable(UInt32 polynomial)
        {
            var createTable = new UInt32[256];
            for (var i = 0; i < 256; i++)
            {
                var entry = (UInt32)i;
                for (var j = 0; j < 8; j++)
                {
                    if ((entry & 1) == 1)
                        entry = (entry >> 1) ^ polynomial;
                    else
                        entry = entry >> 1;
                }
                createTable[i] = entry;
            }

            return createTable;
        }

        private static UInt32 CalculateHash(byte[] buffer, int offset, int length)
        {
            var crc = DefaultSeed;
            while (length-- > 0)
            {
                crc = (crc >> 8) ^ PolynomialTable[buffer[offset++] ^ crc & 0xff];
            }
            return crc;
        }
    }
}
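For context on where this fits: a Kafka message on the wire is prefixed by a four-byte, big-endian CRC32 computed over the rest of the message, which is exactly the calculation Compute provides. A sketch of verifying a received message buffer (the wrapper class is hypothetical):

```csharp
using SimpleKafka.Common;

namespace SimpleKafka.Examples
{
    // Sketch only: check the CRC32 that prefixes a Kafka message.
    public static class CrcCheckSketch
    {
        public static bool HasValidCrc(byte[] message)
        {
            // First four bytes: big-endian CRC of the remainder of the message.
            uint stored = ((uint)message[0] << 24) | ((uint)message[1] << 16)
                        | ((uint)message[2] << 8) | message[3];
            return stored == Crc32Provider.Compute(message, 4, message.Length - 4);
        }
    }
}
```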