Memory leak with CancellationToken when error occurs. Version 2.5.0 #2268

MRzeczkowski opened this issue Jul 24, 2024 · 1 comment

MRzeczkowski commented Jul 24, 2024

Description

This is a similar issue to #1287

When ProduceAsync is called with a CancellationToken, a callback is registered on the token, but it is only disposed when no error occurs.

How to reproduce

Call ProduceAsync with a CancellationToken and make sure that sending the message fails. In my setup I was sending >1 MB messages, which would throw a ProduceException with error code ErrorCode.MsgSizeTooLarge.
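
A minimal sketch of such a program (not my exact code; the broker address and topic name are placeholders):

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Confluent.Kafka;

    class Repro
    {
        static async Task Main()
        {
            var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
            using var producer = new ProducerBuilder<string, byte[]>(config).Build();
            using var cts = new CancellationTokenSource();

            // Larger than the default maximum message size, so the produce call fails.
            var oversized = new byte[2 * 1024 * 1024];

            for (var i = 0; i < 100; i++)
            {
                try
                {
                    await producer.ProduceAsync(
                        "test-topic",
                        new Message<string, byte[]> { Key = "k", Value = oversized },
                        cts.Token);
                }
                catch (ProduceException<string, byte[]> ex)
                    when (ex.Error.Code == ErrorCode.MsgSizeTooLarge)
                {
                    // Each failed call leaves its callback registered on cts.Token,
                    // so the token accumulates registrations that keep the payload alive.
                }
            }
        }
    }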

The callback is registered here:
https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Producer.cs#L803

It is disposed when HandleDeliveryReport is called, here:
https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Producer.cs#L261

Dispose is called here:
https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Producer.cs#L962

When I was debugging and the error occurred, execution entered this step of freeing the gch:
https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Producer.cs#L321

The callback registration lives on in the CancellationToken that I passed to ProduceAsync, and another one is added each time an error occurs.
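
To show the mechanism in isolation, here is a standalone sketch (not the library's code): an undisposed CancellationTokenRegistration keeps its callback, and everything the callback's closure captures, reachable from the token.

    using System;
    using System.Threading;

    class RegistrationLeakDemo
    {
        static void Main()
        {
            using var cts = new CancellationTokenSource();

            for (var i = 0; i < 3; i++)
            {
                var payload = new byte[1_100_000]; // stands in for the serialized message

                // Analogue of the registration made inside ProduceAsync: the closure
                // captures the payload. If Dispose() is never called on the returned
                // CancellationTokenRegistration (the error path described above),
                // the token's callback list keeps the payload reachable.
                CancellationTokenRegistration registration =
                    cts.Token.Register(() => Console.WriteLine(payload.Length));

                // The success path would call registration.Dispose() here; the error path does not.
            }

            // At this point cts.Token still holds three callback nodes, each rooting ~1.1 MB.
        }
    }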

Here is a snippet from one of my snapshots:
[screenshot of the memory snapshot]

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version.
  • Apache Kafka version.
  • Client configuration.
  • Operating system.
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue.

DS-AdamMilazzo commented Jul 25, 2024

I came to report the same problem. With the latest library version (2.5), as well as previous versions going back to 2.2 at least, publishing a too-large message with ProduceAsync leaks memory. In particular, the callback registered with the cancellation token - assuming you pass in a cancelable token - is never removed. This implies that TypedTaskDeliveryHandlerShim.HandleDeliveryReport is never called in that case.

Looking at a memory dump showed many large byte arrays allocated and live. They had an average size of about 1.1 MB, about the size of our slightly-too-large messages. I looked at 20 or so to see where they were rooted. All were referenced like so:

...
-> 020102004420     System.Threading.CancellationTokenSource+CallbackNode
-> 0201014b9f80     System.Threading.CancellationTokenSource+CallbackNode
-> 0200fd84ac70     System.Action
-> 0200fd84abd0     Confluent.Kafka.Producer<System.String, System.Byte[]>+<>c__DisplayClass57_0
-> 0200fd84abe8     Confluent.Kafka.Producer<System.String, System.Byte[]>+TypedTaskDeliveryHandlerShim
-> 02010becfe80     System.Byte[]

The ellipsis (...) hides the full chain of cancellation token registrations. Most of the arrays showed well over 100 copies of the "System.Threading.CancellationTokenSource+CallbackNode" line. One had 157 copies of that line. Because the registrations form a linked list, that implies that there were at least 157 active registrations on our cancellation token, and if I examined more arrays I'd surely find longer chains. I had nowhere near 157+ messages actively being sent - more like 5 - so this implies to me that in some cases the cancellation token registrations from the kafka library don't get disposed.

I reran the same test but without any too-large messages, limiting the message size to about 900KB. There were no apparent memory leaks, and a memory dump during a moment of quiescence showed zero large byte arrays connected to kafka, so the success case seems to work properly. (If anything is leaked in the success case, it's not anything big like the serialized message.)

It's quite possible that any publishing failure will result in a memory leak.
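
For what it's worth, the usual way to make such a registration safe (sketched below as a hypothetical helper, not a proposed patch to Producer.cs) is to scope it so it is disposed on every completion path, not only the successful one:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    static class RegistrationCleanup
    {
        // Hypothetical helper: the `using` declaration guarantees the registration
        // is unlinked from the token whether the operation completes, faults
        // (e.g. with MsgSizeTooLarge), or is cancelled, so no callback chain
        // builds up on a long-lived token.
        public static async Task<TResult> AwaitWithCleanup<TResult>(
            Task<TResult> operation,
            CancellationToken cancellationToken)
        {
            var cancelled = new TaskCompletionSource<TResult>(
                TaskCreationOptions.RunContinuationsAsynchronously);

            using CancellationTokenRegistration registration =
                cancellationToken.Register(() => cancelled.TrySetCanceled(cancellationToken));

            // Whichever task finishes first, leaving this method disposes the registration.
            return await await Task.WhenAny(operation, cancelled.Task).ConfigureAwait(false);
        }
    }

With that shape, a failure path would still unregister the callback, because disposal is tied to leaving the method rather than to a delivery report being handled.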
