datalake: remove offset translation from translation_stm (and add compaction_test.py) #24610

base: dev
Conversation
Force-pushed from 5db3f18 to 6c113d7
@@ -24,22 +24,24 @@
class DatalakeVerifier():
    """
    Verifier that does the verification of the data in the redpanda Iceberg table.
    The verifier consumes offsets from specified topic and verifies it the data
Trailing whitespace removal
CI test results: builds #59935, #60023, #60079
Force-pushed from 6c113d7 to 0e1a24c
Force-pushed from 0e1a24c to 2fe6c55
Force-pushed from 2fe6c55 to b01095c
    b.add_batch(std::move(batch)).get();
};

make_and_add_record(model::record_batch_type::raft_data);
I think it would make sense to add a test case where we start from a configuration batch, as this is what it looks like in real life: configuration is always the first batch.
Thanks for the callout here, I have added two new test cases which begin with configuration batches to `translated_log_offset_test`.
Most importantly, add the function `get_translated_log_offset()`. This function computes the appropriate highest translated log offset for a given translated kafka offset while taking translator batches into account. It is key to removing offset translation from within the `translation_stm`, and to making the `max_collectible_offset` returned by the `stm` less pessimistic in the future.
Make use of the added utility function `get_translated_log_offset()` throughout `partition_translator.cc` and `state_machine.cc` in order to update the `translation_stm::_highest_translated_log_offset` as translation occurs. `translation_stm::max_collectible_offset()` now returns the `_highest_translated_log_offset` instead of performing offset translation for the `highest_translated_offset`, which is currently more restrictive than it should be for housekeeping (due to it being translator batch unaware).
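The idea behind `get_translated_log_offset()` can be sketched roughly as follows (a Python model with hypothetical names and batch shapes; the real implementation is C++ in the redpanda tree). Kafka offsets count only data records, so once a given kafka offset is translated, the equivalent log offset may additionally skip past any contiguous non-data (translator/configuration) batches, which never need translating:

```python
from dataclasses import dataclass

@dataclass
class Batch:
    # Hypothetical batch model, not the actual redpanda types.
    base_log_offset: int
    last_log_offset: int
    is_data: bool  # True for data batches that must actually be translated

def get_translated_log_offset(batches, translated_kafka_offset):
    """Highest log offset that can be considered translated once data
    records up to `translated_kafka_offset` (inclusive) are translated."""
    kafka_seen = 0       # number of data records already walked past
    translated_log = -1
    for b in batches:
        if b.is_data:
            records = b.last_log_offset - b.base_log_offset + 1
            if kafka_seen + records - 1 > translated_kafka_offset:
                break    # this data batch is not fully translated yet
            kafka_seen += records
        # Either a fully translated data batch, or a translator/config
        # batch that never needs translation: the log offset may advance.
        translated_log = b.last_log_offset
    return translated_log
```

Note how a leading configuration batch advances the translated log offset even before any data records are translated, which is exactly why it should not restrict the max collectible offset.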
By handling gaps in offsets and recording seen keys, we can validate the correctness of a compacted log that has been fully translated into an Iceberg table.
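A compaction-aware check along these lines might look like the following sketch (hypothetical names and record shape, not the actual `datalake_verifier` code): offsets must stay strictly increasing but may skip values, and the verifier records the keys it has seen while checking each surviving record against the table.

```python
def verify_compacted(consumed, table_rows):
    """consumed: (offset, key, value) tuples read from the compacted topic.
    table_rows: (offset, key, value) rows read back from the Iceberg table.
    Returns the set of keys seen in the consumed stream."""
    # Index the Iceberg rows by offset for point lookups.
    table = {offset: (key, value) for offset, key, value in table_rows}
    last_offset = -1
    seen_keys = set()
    for offset, key, value in consumed:
        # Compaction removes earlier records for a key, so gaps in the
        # offset sequence are expected; offsets must still be increasing.
        assert offset > last_offset, f"non-monotonic offset {offset}"
        last_offset = offset
        seen_keys.add(key)
        # Every record that survived compaction must appear, unchanged,
        # in the fully translated table.
        assert table.get(offset) == (key, value), f"mismatch at {offset}"
    return seen_keys
```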
Adds a new `test_compaction` test, which uses the `KgoVerifierSeqConsumer` to validate a fully compacted log, along with the `datalake_verifier` service to validate the Iceberg table. Also moves the contents of `compaction_gaps_test.py` into `compaction_test.py`.
Force-pushed from b01095c to 4bd7693
Previously, the `datalake::translation::translation_stm` would return its max collectible offset as the following:

redpanda/src/v/datalake/translation/state_machine.cc, lines 112 to 122 in 925707c

This offset translation leads to an overly restrictive condition for the max collectible offset, due to the fact that it is translation batch unaware.
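The new shape of the stm can be pictured with a small sketch (Python with illustrative names; the real stm is C++): instead of translating the highest translated kafka offset on every call, the stm bookkeeps a highest translated log offset that the translator advances, and `max_collectible_offset()` simply returns it.

```python
class TranslationStm:
    """Toy model of the stm's new bookkeeping (illustrative names only)."""

    def __init__(self):
        self._highest_translated_log_offset = -1

    def update(self, translated_log_offset):
        # Advanced by the partition translator as translation progresses;
        # never moves backwards.
        self._highest_translated_log_offset = max(
            self._highest_translated_log_offset, translated_log_offset)

    def max_collectible_offset(self):
        # No kafka -> log offset translation here anymore: just return
        # the bookkept highest translated log offset.
        return self._highest_translated_log_offset
```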
Here, the utility function `get_translated_log_offset()` is added, which returns the "equivalent" translated log offset for a given kafka offset, taking into account translation batches (which don't need to be translated, and thus shouldn't restrict the max collectible offset).

Use of this function is plumbed through the `partition_translator` and the `translation_stm`, and we now bookkeep the `_highest_translated_log_offset` in the `translation_stm`
to avoid any offset translation within it.

Additionally, a new test for compaction with an Iceberg enabled topic is added to `datalake/compaction_test.py`, with some enhancements to the `datalake_verifier` service to make it compaction aware.

Backports Required
Release Notes
Improvements