feat(capture): send historical_migration batches to separate topic #30
Conversation
Compare: 5bea038 to 187ea27
@@ -39,6 +39,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     kafka_compression_codec: "none".to_string(),
     kafka_hosts: "kafka:9092".to_string(),
     kafka_topic: "events_plugin_ingestion".to_string(),
+    kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
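For context, a minimal sketch of the config shape around the new field; only the Kafka fields visible in this hunk are shown, the rest of the struct is elided:

```rust
// Abridged sketch of capture's Config: the two topic fields are the
// ones relevant to this PR; other fields are omitted.
pub struct Config {
    pub kafka_hosts: String,
    pub kafka_topic: String,            // main analytics ingestion topic
    pub kafka_historical_topic: String, // new: historical_migration batches
    pub kafka_compression_codec: String,
}
```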
We'll need a better story around specifying 10 topics over several clusters, but that's for future us
-        let partition_key = if limited { None } else { Some(key.as_str()) };
+        let event_key = event.key();
+        let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
+            DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
I wish we could get rid of locality in historical too.
Blocked by person processing idempotence, but to be kept in mind. Part of the solution for person processing might be in-memory caches that would benefit from locality.
If we get more instances of historical being overwhelmed, we could implement a very-high threshold overflow detection here?
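A minimal sketch of what such a guard could look like, assuming a hypothetical very-high-rate check per key (none of these names exist in the codebase today):

```rust
/// Hypothetical helper for the idea above: keep partition locality for
/// historical events unless one candidate key is extremely hot, in which
/// case fall back to random partitioning for that key only.
fn historical_partition_key<'a>(
    key: &'a str,
    over_threshold: impl Fn(&str) -> bool, // assumed very-high-threshold check
) -> Option<&'a str> {
    if over_threshold(key) {
        None // no key => random partition, sheds the hot spot
    } else {
        Some(key) // keep locality for person-processing caches
    }
}

fn main() {
    let hot = |k: &str| k == "token:hot";
    assert_eq!(historical_partition_key("token:hot", &hot), None);
    assert_eq!(historical_partition_key("token:ok", &hot), Some("token:ok"));
}
```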
I'm particularly concerned since historical doesn't overflow, yet we have a higher SLA on it.
I do understand that the solution is not here but in person processing. Hoping there won't be much trouble until we get there.
Couple of questions but no blockers, overall I like the code!
Implement the support for historical batch exports: on the `/batch` shape, the `historical_migration` flag will set capture to write events to the separate `events_plugin_ingestion_historical` topic. To achieve this:

- `ProcessedEvent` gets a new `DataType` enum field. This field is marked `skip_serializing` so that it is not written to Kafka messages (should we also write `data_type` in the Kafka headers for partition stats? Follow-up PR)
- the HTTP handler sets the `DataType` based on the request. Having it here will be good when we want to route `$exception` events to their own topic
- `KafkaSink` chooses its topic per event based on the value of `DataType` (see the sketch after this list)
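To make those pieces concrete, here is a minimal sketch of the shape described above; field names beyond `data_type`, and the `topic_for` helper, are illustrative rather than the PR's exact code:

```rust
use serde::Serialize;

#[derive(Clone, Copy)]
enum DataType {
    AnalyticsMain,
    AnalyticsHistorical,
}

#[derive(Serialize)]
struct ProcessedEvent {
    #[serde(skip_serializing)] // kept out of the Kafka message body
    data_type: DataType,
    event: String, // other payload fields elided
}

struct KafkaSink {
    main_topic: String,
    historical_topic: String,
}

impl KafkaSink {
    // Per-event topic choice, mirroring the match shown in the diff.
    fn topic_for(&self, e: &ProcessedEvent) -> &str {
        match e.data_type {
            DataType::AnalyticsHistorical => &self.historical_topic,
            DataType::AnalyticsMain => &self.main_topic,
        }
    }
}

fn main() {
    let sink = KafkaSink {
        main_topic: "events_plugin_ingestion".into(),
        historical_topic: "events_plugin_ingestion_historical".into(),
    };
    let e = ProcessedEvent { data_type: DataType::AnalyticsHistorical, event: "$pageview".into() };
    assert_eq!(sink.topic_for(&e), "events_plugin_ingestion_historical");
}
```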
Alternatives not chosen

- using a `DataType` enum instead of a topic name will allow us to transparently implement multi-MSK routing for replay data: we give a data type to the sink and it decides where to write it based on its config
- keeping the data type per event rather than per `sink::send_batch` call will allow us to accept mixed-product batches, which are happening anyway
- I considered an `AnalyticsOverflow` value in the enum, but refactoring the overflow decision back into the http handler was cumbersome, so I am leaving it in the sink for now, as we'll nuke it soon

Testing
- added a `django_compat` test case, with a payload from Temporal batch exports
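For reference, a request along these lines exercises the new path; this is a hedged example where the `historical_migration` flag and `/batch` shape come from this PR, while the rest of the payload is illustrative rather than a captured Temporal export:

```rust
use serde_json::json;

fn main() {
    // Illustrative /batch body; `historical_migration: true` is what routes
    // the batch to events_plugin_ingestion_historical.
    let body = json!({
        "api_key": "phc_example",
        "historical_migration": true,
        "batch": [{
            "event": "$pageview",
            "distinct_id": "user-1",
            "timestamp": "2023-06-01T00:00:00Z"
        }]
    });
    println!("{body}");
}
```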