Skip to content

Commit 1f4207d

Browse files
authored
commitlog: Include latest commit offset in segment metadata (#2733)
1 parent d1ed964 commit 1f4207d

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

crates/commitlog/src/repo/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ pub fn resume_segment_writer<R: Repo>(
243243
tx_range,
244244
size_in_bytes,
245245
max_epoch,
246+
max_commit_offset: _,
246247
} = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
247248
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
248249
warn!("invalid commit in segment {offset}: {source}");

crates/commitlog/src/segment.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -526,14 +526,24 @@ impl<R: io::BufRead> Iterator for CommitsWithVersion<R> {
526526

527527
/// Metadata describing a single segment.
#[derive(Clone, Debug, Eq, PartialEq)]
528528
pub struct Metadata {
529+
/// The segment header.
529530
pub header: Header,
531+
/// The range of transactions contained in the segment.
530532
pub tx_range: Range<u64>,
533+
/// The size of the segment in bytes.
531534
pub size_in_bytes: u64,
535+
/// The largest epoch found in the segment.
532536
pub max_epoch: u64,
537+
/// The offset of the latest commit found in the segment.
538+
///
539+
/// The value is the `min_tx_offset` of the commit, i.e.
540+
/// `max_commit_offset..tx_range.end` is the range of
541+
/// transactions contained in it.
542+
pub max_commit_offset: u64,
533543
}
534544

535545
impl Metadata {
536-
/// Reads and validates metadata from a segment.
546+
/// Reads and validates metadata from a segment.
537547
/// It will look for the last commit's offset in the index and then traverse the segment
538548
///
539549
/// Determines `max_tx_offset`, `size_in_bytes`, `max_epoch`, and `max_commit_offset` from the segment.
@@ -562,6 +572,7 @@ impl Metadata {
562572
},
563573
size_in_bytes: Header::LEN as u64,
564574
max_epoch: u64::default(),
575+
max_commit_offset: min_tx_offset,
565576
});
566577

567578
reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
@@ -597,6 +608,7 @@ impl Metadata {
597608
sofar.size_in_bytes += commit.size_in_bytes;
598609
// TODO: Should it be an error to encounter an epoch going backwards?
599610
sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
611+
sofar.max_commit_offset = commit.tx_range.start;
600612
}
601613

602614
Ok(sofar)
@@ -628,6 +640,7 @@ impl Metadata {
628640
},
629641
size_in_bytes: byte_offset + commit.size_in_bytes,
630642
max_epoch: commit.epoch,
643+
max_commit_offset: commit.tx_range.start,
631644
});
632645
}
633646

@@ -734,28 +747,31 @@ mod tests {
734747
let repo = repo::Memory::default();
735748

736749
let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap();
750+
// Commit 0..2
737751
writer.append([0; 32]).unwrap();
738752
writer.append([0; 32]).unwrap();
739753
writer.commit().unwrap();
754+
// Commit 2..3
740755
writer.append([1; 32]).unwrap();
741756
writer.commit().unwrap();
757+
// Commit 3..5
742758
writer.append([2; 32]).unwrap();
743759
writer.append([2; 32]).unwrap();
744760
writer.commit().unwrap();
745761

746762
let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
747-
let Metadata {
748-
header: _,
749-
tx_range,
750-
size_in_bytes,
751-
max_epoch: _,
752-
} = reader.metadata().unwrap();
753-
754-
assert_eq!(tx_range.start, 0);
755-
assert_eq!(tx_range.end, 5);
763+
let metadata = reader.metadata().unwrap();
764+
756765
assert_eq!(
757-
size_in_bytes,
758-
(Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64
766+
metadata,
767+
Metadata {
768+
header: Header::default(),
769+
tx_range: Range { start: 0, end: 5 },
770+
// header + 5 txs + 3 commits
771+
size_in_bytes: (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64,
772+
max_epoch: Commit::DEFAULT_EPOCH,
773+
max_commit_offset: 3
774+
}
759775
);
760776
}
761777

0 commit comments

Comments
 (0)