Skip to content

Commit

Permalink
feat(rust): Commit offsets during join (#4990)
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara authored Nov 9, 2023
1 parent 987cbbf commit c64493b
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions src/processing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct Strategies<TPayload: Clone> {

struct Callbacks<TPayload: Clone> {
strategies: Arc<Mutex<Strategies<TPayload>>>,
consumer: Arc<Mutex<dyn Consumer<TPayload>>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -67,9 +68,11 @@ impl<TPayload: 'static + Clone> AssignmentCallbacks for Callbacks<TPayload> {
None => {}
Some(s) => {
s.close();
// TODO: We need to actually call consumer.commit() with the commit request.
// Right now we are never committing during consumer shutdown.
let _ = s.join(None);
if let Ok(Some(commit_request)) = s.join(None) {
let mut consumer = self.consumer.lock().unwrap();
consumer.stage_offsets(commit_request.positions).unwrap();
consumer.commit_offsets().unwrap();
}
}
}
stg.strategy = None;
Expand All @@ -85,8 +88,14 @@ impl<TPayload: 'static + Clone> AssignmentCallbacks for Callbacks<TPayload> {
}

impl<TPayload: Clone> Callbacks<TPayload> {
pub fn new(strategies: Arc<Mutex<Strategies<TPayload>>>) -> Self {
Self { strategies }
pub fn new(
strategies: Arc<Mutex<Strategies<TPayload>>>,
consumer: Arc<Mutex<dyn Consumer<TPayload>>>,
) -> Self {
Self {
strategies,
consumer,
}
}
}

Expand Down Expand Up @@ -128,8 +137,10 @@ impl<TPayload: 'static + Clone> StreamProcessor<TPayload> {
}

pub fn subscribe(&mut self, topic: Topic) {
let callbacks: Box<dyn AssignmentCallbacks> =
Box::new(Callbacks::new(self.strategies.clone()));
let callbacks: Box<dyn AssignmentCallbacks> = Box::new(Callbacks::new(
self.strategies.clone(),
self.consumer.clone(),
));
self.consumer
.lock()
.unwrap()
Expand Down Expand Up @@ -212,7 +223,7 @@ impl<TPayload: 'static + Clone> StreamProcessor<TPayload> {
Ok(()) => {
// Resume if we are currently in a paused state
if self.is_paused {
let partitions: std::collections::HashSet<Partition> = self
let partitions = self
.consumer
.lock()
.unwrap()
Expand Down

0 comments on commit c64493b

Please sign in to comment.