From be6f85f1d55fc80da66fdb3b882a24359e03b3b5 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 27 Oct 2023 14:56:22 +0200 Subject: [PATCH 1/3] fix: Commit offsets from strategy join The return value of Strategy.join() is currently ignored, because we have no access to the consumer from within the assignment callbacks. Shuffle a few things around so that consumer callbacks contain Arc to the consumer internally. --- .../rust_arroyo/examples/base_consumer.rs | 2 +- .../rust_arroyo/examples/base_processor.rs | 2 +- .../examples/transform_and_produce.rs | 2 +- .../rust_arroyo/src/backends/kafka/mod.rs | 84 ++++++++----------- .../rust_arroyo/src/backends/local/broker.rs | 2 +- .../rust_arroyo/src/backends/local/mod.rs | 28 +++---- rust_snuba/rust_arroyo/src/backends/mod.rs | 4 +- .../src/backends/storages/memory.rs | 2 +- .../rust_arroyo/src/backends/storages/mod.rs | 2 +- rust_snuba/rust_arroyo/src/processing/mod.rs | 80 +++++++++--------- rust_snuba/rust_arroyo/src/utils/clock.rs | 2 +- 11 files changed, 102 insertions(+), 108 deletions(-) diff --git a/rust_snuba/rust_arroyo/examples/base_consumer.rs b/rust_snuba/rust_arroyo/examples/base_consumer.rs index 795a75f6bd6..820c30647d7 100644 --- a/rust_snuba/rust_arroyo/examples/base_consumer.rs +++ b/rust_snuba/rust_arroyo/examples/base_consumer.rs @@ -21,7 +21,7 @@ fn main() { false, None, ); - let mut consumer = KafkaConsumer::new(config); + let mut consumer = KafkaConsumer::new(config).unwrap(); let topic = Topic { name: "test_static".to_string(), }; diff --git a/rust_snuba/rust_arroyo/examples/base_processor.rs b/rust_snuba/rust_arroyo/examples/base_processor.rs index 362c8bb17af..36c9ea709db 100644 --- a/rust_snuba/rust_arroyo/examples/base_processor.rs +++ b/rust_snuba/rust_arroyo/examples/base_processor.rs @@ -24,7 +24,7 @@ fn main() { false, None, ); - let consumer = Box::new(KafkaConsumer::new(config)); + let consumer = KafkaConsumer::new(config).unwrap(); let topic = Topic { name: "test_static".to_string(), }; diff --git a/rust_snuba/rust_arroyo/examples/transform_and_produce.rs b/rust_snuba/rust_arroyo/examples/transform_and_produce.rs index 371524af18b..a8f2c5bf3a5 100644 --- a/rust_snuba/rust_arroyo/examples/transform_and_produce.rs +++ b/rust_snuba/rust_arroyo/examples/transform_and_produce.rs @@ -73,7 +73,7 @@ async fn main() { None, ); - let consumer = Box::new(KafkaConsumer::new(config.clone())); + let consumer = KafkaConsumer::new(config.clone()).unwrap(); let mut processor = StreamProcessor::new( consumer, Box::new(ReverseStringAndProduceStrategyFactory { diff --git a/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs b/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs index a9a6b987749..5ac0ff3203f 100644 --- a/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/kafka/mod.rs @@ -73,12 +73,13 @@ fn create_kafka_message(msg: BorrowedMessage) -> BrokerMessage { ) } +#[derive(Clone)] pub struct CustomContext { // This is horrible. I want to mutate callbacks (to invoke on_assign) // From the pre_rebalance function. // But pre_rebalance gets &self and not &mut self. // I am sure there has to be a better way to do this. - callbacks: Mutex>, + callbacks: Arc>>>, consumer_offsets: Arc>>, } @@ -104,7 +105,7 @@ impl ConsumerContext for CustomContext { offsets.remove(partition); } - self.callbacks.lock().unwrap().on_revoke(partitions); + self.callbacks.lock().unwrap().as_mut().unwrap().on_revoke(partitions); } } @@ -129,7 +130,7 @@ impl ConsumerContext for CustomContext { for (partition, offset) in map.clone() { offsets.insert(partition, offset); } - self.callbacks.lock().unwrap().on_assign(map); + self.callbacks.lock().unwrap().as_mut().unwrap().on_assign(map); } } @@ -137,57 +138,50 @@ impl ConsumerContext for CustomContext { } pub struct KafkaConsumer { - // TODO: This has to be an option as of now because rdkafka requires - // callbacks during the instantiation. While the streaming processor - // can only pass the callbacks during the subscribe call. - // So we need to build the kafka consumer upon subscribe and not - // in the constructor. - pub consumer: Option>, - config: KafkaConfig, + pub consumer: BaseConsumer, + context: CustomContext, state: KafkaConsumerState, - offsets: Arc>>, staged_offsets: HashMap, } impl KafkaConsumer { - pub fn new(config: KafkaConfig) -> Self { - Self { - consumer: None, - config, + pub fn new(config: KafkaConfig) -> Result { + let context = CustomContext { + callbacks: Arc::new(Mutex::new(None)), + consumer_offsets: Arc::new(Mutex::new(HashMap::new())), + }; + + let mut config_obj: ClientConfig = config.clone().into(); + + let consumer: BaseConsumer = config_obj + .set_log_level(RDKafkaLogLevel::Warning) + .create_with_context(context.clone())?; + + Ok(Self { + consumer, state: KafkaConsumerState::NotSubscribed, - offsets: Arc::new(Mutex::new(HashMap::new())), + context, staged_offsets: HashMap::new(), - } + }) } } -impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { +impl ArroyoConsumer for KafkaConsumer { fn subscribe( &mut self, topics: &[Topic], callbacks: Box, ) -> Result<(), ConsumerError> { - let context = CustomContext { - callbacks: Mutex::new(callbacks), - consumer_offsets: self.offsets.clone(), - }; - - let mut config_obj: ClientConfig = self.config.clone().into(); - - let consumer: BaseConsumer = config_obj - .set_log_level(RDKafkaLogLevel::Warning) - .create_with_context(context)?; let topic_str: Vec<&str> = topics.iter().map(|t| t.name.as_ref()).collect(); - consumer.subscribe(&topic_str)?; - self.consumer = Some(consumer); + *self.context.callbacks.lock().unwrap() = Some(callbacks); + self.consumer.subscribe(&topic_str)?; self.state = KafkaConsumerState::Consuming; Ok(()) } fn unsubscribe(&mut self) -> Result<(), ConsumerError> { self.state.assert_consuming_state()?; - let consumer = self.consumer.as_mut().unwrap(); - consumer.unsubscribe(); + self.consumer.unsubscribe(); Ok(()) } @@ -199,8 +193,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { self.state.assert_consuming_state()?; let duration = timeout.unwrap_or(Duration::ZERO); - let consumer = self.consumer.as_mut().unwrap(); - let res = consumer.poll(duration); + let res = self.consumer.poll(duration); match res { None => Ok(None), Some(res) => { @@ -216,7 +209,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { let mut topic_map = HashMap::new(); for partition in partitions { let offset = *self - .offsets + .context.consumer_offsets .lock() .unwrap() .get(&partition) @@ -227,9 +220,8 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { ); } - let consumer = self.consumer.as_ref().unwrap(); let topic_partition_list = TopicPartitionList::from_topic_map(&topic_map).unwrap(); - consumer.pause(&topic_partition_list)?; + self.consumer.pause(&topic_partition_list)?; Ok(()) } @@ -239,14 +231,13 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { let mut topic_partition_list = TopicPartitionList::new(); for partition in partitions { - if !self.offsets.lock().unwrap().contains_key(&partition) { + if !self.context.consumer_offsets.lock().unwrap().contains_key(&partition) { return Err(ConsumerError::UnassignedPartition); } topic_partition_list.add_partition(&partition.topic.name, partition.index as i32); } - let consumer = self.consumer.as_mut().unwrap(); - consumer.resume(&topic_partition_list)?; + self.consumer.resume(&topic_partition_list)?; Ok(()) } @@ -258,7 +249,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { fn tell(&self) -> Result, ConsumerError> { self.state.assert_consuming_state()?; - Ok(self.offsets.lock().unwrap().clone()) + Ok(self.context.consumer_offsets.lock().unwrap().clone()) } fn seek(&self, _: HashMap) -> Result<(), ConsumerError> { @@ -284,9 +275,8 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { ); } - let consumer = self.consumer.as_mut().unwrap(); let partitions = TopicPartitionList::from_topic_map(&topic_map).unwrap(); - consumer.commit(&partitions, CommitMode::Sync).unwrap(); + self.consumer.commit(&partitions, CommitMode::Sync).unwrap(); // Clear staged offsets let cleared_map = HashMap::new(); @@ -297,7 +287,7 @@ impl<'a> ArroyoConsumer<'a, KafkaPayload> for KafkaConsumer { fn close(&mut self) { self.state = KafkaConsumerState::Closed; - self.consumer = None; + // TODO: consume self? } fn closed(&self) -> bool { @@ -363,7 +353,7 @@ mod tests { false, None, ); - let mut consumer = KafkaConsumer::new(configuration); + let mut consumer = KafkaConsumer::new(configuration).unwrap(); let topic = Topic { name: "test".to_string(), }; @@ -381,7 +371,7 @@ mod tests { false, None, ); - let mut consumer = KafkaConsumer::new(configuration); + let mut consumer = KafkaConsumer::new(configuration).unwrap(); let topic = Topic { name: "test".to_string(), }; @@ -412,7 +402,7 @@ mod tests { None, ); - let mut consumer = KafkaConsumer::new(configuration); + let mut consumer = KafkaConsumer::new(configuration).unwrap(); let topic = Topic { name: "test2".to_string(), }; diff --git a/rust_snuba/rust_arroyo/src/backends/local/broker.rs b/rust_snuba/rust_arroyo/src/backends/local/broker.rs index 572feb15bfd..900957b7bd3 100644 --- a/rust_snuba/rust_arroyo/src/backends/local/broker.rs +++ b/rust_snuba/rust_arroyo/src/backends/local/broker.rs @@ -32,7 +32,7 @@ impl From for BrokerError { } } -impl LocalBroker { +impl LocalBroker { pub fn new(storage: Box>, clock: Box) -> Self { Self { storage, diff --git a/rust_snuba/rust_arroyo/src/backends/local/mod.rs b/rust_snuba/rust_arroyo/src/backends/local/mod.rs index e162ffe9506..2de89f609d8 100644 --- a/rust_snuba/rust_arroyo/src/backends/local/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/local/mod.rs @@ -24,10 +24,10 @@ struct SubscriptionState { last_eof_at: HashMap, } -pub struct LocalConsumer<'a, TPayload: Clone> { +pub struct LocalConsumer { id: Uuid, group: String, - broker: &'a mut LocalBroker, + broker: LocalBroker, pending_callback: VecDeque, paused: HashSet, // The offset that a the last ``EndOfPartition`` exception that was @@ -40,10 +40,10 @@ pub struct LocalConsumer<'a, TPayload: Clone> { closed: bool, } -impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> { +impl LocalConsumer { pub fn new( id: Uuid, - broker: &'a mut LocalBroker, + broker: LocalBroker, group: String, enable_end_of_partition: bool, ) -> Self { @@ -68,7 +68,7 @@ impl<'a, TPayload: Clone> LocalConsumer<'a, TPayload> { } } -impl<'a, TPayload: Clone> Consumer<'a, TPayload> for LocalConsumer<'a, TPayload> { +impl Consumer for LocalConsumer { fn subscribe( &mut self, topics: &[Topic], @@ -327,7 +327,7 @@ mod tests { #[test] fn test_consumer_subscription() { - let mut broker = build_broker(); + let broker = build_broker(); let topic1 = Topic { name: "test1".to_string(), @@ -338,7 +338,7 @@ mod tests { let my_callbacks: Box = Box::new(EmptyCallbacks {}); let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); assert!(consumer.subscription_state.topics.is_empty()); let res = consumer.subscribe(&[topic1.clone(), topic2.clone()], my_callbacks); @@ -381,7 +381,7 @@ mod tests { #[test] fn test_subscription_callback() { - let mut broker = build_broker(); + let broker = build_broker(); let topic1 = Topic { name: "test1".to_string(), @@ -456,7 +456,7 @@ mod tests { let my_callbacks: Box = Box::new(TheseCallbacks {}); let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); let _ = consumer.subscribe(&[topic1, topic2], my_callbacks); let _ = consumer.poll(Some(Duration::from_millis(100))); @@ -501,7 +501,7 @@ mod tests { let my_callbacks: Box = Box::new(TheseCallbacks {}); let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), true); + LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), true); let _ = consumer.subscribe(&[topic2], my_callbacks); @@ -523,7 +523,7 @@ mod tests { #[test] fn test_paused() { - let mut broker = build_broker(); + let broker = build_broker(); let topic2 = Topic { name: "test2".to_string(), }; @@ -533,7 +533,7 @@ mod tests { }; let my_callbacks: Box = Box::new(EmptyCallbacks {}); let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false); + LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false); let _ = consumer.subscribe(&[topic2], my_callbacks); assert_eq!(consumer.poll(None).unwrap(), None); @@ -549,10 +549,10 @@ mod tests { #[test] fn test_commit() { - let mut broker = build_broker(); + let broker = build_broker(); let my_callbacks: Box = Box::new(EmptyCallbacks {}); let mut consumer = - LocalConsumer::new(Uuid::nil(), &mut broker, "test_group".to_string(), false); + LocalConsumer::new(Uuid::nil(), broker, "test_group".to_string(), false); let topic2 = Topic { name: "test2".to_string(), }; diff --git a/rust_snuba/rust_arroyo/src/backends/mod.rs b/rust_snuba/rust_arroyo/src/backends/mod.rs index 1a75b71a166..6806f74dfaf 100755 --- a/rust_snuba/rust_arroyo/src/backends/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/mod.rs @@ -41,7 +41,7 @@ pub enum ProducerError { /// This is basically an observer pattern to receive the callbacks from /// the consumer when partitions are assigned/revoked. -pub trait AssignmentCallbacks: Send + Sync { +pub trait AssignmentCallbacks: Send { fn on_assign(&mut self, partitions: HashMap); fn on_revoke(&mut self, partitions: Vec); } @@ -80,7 +80,7 @@ pub trait AssignmentCallbacks: Send + Sync { /// occurs even if the consumer retains ownership of the partition across /// assignments.) For this reason, it is generally good practice to ensure /// offsets are committed as part of the revocation callback. -pub trait Consumer<'a, TPayload: Clone> { +pub trait Consumer: Send { fn subscribe( &mut self, topic: &[Topic], diff --git a/rust_snuba/rust_arroyo/src/backends/storages/memory.rs b/rust_snuba/rust_arroyo/src/backends/storages/memory.rs index 5ace8de9071..b696d318d3c 100755 --- a/rust_snuba/rust_arroyo/src/backends/storages/memory.rs +++ b/rust_snuba/rust_arroyo/src/backends/storages/memory.rs @@ -68,7 +68,7 @@ impl Default for MemoryMessageStorage { } } -impl MessageStorage for MemoryMessageStorage { +impl MessageStorage for MemoryMessageStorage { fn create_topic(&mut self, topic: Topic, partitions: u16) -> Result<(), TopicExists> { if self.topics.contains_key(&topic) { return Err(TopicExists); diff --git a/rust_snuba/rust_arroyo/src/backends/storages/mod.rs b/rust_snuba/rust_arroyo/src/backends/storages/mod.rs index 2ac3caaacee..d4789e5d1e2 100755 --- a/rust_snuba/rust_arroyo/src/backends/storages/mod.rs +++ b/rust_snuba/rust_arroyo/src/backends/storages/mod.rs @@ -21,7 +21,7 @@ pub enum ConsumeError { OffsetOutOfRange, } -pub trait MessageStorage { +pub trait MessageStorage: Send { // Create a topic with the given number of partitions. // // If the topic already exists, a ``TopicExists`` exception will be diff --git a/rust_snuba/rust_arroyo/src/processing/mod.rs b/rust_snuba/rust_arroyo/src/processing/mod.rs index 74897e5b0de..77f11a469f2 100644 --- a/rust_snuba/rust_arroyo/src/processing/mod.rs +++ b/rust_snuba/rust_arroyo/src/processing/mod.rs @@ -34,6 +34,13 @@ struct Strategies { struct Callbacks { strategies: Arc>>, + consumer: Arc>>, +} + +impl Callbacks { + pub fn new(strategies: Arc>>, consumer: Arc >>) -> Self { + Self { strategies, consumer } + } } #[derive(Debug, Clone)] @@ -69,7 +76,11 @@ impl AssignmentCallbacks for Callbacks { s.close(); // TODO: We need to actually call consumer.commit() with the commit request. // Right now we are never committing during consumer shutdown. - let _ = s.join(None); + if let Some(commit_request) = s.join(None) { + let mut consumer = self.consumer.lock().unwrap(); + consumer.stage_offsets(commit_request.positions).unwrap(); + consumer.commit_offsets().unwrap(); + } } } stg.strategy = None; @@ -84,18 +95,12 @@ impl AssignmentCallbacks for Callbacks { } } -impl Callbacks { - pub fn new(strategies: Arc>>) -> Self { - Self { strategies } - } -} - /// A stream processor manages the relationship between a ``Consumer`` /// instance and a ``ProcessingStrategy``, ensuring that processing /// strategies are instantiated on partition assignment and closed on /// partition revocation. -pub struct StreamProcessor<'a, TPayload: Clone> { - consumer: Box + 'a>, +pub struct StreamProcessor { + consumer: Arc>>, strategies: Arc>>, message: Option>, processor_handle: ProcessorHandle, @@ -104,9 +109,9 @@ pub struct StreamProcessor<'a, TPayload: Clone> { metrics_buffer: metrics_buffer::MetricsBuffer, } -impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { - pub fn new( - consumer: Box + 'a>, +impl StreamProcessor { + pub fn new + 'static>( + consumer: C, processing_factory: Box>, ) -> Self { let strategies = Arc::new(Mutex::new(Strategies { @@ -114,6 +119,8 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { strategy: None, })); + let consumer = Arc::new(Mutex::new(consumer)); + Self { consumer, strategies, @@ -129,15 +136,15 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { pub fn subscribe(&mut self, topic: Topic) { let callbacks: Box = - Box::new(Callbacks::new(self.strategies.clone())); - self.consumer.subscribe(&[topic], callbacks).unwrap(); + Box::new(Callbacks::new(self.strategies.clone(), self.consumer.clone())); + self.consumer.lock().unwrap().subscribe(&[topic], callbacks).unwrap(); } pub fn run_once(&mut self) -> Result<(), RunError> { if self.is_paused { // If the consumer waas paused, it should not be returning any messages // on ``poll``. - let res = self.consumer.poll(Some(Duration::ZERO)).unwrap(); + let res = self.consumer.lock().unwrap().poll(Some(Duration::ZERO)).unwrap(); match res { None => {} @@ -148,7 +155,7 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { // even if there is no active assignment and/or processing strategy. let poll_start = Instant::now(); //TODO: Support errors properly - match self.consumer.poll(Some(Duration::from_secs(1))) { + match self.consumer.lock().unwrap().poll(Some(Duration::from_secs(1))) { Ok(msg) => { self.message = msg.map(|inner| Message { inner_message: InnerMessage::BrokerMessage(inner), @@ -170,14 +177,11 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { Some(_) => return Err(RunError::InvalidState), }, Some(strategy) => { - let commit_request = strategy.poll(); - match commit_request { - None => {} - Some(request) => { - self.consumer.stage_offsets(request.positions).unwrap(); - self.consumer.commit_offsets().unwrap(); - } - }; + if let Some(commit_request) = strategy.poll() { + let mut consumer = self.consumer.lock().unwrap(); + consumer.stage_offsets(commit_request.positions).unwrap(); + consumer.commit_offsets().unwrap(); + } let msg = self.message.take(); if let Some(msg_s) = msg { @@ -192,9 +196,9 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { // Resume if we are currently in a paused state if self.is_paused { let partitions: std::collections::HashSet = - self.consumer.tell().unwrap().keys().cloned().collect(); + self.consumer.lock().unwrap().tell().unwrap().keys().cloned().collect(); - let res = self.consumer.resume(partitions); + let res = self.consumer.lock().unwrap().resume(partitions); match res { Ok(()) => { self.is_paused = false; @@ -234,9 +238,9 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { log::warn!("Consumer is in backpressure state for more than 1 second, pausing",); let partitions = - self.consumer.tell().unwrap().keys().cloned().collect(); + self.consumer.lock().unwrap().tell().unwrap().keys().cloned().collect(); - let res = self.consumer.pause(partitions); + let res = self.consumer.lock().unwrap().pause(partitions); match res { Ok(()) => { self.is_paused = true; @@ -272,7 +276,7 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { } } drop(trait_callbacks); // unlock mutex so we can close consumer - self.consumer.close(); + self.consumer.lock().unwrap().close(); return Err(e); } } @@ -286,11 +290,11 @@ impl<'a, TPayload: 'static + Clone> StreamProcessor<'a, TPayload> { } pub fn shutdown(&mut self) { - self.consumer.close(); + self.consumer.lock().unwrap().close(); } pub fn tell(self) -> HashMap { - self.consumer.tell().unwrap() + self.consumer.lock().unwrap().tell().unwrap() } } @@ -359,13 +363,13 @@ mod tests { #[test] fn test_processor() { - let mut broker = build_broker(); - let consumer = Box::new(LocalConsumer::new( + let broker = build_broker(); + let consumer = LocalConsumer::new( Uuid::nil(), - &mut broker, + broker, "test_group".to_string(), false, - )); + ); let mut processor = StreamProcessor::new(consumer, Box::new(TestFactory {})); processor.subscribe(Topic { @@ -388,12 +392,12 @@ mod tests { let _ = broker.produce(&partition, "message1".to_string()); let _ = broker.produce(&partition, "message2".to_string()); - let consumer = Box::new(LocalConsumer::new( + let consumer = LocalConsumer::new( Uuid::nil(), - &mut broker, + broker, "test_group".to_string(), false, - )); + ); let mut processor = StreamProcessor::new(consumer, Box::new(TestFactory {})); processor.subscribe(Topic { diff --git a/rust_snuba/rust_arroyo/src/utils/clock.rs b/rust_snuba/rust_arroyo/src/utils/clock.rs index 0b161dec2a0..26f84fb78c5 100644 --- a/rust_snuba/rust_arroyo/src/utils/clock.rs +++ b/rust_snuba/rust_arroyo/src/utils/clock.rs @@ -1,7 +1,7 @@ use std::thread::sleep; use std::time::{Duration, SystemTime}; -pub trait Clock { +pub trait Clock: Send { fn time(&self) -> SystemTime; fn sleep(self, duration: Duration); From 8db8c01182b72fa300a43c09288b78da1bee6936 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 27 Oct 2023 14:59:49 +0200 Subject: [PATCH 2/3] remove todo --- rust_snuba/rust_arroyo/src/processing/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust_snuba/rust_arroyo/src/processing/mod.rs b/rust_snuba/rust_arroyo/src/processing/mod.rs index 77f11a469f2..ec888400498 100644 --- a/rust_snuba/rust_arroyo/src/processing/mod.rs +++ b/rust_snuba/rust_arroyo/src/processing/mod.rs @@ -74,8 +74,6 @@ impl AssignmentCallbacks for Callbacks { None => {} Some(s) => { s.close(); - // TODO: We need to actually call consumer.commit() with the commit request. - // Right now we are never committing during consumer shutdown. if let Some(commit_request) = s.join(None) { let mut consumer = self.consumer.lock().unwrap(); consumer.stage_offsets(commit_request.positions).unwrap(); From 86261296a5f0e79f959c6e1ee84bfa0808d80690 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Fri, 27 Oct 2023 15:14:51 +0200 Subject: [PATCH 3/3] update rust_snuba for api changes --- rust_snuba/src/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index ebd80f8691b..73c2e1f37df 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -236,7 +236,7 @@ pub fn consumer_impl( let clickhouse_cluster_config = first_storage.clickhouse_cluster.clone(); let clickhouse_table_name = first_storage.clickhouse_table_name.clone(); - let consumer = Box::new(KafkaConsumer::new(config)); + let consumer = KafkaConsumer::new(config).unwrap(); let mut processor = StreamProcessor::new( consumer, Box::new(ConsumerStrategyFactory {