diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs index 262ac0b377..9924e09d40 100644 --- a/rust_snuba/src/consumer.rs +++ b/rust_snuba/src/consumer.rs @@ -246,20 +246,11 @@ pub fn consumer_impl( let processor = if mutations_mode { let mut_factory = MutConsumerStrategyFactory { storage_config: first_storage, - env_config, - logical_topic_name, max_batch_size, max_batch_time, processing_concurrency: ConcurrencyConfig::new(concurrency), clickhouse_concurrency: ConcurrencyConfig::new(clickhouse_concurrency), - async_inserts, - python_max_queue_depth, - use_rust_processor, health_check_file: health_check_file.map(ToOwned::to_owned), - enforce_schema, - physical_consumer_group: consumer_group.to_owned(), - physical_topic_name: Topic::new(&consumer_config.raw_topic.physical_topic_name), - accountant_topic_config: consumer_config.accountant_topic, batch_write_timeout, }; diff --git a/rust_snuba/src/mutations/factory.rs b/rust_snuba/src/mutations/factory.rs index 12f3644018..04d7b8ae87 100644 --- a/rust_snuba/src/mutations/factory.rs +++ b/rust_snuba/src/mutations/factory.rs @@ -13,7 +13,7 @@ use sentry_arroyo::processing::strategies::run_task_in_threads::{ }; use sentry_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory}; use sentry_arroyo::types::Message; -use sentry_arroyo::types::{Partition, Topic}; +use sentry_arroyo::types::Partition; use crate::config; use crate::metrics::global_tags::set_global_tag; @@ -25,20 +25,11 @@ use crate::mutations::synchronize::Synchronizer; pub struct MutConsumerStrategyFactory { pub storage_config: config::StorageConfig, - pub env_config: config::EnvConfig, - pub logical_topic_name: String, pub max_batch_size: usize, pub max_batch_time: Duration, pub processing_concurrency: ConcurrencyConfig, pub clickhouse_concurrency: ConcurrencyConfig, - pub async_inserts: bool, - pub python_max_queue_depth: Option, - pub use_rust_processor: bool, pub health_check_file: Option, - pub enforce_schema: bool, - pub physical_consumer_group: String, - pub physical_topic_name: Topic, - pub accountant_topic_config: config::TopicConfig, pub batch_write_timeout: Option, } diff --git a/rust_snuba/src/processors/eap_spans.rs b/rust_snuba/src/processors/eap_spans.rs index 3c8f4c0665..5017430a28 100644 --- a/rust_snuba/src/processors/eap_spans.rs +++ b/rust_snuba/src/processors/eap_spans.rs @@ -40,7 +40,7 @@ pub fn process_message( let payload_bytes = payload.payload().context("Expected payload")?; let msg: FromSpanMessage = serde_json::from_slice(payload_bytes)?; let origin_timestamp = DateTime::from_timestamp(msg.received as i64, 0); - let mut span: EAPSpan = msg.try_into()?; + let mut span: EAPSpan = msg.into(); span.retention_days = Some(enforce_retention(span.retention_days, &config.env_config));