Skip to content

Commit

Permalink
Feature: Add RaftLogReader::limited_get_log_entries()
Browse files Browse the repository at this point in the history
This commit adds the `RaftLogReader::limited_get_log_entries()` method,
which enables applications to fetch log entries that are equal to or
smaller than a specified range. This functionality is particularly
useful for customizing the size of AppendEntries requests at the storage
API level.

- Applications can now decide the number of log entries to return based
  on the input range. If the application determines that the requested
  log entries range is too large for a single RPC, it can opt to return
  only the first several requested log entries instead of the full
  range.

- The method provides a default implementation that delegates the
  operation to `RaftLogReader::try_get_log_entries`.

This enhancement allows for more flexible and efficient handling of log
entries, particularly in scenarios where network constraints or
performance considerations require smaller data transfers.
  • Loading branch information
drmingdrmer committed Jun 16, 2024
1 parent d752e92 commit 075002f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
22 changes: 12 additions & 10 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,21 +393,23 @@ where
let r = LogIdRange::new(rng.prev, rng.prev);
(vec![], r)
} else {
let logs = self.log_reader.try_get_log_entries(start..end).await?;
debug_assert_eq!(
logs.len(),
(end - start) as usize,
"expect logs {}..{} but got only {} entries, first: {}, last: {}",
// limited_get_log_entries will return logs smaller than the range [start, end).
let logs = self.log_reader.limited_get_log_entries(start, end).await?;

let first = *logs.first().map(|x| x.get_log_id()).unwrap();
let last = *logs.last().map(|x| x.get_log_id()).unwrap();

debug_assert!(
!logs.is_empty() && logs.len() <= (end - start) as usize,
"expect logs ⊆ [{}..{}) but got {} entries, first: {}, last: {}",
start,
end,
logs.len(),
logs.first().map(|ent| ent.get_log_id()).display(),
logs.last().map(|ent| ent.get_log_id()).display()
first,
last
);

let last_log_id = logs.last().map(|ent| *ent.get_log_id());

let r = LogIdRange::new(rng.prev, last_log_id);
let r = LogIdRange::new(rng.prev, Some(last));
(logs, r)
}
};
Expand Down
17 changes: 17 additions & 0 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,23 @@ where C: RaftTypeConfig
/// A log reader must also be able to read the last saved vote by [`RaftLogStorage::save_vote`],
/// See: [log-stream](`crate::docs::protocol::replication::log_stream`)
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;

/// Returns log entries within range `[start, end)`, `end` is exclusive,
/// potentially limited by implementation-defined constraints.
///
/// If the specified range is too large, the implementation may return only the first few log
/// entries to ensure the result is not excessively large.
///
/// It must not return empty result if the input range is not empty.
///
/// The default implementation just returns the full range of log entries.
async fn limited_get_log_entries(
&mut self,
start: u64,
end: u64,
) -> Result<Vec<C::Entry>, StorageError<C::NodeId>> {
self.try_get_log_entries(start..end).await
}
}

/// A trait defining the interface for a Raft state machine snapshot subsystem.
Expand Down
22 changes: 22 additions & 0 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ where
run_fut(run_test(builder, Self::get_initial_state_re_apply_committed))?;
run_fut(run_test(builder, Self::save_vote))?;
run_fut(run_test(builder, Self::get_log_entries))?;
run_fut(run_test(builder, Self::limited_get_log_entries))?;
run_fut(run_test(builder, Self::try_get_log_entry))?;
run_fut(run_test(builder, Self::initial_logs))?;
run_fut(run_test(builder, Self::get_log_state))?;
Expand Down Expand Up @@ -720,6 +721,27 @@ where
Ok(())
}

pub async fn limited_get_log_entries(mut store: LS, mut sm: SM) -> Result<(), StorageError<C::NodeId>> {
Self::feed_10_logs_vote_self(&mut store).await?;

tracing::info!("--- get start == stop");
{
let logs = store.limited_get_log_entries(3, 3).await?;
assert_eq!(logs.len(), 0, "expected no logs to be returned");
}

tracing::info!("--- get start < stop");
{
let logs = store.limited_get_log_entries(5, 7).await?;

assert!(!logs.is_empty());
assert!(logs.len() <= 2);
assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5));
}

Ok(())
}

pub async fn try_get_log_entry(mut store: LS, mut sm: SM) -> Result<(), StorageError<C::NodeId>> {
Self::feed_10_logs_vote_self(&mut store).await?;

Expand Down

0 comments on commit 075002f

Please sign in to comment.