Skip to content

Commit

Permalink
Merge branch 'read-ahead' of github.com:numaproj/numaflow into read-a…
Browse files Browse the repository at this point in the history
…head
  • Loading branch information
yhl25 committed Dec 19, 2024
2 parents 3beaa87 + 1beb914 commit 7ee9c3a
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/config/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub(crate) mod source {

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct SourceConfig {
/// for high-throughput use-cases we read-ahead the next batch before the previous batch has
/// been acked (or completed). For most cases it should be set to false.
pub(crate) read_ahead: bool,
pub(crate) source_type: SourceType,
}
Expand Down
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ impl Source {
let handle = tokio::spawn(async move {
let mut processed_msgs_count: usize = 0;
let mut last_logged_at = time::Instant::now();
// this semaphore is used only if read-ahead is disabled. we hold this semaphore to
// make sure we can read only if the current inflight ones are ack'ed.
let semaphore = Arc::new(Semaphore::new(1));

loop {
Expand Down

0 comments on commit 7ee9c3a

Please sign in to comment.