Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new IConsumer.Consume overload taking target ConsumeResult/Message as param for low-alloc flows. #2369

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions src/Confluent.Kafka/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,21 @@ internal Consumer(ConsumerBuilder<TKey, TValue> builder)
/// <summary>
/// Refer to <see cref="Confluent.Kafka.IConsumer{TKey, TValue}.Consume(int)" />
/// </summary>
public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
{
    // Allocating convenience wrapper around the fill-in-place overload.
    // Returns null on timeout to preserve this method's historical contract.
    ConsumeResult<TKey, TValue> result = new();
    if (!Consume(millisecondsTimeout, result))
        return null;
    return result;
}


/// <inheritdoc/>
public bool Consume(int millisecondsTimeout, ConsumeResult<TKey, TValue> result)
{
if (result == null)
throw new ArgumentNullException(nameof(result));

var msgPtr = kafkaHandle.ConsumerPoll((IntPtr)millisecondsTimeout);

if (this.handlerException != null)
Expand All @@ -781,7 +794,7 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)

if (msgPtr == IntPtr.Zero)
{
return null;
return false;
}

try
Expand All @@ -806,14 +819,12 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)

if (msg.err == ErrorCode.Local_PartitionEOF)
{
return new ConsumeResult<TKey, TValue>
{
TopicPartitionOffset = new TopicPartitionOffset(topic,
msg.partition, msg.offset,
msgLeaderEpoch),
Message = null,
IsPartitionEOF = true
};
result.IsPartitionEOF = true;
result.Topic = topic;
result.Partition = msg.partition;
result.Offset = msg.offset;
result.LeaderEpoch = msgLeaderEpoch;
return true;
}

long timestampUnix = 0;
Expand All @@ -827,7 +838,9 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
Headers headers = null;
if (enableHeaderMarshaling)
{
headers = new Headers();
headers = result.Message?.Headers ?? new Headers();
headers.Clear();

Librdkafka.message_headers(msgPtr, out IntPtr hdrsPtr);
if (hdrsPtr != IntPtr.Zero)
{
Expand Down Expand Up @@ -938,20 +951,18 @@ public ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout)
ex);
}

return new ConsumeResult<TKey, TValue>
{
TopicPartitionOffset = new TopicPartitionOffset(topic,
msg.partition, msg.offset,
msgLeaderEpoch),
Message = new Message<TKey, TValue>
{
Timestamp = timestamp,
Headers = headers,
Key = key,
Value = val
},
IsPartitionEOF = false
};
result.Topic = topic;
result.Partition = msg.partition;
result.Offset = msg.offset;
result.LeaderEpoch = msgLeaderEpoch;

result.Message ??= new Message<TKey, TValue>();
result.Message.Timestamp = timestamp;
result.Message.Headers = headers;
result.Message.Key = key;
result.Message.Value = val;
result.IsPartitionEOF = false;
return true;
}
finally
{
Expand Down
16 changes: 4 additions & 12 deletions src/Confluent.Kafka/Header.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,17 @@ namespace Confluent.Kafka
/// </remarks>
public class Header : IHeader
{
// Serialized header value; set once in the constructor and never mutated (may be null).
private readonly byte[] val;

/// <summary>
/// The header key. Never null; enforced by the constructor.
/// </summary>
public string Key { get; }

/// <summary>
/// Get the serialized header value data. May be null; the stored
/// reference is returned directly (no defensive copy).
/// </summary>
public byte[] GetValueBytes() => val;

/// <summary>
/// Create a new Header instance.
Expand All @@ -53,12 +50,7 @@ public byte[] GetValueBytes()
/// </param>
public Header(string key, byte[] value)
{
    // A null key is rejected up front; a null value is allowed (no check on it below).
    Key = key ?? throw new ArgumentNullException(nameof(key), "Kafka message header key cannot be null.");
    val = value;
}
}
Expand Down
101 changes: 35 additions & 66 deletions src/Confluent.Kafka/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;


namespace Confluent.Kafka
Expand All @@ -28,13 +29,16 @@ namespace Confluent.Kafka
/// Message headers are supported by v0.11 brokers and above.
/// </remarks>
public class Headers : IEnumerable<IHeader>
{
private readonly List<IHeader> headers = new List<IHeader>();
{
/// <summary>
/// Backing list of headers. Allocated lazily by the first Add call so that
/// an empty Headers instance performs no list allocation; every member of
/// this class must therefore tolerate this field being null.
/// </summary>
private List<IHeader> headers = null;

/// <summary>
/// Gets the underlying list of headers, or a shared empty placeholder
/// when no header has been added yet (the backing list is still null).
/// </summary>
public IReadOnlyList<IHeader> BackingList => (IReadOnlyList<IHeader>)headers ?? Array.Empty<IHeader>();

/// <summary>
/// Append a new header to the collection.
Expand All @@ -50,10 +54,9 @@ public class Headers : IEnumerable<IHeader>
public void Add(string key, byte[] val)
{
    if (key == null)
        throw new ArgumentNullException(nameof(key), "Kafka message header key cannot be null.");

    // Create the backing list on first use (lazy allocation).
    headers ??= new();
    headers.Add(new Header(key, val));
}

Expand All @@ -64,8 +67,9 @@ public void Add(string key, byte[] val)
/// The header to add to the collection.
/// </param>
public void Add(Header header)
{
    // NOTE(review): unlike Add(string, byte[]), the argument is not
    // null-checked here, so a null entry can be stored — confirm whether
    // callers rely on that before tightening.
    headers ??= new();
    headers.Add(header);
}

/// <summary>
Expand Down Expand Up @@ -107,16 +111,17 @@ public byte[] GetLastBytes(string key)
/// </returns>
public bool TryGetLastBytes(string key, out byte[] lastHeader)
{
    if (headers != null)
    {
        // Scan from the end so the most recently appended header for the key wins.
        for (int i = headers.Count - 1; i >= 0; --i)
        {
            if (headers[i].Key == key)
            {
                lastHeader = headers[i].GetValueBytes();
                return true;
            }
        }
    }

    // Key not present (or no header ever added).
    lastHeader = default;
    return false;
}

Expand All @@ -127,76 +132,40 @@ public bool TryGetLastBytes(string key, out byte[] lastHeader)
/// <param name="key">
/// The key to remove all headers for
/// </param>
public void Remove(string key)
=> headers.RemoveAll(a => a.Key == key);

internal class HeadersEnumerator : IEnumerator<IHeader>
{
private Headers headers;

private int location = -1;

public HeadersEnumerator(Headers headers)
{
this.headers = headers;
}

public object Current
=> ((IEnumerator<IHeader>)this).Current;

IHeader IEnumerator<IHeader>.Current
=> headers.headers[location];

public void Dispose() {}

public bool MoveNext()
{
location += 1;
if (location >= headers.headers.Count)
{
return false;
}

return true;
}

public void Reset()
{
this.location = -1;
}
}

public void Remove(string key)
{
    // A never-populated collection has a null backing list; nothing to remove then.
    headers?.RemoveAll(h => h.Key == key);
}

/// <summary>
/// Removes all headers from the collection.
/// </summary>
public void Clear()
{
    // Safe on an empty instance: the backing list may still be null.
    headers?.Clear();
}

/// <summary>
/// Returns an enumerator that iterates through the headers collection.
/// </summary>
/// <returns>
/// An enumerator object that can be used to iterate through the headers
/// collection; an empty-sequence enumerator when no header has been added.
/// </returns>
public IEnumerator<IHeader> GetEnumerator() => headers?.GetEnumerator() ?? Enumerable.Empty<IHeader>().GetEnumerator();

/// <summary>
/// Returns an enumerator that iterates through the headers collection.
/// </summary>
/// <returns>
/// An enumerator object that can be used to iterate through the headers collection.
/// </returns>
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

/// <summary>
/// Gets the header at the specified index
/// </summary>
/// <param name="index">
/// The zero-based index of the element to get.
/// </param>
public IHeader this[int index]
{
    get
    {
        if (headers == null)
            throw new ArgumentOutOfRangeException(nameof(index), "Header collection is empty.");
        // Delegate range checking to the list. A null element (possible via
        // Add(Header) with a null argument) is returned as-is rather than
        // being misreported as an out-of-range index.
        return headers[index];
    }
}

/// <summary>
/// The number of headers in the collection; zero when no header
/// has ever been added (null backing list).
/// </summary>
public int Count => headers?.Count ?? 0;
}
}
18 changes: 15 additions & 3 deletions src/Confluent.Kafka/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ public interface IConsumer<TKey, TValue> : IClient
/// </exception>
ConsumeResult<TKey, TValue> Consume(int millisecondsTimeout);

/// <summary>
/// Poll for new messages / events. Blocks until a consume result is available or until the timeout period has elapsed.
/// </summary>
/// <remarks>
/// This overload takes the result instance as a parameter to allow reuse of the result and any contained message instance, enabling low-allocation consume loops.
/// </remarks>
/// <param name="millisecondsTimeout">The maximum period of time (in milliseconds) the call may block.</param>
/// <param name="result">Mandatory result instance to be filled with the next message/EOF; must not be null.</param>
/// <returns>True if <c>result</c> was filled with a message or EOF, false if the timeout elapsed.</returns>
bool Consume(int millisecondsTimeout, ConsumeResult<TKey, TValue> result);


/// <summary>
/// Poll for new messages / events. Blocks
/// until a consume result is available or the
Expand Down Expand Up @@ -666,9 +678,9 @@ public interface IConsumer<TKey, TValue> : IClient
/// <exception cref="Confluent.Kafka.KafkaException">
/// Thrown if the operation fails.
/// </exception>
void Close();


void Close();
/// <summary>
/// The current consumer group metadata associated with this consumer,
/// or null if a GroupId has not been specified for the consumer.
Expand Down
20 changes: 19 additions & 1 deletion test/Confluent.Kafka.UnitTests/Headers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public void TryGetLast()
[Fact]
public void TryGetLast_NotExist()
{
var hdrs = new Headers();
var hdrs = new Headers();
Assert.False(hdrs.TryGetLastBytes("my-header-2", out byte[] _));

hdrs.Add(new Header("my-header", new byte[] { 42 }));

Assert.False(hdrs.TryGetLastBytes("my-header-2", out byte[] val));
Expand All @@ -107,6 +109,8 @@ public void NullValue()
public void Remove()
{
var hdrs = new Headers();
hdrs.Remove("not-present");

hdrs.Add(new Header("my-header", new byte[] { 42 }));
hdrs.Add(new Header("my-header", new byte[] { 44 }));
hdrs.Add(new Header("my-header-2", new byte[] { 45 }));
Expand Down Expand Up @@ -151,6 +155,9 @@ public void Count()
public void Enumerator()
{
var hdrs = new Headers();

Assert.Empty(hdrs);

hdrs.Add(new Header("my-header", new byte[] { 42 }));
hdrs.Add(new Header("my-header", new byte[] { 44 }));
hdrs.Add(new Header("my-header-2", new byte[] { 45 }));
Expand All @@ -176,5 +183,16 @@ public void Enumerator()
Assert.Equal(3, cnt);
}

[Fact]
public void BackingList()
{
    var hdrs = new Headers();

    // Clear on a never-populated collection must be a harmless no-op.
    hdrs.Clear();
    Assert.Empty(hdrs.BackingList);

    hdrs.Add("A", null);
    Assert.Single(hdrs.BackingList);

    hdrs.Clear();
    Assert.Empty(hdrs);
    // BackingList must also observe the cleared state, not a stale view.
    Assert.Empty(hdrs.BackingList);
}
}
}