This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

feat(capture): send historical_migration batches to separate topic #30

Merged
merged 9 commits into main from xvello/historical on Apr 30, 2024

Conversation

xvello
Contributor

@xvello xvello commented Apr 29, 2024

Implement support for historical batch exports: on the /batch endpoint, the historical_migration flag makes capture write events to the separate events_plugin_ingestion_historical topic. To achieve this:

  • The ProcessedEvent gets a new DataType enum field. This field is marked skip_serializing so it is not written to Kafka messages
    • We'll probably want to print data_type in the Kafka headers for partition stats? Follow-up PR
  • The http handler sets the DataType based on the request. Having it here will be useful when we want to:
    • implement "route to historical on old events" (the KafkaSink should not have this logic)
    • split part of the analytics traffic to a separate topic after the fact, e.g. $exception
  • KafkaSink chooses its topic per event based on the value of DataType
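The flow above can be sketched roughly as follows. Names and shapes are illustrative, not the exact types in the capture codebase; in the real code the field carries a #[serde(skip_serializing)] attribute so it never reaches the Kafka payload.

```rust
// Illustrative sketch of the DataType routing (hypothetical names).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DataType {
    AnalyticsMain,
    AnalyticsHistorical,
}

pub struct ProcessedEvent {
    pub key: String,
    // In the real struct this is #[serde(skip_serializing)]:
    // used for topic routing, never written to the Kafka message.
    pub data_type: DataType,
}

/// The http handler derives the data type from the request's
/// historical_migration flag.
pub fn data_type_for_request(historical_migration: bool) -> DataType {
    if historical_migration {
        DataType::AnalyticsHistorical
    } else {
        DataType::AnalyticsMain
    }
}

fn main() {
    assert_eq!(data_type_for_request(true), DataType::AnalyticsHistorical);
    assert_eq!(data_type_for_request(false), DataType::AnalyticsMain);
}
```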

Alternatives not chosen

  • Passing a topic name instead of a DataType enum: the enum will allow us to transparently implement multi-MSK routing for replay data, since we give a data type to the sink and it decides where to write it based on its config
  • Passing the data type as an argument to sink::send_batch instead of carrying it at the event level: the per-event field allows us to accept mixed-product batches, which are happening anyway
  • I initially added an AnalyticsOverflow value to the enum, but refactoring the overflow decision back into the http handler was cumbersome, so I am leaving it in the sink for now, as we'll nuke it soon
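The mixed-batch point can be made concrete with a small sketch: because the data type lives on each event rather than on the batch, the sink routes every event independently. This is a minimal sketch with assumed names, not the real KafkaSink, which also handles overflow and delivery.

```rust
// Per-event topic selection in the sink (hypothetical names).
#[derive(Clone, Copy, Debug, PartialEq)]
enum DataType {
    AnalyticsMain,
    AnalyticsHistorical,
}

struct Event {
    data_type: DataType,
}

struct KafkaSink {
    main_topic: String,
    historical_topic: String,
}

impl KafkaSink {
    // Each event in a batch is routed on its own data_type,
    // so one batch can mix main and historical events.
    fn topic_for(&self, event: &Event) -> &str {
        match event.data_type {
            DataType::AnalyticsMain => &self.main_topic,
            DataType::AnalyticsHistorical => &self.historical_topic,
        }
    }
}

fn main() {
    let sink = KafkaSink {
        main_topic: "events_plugin_ingestion".into(),
        historical_topic: "events_plugin_ingestion_historical".into(),
    };
    let batch = vec![
        Event { data_type: DataType::AnalyticsMain },
        Event { data_type: DataType::AnalyticsHistorical },
    ];
    let topics: Vec<&str> = batch.iter().map(|e| sink.topic_for(e)).collect();
    assert_eq!(
        topics,
        vec!["events_plugin_ingestion", "events_plugin_ingestion_historical"]
    );
}
```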

Testing

  • Added an end2end scenario. Other end2end scenarios are updated to confirm that data is written to the main topic by default
  • Added a django_compat test case with a payload from temporal batch exports

@xvello xvello force-pushed the xvello/historical branch from 5bea038 to 187ea27 Compare April 29, 2024 14:28
@xvello xvello marked this pull request as ready for review April 29, 2024 16:36
@xvello xvello requested a review from a team April 29, 2024 16:36
@@ -39,6 +39,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka_compression_codec: "none".to_string(),
kafka_hosts: "kafka:9092".to_string(),
kafka_topic: "events_plugin_ingestion".to_string(),
kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
Contributor Author

@xvello xvello Apr 29, 2024


We'll need a better story around specifying 10 topics over several clusters, but that's for future us

let partition_key = if limited { None } else { Some(key.as_str()) };
let event_key = event.key();
let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
Contributor


I wish we could get rid of locality in historical too.

Contributor Author


Blocked by person processing idempotence, but to be kept in mind. Part of the solution for person processing might be in-memory caches that would benefit from locality.
If we get more instances of historical being overwhelmed, we could implement a very-high threshold overflow detection here?
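The "very high threshold" idea floated above could look something like this. This is a hypothetical sketch, not code from this PR: real overflow detection would use a time-windowed rate limit rather than a raw counter, and the names are invented.

```rust
use std::collections::HashMap;

// Hypothetical sketch: count events per partition key and only
// drop locality (overflow) once a key crosses a very high threshold.
struct OverflowDetector {
    threshold: u64,
    counts: HashMap<String, u64>,
}

impl OverflowDetector {
    fn new(threshold: u64) -> Self {
        Self {
            threshold,
            counts: HashMap::new(),
        }
    }

    /// Returns true once this key has exceeded the threshold and
    /// should lose its partition locality.
    fn observe(&mut self, key: &str) -> bool {
        let n = self.counts.entry(key.to_string()).or_insert(0);
        *n += 1;
        *n > self.threshold
    }
}

fn main() {
    let mut detector = OverflowDetector::new(2);
    assert!(!detector.observe("team:1"));
    assert!(!detector.observe("team:1"));
    // The third event for the same key crosses the threshold.
    assert!(detector.observe("team:1"));
    // Other keys are unaffected.
    assert!(!detector.observe("team:2"));
}
```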

Contributor


I'm particularly concerned since historical doesn't overflow, yet we also have a higher SLA for it.

I do understand that the solution is not here but in person processing. Hoping there won't be much trouble until we get there.

Contributor

@tomasfarias tomasfarias left a comment


A couple of questions but no blockers; overall I like the code!

@xvello xvello merged commit bca9f84 into main Apr 30, 2024
4 checks passed
@xvello xvello deleted the xvello/historical branch April 30, 2024 10:24