Hey Tommy, The way we deal with partitions migrating during the lifetime of an outstanding FetchRequest is a per-partition fetch-state version: a running counter on each partition that is incremented each time there's a change to the fetcher state, whether that's a seek, an offset reset, or a leader change. When we send a FetchRequest we store this per-partition version along with the request object, and when the FetchResponse comes back we verify, for each partition in the response, that the version stored on the request object matches the partition's current version: if not, we simply discard the response MessageSets for that partition. https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_broker.c#L4548-L4575
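A minimal sketch of that version-check idea, in TypeScript for readability. The types and names (`PartitionState`, `InflightFetch`, `sendFetch`, etc.) are illustrative only, not librdkafka's actual structures:

```typescript
// Sketch of a per-partition fetch-state version, as described above.
// All names here are hypothetical, simplified stand-ins.

interface PartitionState {
  version: number;  // bumped on every seek, offset reset, or leader change
  queue: string[];  // batches delivered to the application queue
}

interface InflightFetch {
  // partition id -> version captured when the request was sent
  versions: Map<number, number>;
}

const partitions = new Map<number, PartitionState>();

function bumpVersion(p: number): void {
  // Called on seek(), offset reset, or partition migration to a new leader.
  partitions.get(p)!.version += 1;
}

function sendFetch(ids: number[]): InflightFetch {
  // Snapshot each partition's current version alongside the request.
  const versions = new Map<number, number>();
  for (const p of ids) versions.set(p, partitions.get(p)!.version);
  return { versions };
}

function handleResponse(req: InflightFetch, batches: Map<number, string[]>): void {
  for (const [p, msgs] of batches) {
    // If the partition state changed while the request was in flight,
    // the response data is stale: drop it silently.
    if (req.versions.get(p) !== partitions.get(p)!.version) continue;
    partitions.get(p)!.queue.push(...msgs);
  }
}
```

The key property is that the check is purely local and lock-free from the response handler's point of view: no coordination with other fetchers is needed, because any state change (including migration) invalidates in-flight responses by construction.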
It definitely happens in production, where all the rarities and corner cases can't wait to show their pretty faces. Hope that helps!
Hey there! I read in the FAQ that the high-level behavior of partition fetching in librdkafka is that there's a thread per broker, and that thread will issue fetch requests for a given set of partitions based on a few criteria (having an offset to fetch from, and the fetch queue being within its min and max bounds).
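For concreteness, the eligibility check described above could be sketched roughly like this (names and the bound constant are made up, not librdkafka or KafkaJS API):

```typescript
// Hypothetical per-partition fetch-eligibility predicate.

interface PartitionFetchState {
  nextOffset: number | null; // null until an offset to fetch from is known
  queuedBatches: number;     // batches fetched but not yet consumed
}

const MAX_QUEUED_BATCHES = 10; // stand-in for the configured queue bound

function shouldFetch(p: PartitionFetchState): boolean {
  // Fetch only when an offset is resolved and the local queue
  // has not yet hit its upper bound.
  return p.nextOffset !== null && p.queuedBatches < MAX_QUEUED_BATCHES;
}
```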
We are currently in the process of redesigning the consumer concurrency model in KafkaJS, which now uses a fairly similar model. We don't use multithreading, but the high-level behavior is similar due to the async, non-blocking nature of NodeJS.
One question that came up is what happens when a partition is reassigned from one broker to another while this is going on. To give an example, imagine that we have 3 brokers, and a topic with 3 partitions.
Assuming that we had not committed offsets for p2 yet by the time we got to point 5, and that the partitions were small enough that b3 got in sync before then, my expectation would be that we have now fetched messages from p2 in two different threads, and that the fetch queue potentially contains data for the same offsets more than once.
Our current solution is to have some synchronization between the different fetchers: we filter out batches for partitions that another fetcher is currently fetching, or for which there are still batches in the queue. While this is a rather rare case, I'm not entirely happy with this solution, as it causes us to still fetch data that we then just discard. So I was curious how librdkafka handles this case, and whether you have a better solution in place, or perhaps know something that makes this a case we don't need to worry about. I couldn't find any KIP discussing this case, but it's possible I missed one.
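The cross-fetcher filter described above could look roughly like this; a shared registry records which fetcher currently owns each partition, and batches from anyone else (or for partitions with data still queued) are discarded. All names here are illustrative, not the actual KafkaJS internals:

```typescript
// Hypothetical sketch of cross-fetcher batch filtering via shared ownership.

const owner = new Map<number, string>();         // partition -> owning fetcher id
const queuedBatches = new Map<number, number>(); // partition -> batches still queued

function claim(partition: number, fetcherId: string): void {
  // Record that this fetcher is now responsible for the partition.
  owner.set(partition, fetcherId);
}

function shouldKeepBatch(partition: number, fetcherId: string): boolean {
  // Drop the batch if another fetcher owns the partition...
  if (owner.get(partition) !== fetcherId) return false;
  // ...or if earlier batches for it are still in the processing queue.
  if ((queuedBatches.get(partition) ?? 0) > 0) return false;
  return true;
}
```

Compared to a per-partition version counter, this needs shared state between fetchers, which is presumably part of the dissatisfaction expressed above.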