Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Commit offsets from strategy join #4936

Merged
merged 3 commits into the base branch from the source branch on
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust_snuba/rust_arroyo/examples/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {
false,
None,
);
let mut consumer = KafkaConsumer::new(config);
let mut consumer = KafkaConsumer::new(config).unwrap();
let topic = Topic {
name: "test_static".to_string(),
};
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/rust_arroyo/examples/base_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
false,
None,
);
let consumer = Box::new(KafkaConsumer::new(config));
let consumer = KafkaConsumer::new(config).unwrap();
let topic = Topic {
name: "test_static".to_string(),
};
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/rust_arroyo/examples/transform_and_produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() {
None,
);

let consumer = Box::new(KafkaConsumer::new(config.clone()));
let consumer = KafkaConsumer::new(config.clone()).unwrap();
let mut processor = StreamProcessor::new(
consumer,
Box::new(ReverseStringAndProduceStrategyFactory {
Expand Down
84 changes: 37 additions & 47 deletions rust_snuba/rust_arroyo/src/backends/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ fn create_kafka_message(msg: BorrowedMessage) -> BrokerMessage<KafkaPayload> {
)
}

#[derive(Clone)]
pub struct CustomContext {
// This is horrible. I want to mutate callbacks (to invoke on_assign)
// From the pre_rebalance function.
// But pre_rebalance gets &self and not &mut self.
// I am sure there has to be a better way to do this.
callbacks: Mutex<Box<dyn AssignmentCallbacks>>,
callbacks: Arc<Mutex<Option<Box<dyn AssignmentCallbacks>>>>,
consumer_offsets: Arc<Mutex<HashMap<Partition, u64>>>,
}

Expand All @@ -104,7 +105,7 @@ impl ConsumerContext for CustomContext {
offsets.remove(partition);
}

self.callbacks.lock().unwrap().on_revoke(partitions);
self.callbacks.lock().unwrap().as_mut().unwrap().on_revoke(partitions);
}
}

Expand All @@ -129,65 +130,58 @@ impl ConsumerContext for CustomContext {
for (partition, offset) in map.clone() {
offsets.insert(partition, offset);
}
self.callbacks.lock().unwrap().on_assign(map);
self.callbacks.lock().unwrap().as_mut().unwrap().on_assign(map);
}
}

fn commit_callback(&self, _: KafkaResult<()>, _offsets: &TopicPartitionList) {}
}

pub struct KafkaConsumer {
// TODO: This has to be an option as of now because rdkafka requires
// callbacks during the instantiation. While the streaming processor
// can only pass the callbacks during the subscribe call.
// So we need to build the kafka consumer upon subscribe and not
// in the constructor.
pub consumer: Option<BaseConsumer<CustomContext>>,
config: KafkaConfig,
pub consumer: BaseConsumer<CustomContext>,
context: CustomContext,
state: KafkaConsumerState,
offsets: Arc<Mutex<HashMap<Partition, u64>>>,
staged_offsets: HashMap<Partition, u64>,
}

impl KafkaConsumer {
pub fn new(config: KafkaConfig) -> Self {
Self {
consumer: None,
config,
pub fn new(config: KafkaConfig) -> Result<Self, ConsumerError> {
let context = CustomContext {
callbacks: Arc::new(Mutex::new(None)),
consumer_offsets: Arc::new(Mutex::new(HashMap::new())),
};

let mut config_obj: ClientConfig = config.clone().into();

let consumer: BaseConsumer<CustomContext> = config_obj
.set_log_level(RDKafkaLogLevel::Warning)
.create_with_context(context.clone())?;

Ok(Self {
consumer,
state: KafkaConsumerState::NotSubscribed,
offsets: Arc::new(Mutex::new(HashMap::new())),
context,
staged_offsets: HashMap::new(),
}
})
}
}

impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
impl ArroyoConsumer<KafkaPayload> for KafkaConsumer {
fn subscribe(
&mut self,
topics: &[Topic],
callbacks: Box<dyn AssignmentCallbacks>,
) -> Result<(), ConsumerError> {
let context = CustomContext {
callbacks: Mutex::new(callbacks),
consumer_offsets: self.offsets.clone(),
};

let mut config_obj: ClientConfig = self.config.clone().into();

let consumer: BaseConsumer<CustomContext> = config_obj
.set_log_level(RDKafkaLogLevel::Warning)
.create_with_context(context)?;
let topic_str: Vec<&str> = topics.iter().map(|t| t.name.as_ref()).collect();
consumer.subscribe(&topic_str)?;
self.consumer = Some(consumer);
*self.context.callbacks.lock().unwrap() = Some(callbacks);
self.consumer.subscribe(&topic_str)?;
self.state = KafkaConsumerState::Consuming;
Ok(())
}

fn unsubscribe(&mut self) -> Result<(), ConsumerError> {
self.state.assert_consuming_state()?;
let consumer = self.consumer.as_mut().unwrap();
consumer.unsubscribe();
self.consumer.unsubscribe();

Ok(())
}
Expand All @@ -199,8 +193,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
self.state.assert_consuming_state()?;

let duration = timeout.unwrap_or(Duration::ZERO);
let consumer = self.consumer.as_mut().unwrap();
let res = consumer.poll(duration);
let res = self.consumer.poll(duration);
match res {
None => Ok(None),
Some(res) => {
Expand All @@ -216,7 +209,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
let mut topic_map = HashMap::new();
for partition in partitions {
let offset = *self
.offsets
.context.consumer_offsets
.lock()
.unwrap()
.get(&partition)
Expand All @@ -227,9 +220,8 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
);
}

let consumer = self.consumer.as_ref().unwrap();
let topic_partition_list = TopicPartitionList::from_topic_map(&topic_map).unwrap();
consumer.pause(&topic_partition_list)?;
self.consumer.pause(&topic_partition_list)?;

Ok(())
}
Expand All @@ -239,14 +231,13 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {

let mut topic_partition_list = TopicPartitionList::new();
for partition in partitions {
if !self.offsets.lock().unwrap().contains_key(&partition) {
if !self.context.consumer_offsets.lock().unwrap().contains_key(&partition) {
return Err(ConsumerError::UnassignedPartition);
}
topic_partition_list.add_partition(&partition.topic.name, partition.index as i32);
}

let consumer = self.consumer.as_mut().unwrap();
consumer.resume(&topic_partition_list)?;
self.consumer.resume(&topic_partition_list)?;

Ok(())
}
Expand All @@ -258,7 +249,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {

fn tell(&self) -> Result<HashMap<Partition, u64>, ConsumerError> {
self.state.assert_consuming_state()?;
Ok(self.offsets.lock().unwrap().clone())
Ok(self.context.consumer_offsets.lock().unwrap().clone())
}

fn seek(&self, _: HashMap<Partition, u64>) -> Result<(), ConsumerError> {
Expand All @@ -284,9 +275,8 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {
);
}

let consumer = self.consumer.as_mut().unwrap();
let partitions = TopicPartitionList::from_topic_map(&topic_map).unwrap();
consumer.commit(&partitions, CommitMode::Sync).unwrap();
self.consumer.commit(&partitions, CommitMode::Sync).unwrap();

// Clear staged offsets
let cleared_map = HashMap::new();
Expand All @@ -297,7 +287,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer {

fn close(&mut self) {
self.state = KafkaConsumerState::Closed;
self.consumer = None;
// TODO: consume self?
}

fn closed(&self) -> bool {
Expand Down Expand Up @@ -363,7 +353,7 @@ mod tests {
false,
None,
);
let mut consumer = KafkaConsumer::new(configuration);
let mut consumer = KafkaConsumer::new(configuration).unwrap();
let topic = Topic {
name: "test".to_string(),
};
Expand All @@ -381,7 +371,7 @@ mod tests {
false,
None,
);
let mut consumer = KafkaConsumer::new(configuration);
let mut consumer = KafkaConsumer::new(configuration).unwrap();
let topic = Topic {
name: "test".to_string(),
};
Expand Down Expand Up @@ -412,7 +402,7 @@ mod tests {
None,
);

let mut consumer = KafkaConsumer::new(configuration);
let mut consumer = KafkaConsumer::new(configuration).unwrap();
let topic = Topic {
name: "test2".to_string(),
};
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/rust_arroyo/src/backends/local/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl From<TopicDoesNotExist> for BrokerError {
}
}

impl<TPayload: Clone> LocalBroker<TPayload> {
impl<TPayload: Clone + Send> LocalBroker<TPayload> {
pub fn new(storage: Box<dyn MessageStorage<TPayload>>, clock: Box<dyn Clock>) -> Self {
Self {
storage,
Expand Down
28 changes: 14 additions & 14 deletions rust_snuba/rust_arroyo/src/backends/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ struct SubscriptionState {
last_eof_at: HashMap<Partition, u64>,
}

pub struct LocalConsumer<'a, TPayload: Clone> {
pub struct LocalConsumer<TPayload: Clone> {
id: Uuid,
group: String,
broker: &'a mut LocalBroker<TPayload>,
broker: LocalBroker<TPayload>,
pending_callback: VecDeque<Callback>,
paused: HashSet<Partition>,
// The offset that a the last ``EndOfPartition`` exception that was
Expand All @@ -40,10 +40,10 @@ pub struct LocalConsumer<'a, TPayload: Clone> {
closed: bool,
}

impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> {
impl<TPayload: Clone> LocalConsumer<TPayload> {
pub fn new(
id: Uuid,
broker: &'a mut LocalBroker<TPayload>,
broker: LocalBroker<TPayload>,
group: String,
enable_end_of_partition: bool,
) -> Self {
Expand All @@ -68,7 +68,7 @@ impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> {
}
}

impl<'a, TPayload: Clone> Consumer<'a, TPayload> for LocalConsumer<'a, TPayload> {
impl<TPayload: Clone + Send> Consumer<TPayload> for LocalConsumer<TPayload> {
fn subscribe(
&mut self,
topics: &[Topic],
Expand Down Expand Up @@ -327,7 +327,7 @@ mod tests {

#[test]
fn test_consumer_subscription() {
let mut broker = build_broker();
let broker = build_broker();

let topic1 = Topic {
name: "test1".to_string(),
Expand All @@ -338,7 +338,7 @@ mod tests {

let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(EmptyCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true);
LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true);
assert!(consumer.subscription_state.topics.is_empty());

let res = consumer.subscribe(&[topic1.clone(), topic2.clone()], my_callbacks);
Expand Down Expand Up @@ -381,7 +381,7 @@ mod tests {

#[test]
fn test_subscription_callback() {
let mut broker = build_broker();
let broker = build_broker();

let topic1 = Topic {
name: "test1".to_string(),
Expand Down Expand Up @@ -456,7 +456,7 @@ mod tests {
let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(TheseCallbacks {});

let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true);
LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true);

let _ = consumer.subscribe(&[topic1, topic2], my_callbacks);
let _ = consumer.poll(Some(Duration::from_millis(100)));
Expand Down Expand Up @@ -501,7 +501,7 @@ mod tests {

let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(TheseCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true);
LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true);

let _ = consumer.subscribe(&[topic2], my_callbacks);

Expand All @@ -523,7 +523,7 @@ mod tests {

#[test]
fn test_paused() {
let mut broker = build_broker();
let broker = build_broker();
let topic2 = Topic {
name: "test2".to_string(),
};
Expand All @@ -533,7 +533,7 @@ mod tests {
};
let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(EmptyCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false);
LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false);
let _ = consumer.subscribe(&[topic2], my_callbacks);

assert_eq!(consumer.poll(None).unwrap(), None);
Expand All @@ -549,10 +549,10 @@ mod tests {

#[test]
fn test_commit() {
let mut broker = build_broker();
let broker = build_broker();
let my_callbacks: Box<dyn AssignmentCallbacks> = Box::new(EmptyCallbacks {});
let mut consumer =
LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false);
LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false);
let topic2 = Topic {
name: "test2".to_string(),
};
Expand Down
4 changes: 2 additions & 2 deletions rust_snuba/rust_arroyo/src/backends/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum ProducerError {

/// This is basically an observer pattern to receive the callbacks from
/// the consumer when partitions are assigned/revoked.
pub trait AssignmentCallbacks: Send + Sync {
pub trait AssignmentCallbacks: Send {
fn on_assign(&mut self, partitions: HashMap<Partition, u64>);
fn on_revoke(&mut self, partitions: Vec<Partition>);
}
Expand Down Expand Up @@ -80,7 +80,7 @@ pub trait AssignmentCallbacks: Send + Sync {
/// occurs even if the consumer retains ownership of the partition across
/// assignments.) For this reason, it is generally good practice to ensure
/// offsets are committed as part of the revocation callback.
pub trait Consumer<'a, TPayload: Clone> {
pub trait Consumer<TPayload: Clone>: Send {
fn subscribe(
&mut self,
topic: &[Topic],
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/rust_arroyo/src/backends/storages/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<TPayload: Clone> Default for MemoryMessageStorage<TPayload> {
}
}

impl<TPayload: Clone> MessageStorage<TPayload> for MemoryMessageStorage<TPayload> {
impl<TPayload: Clone + Send> MessageStorage<TPayload> for MemoryMessageStorage<TPayload> {
fn create_topic(&mut self, topic: Topic, partitions: u16) -> Result<(), TopicExists> {
if self.topics.contains_key(&topic) {
return Err(TopicExists);
Expand Down
2 changes: 1 addition & 1 deletion rust_snuba/rust_arroyo/src/backends/storages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub enum ConsumeError {
OffsetOutOfRange,
}

pub trait MessageStorage<TPayload: Clone> {
pub trait MessageStorage<TPayload: Clone + Send>: Send {
// Create a topic with the given number of partitions.
//
// If the topic already exists, a ``TopicExists`` exception will be
Expand Down
Loading