diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 6c2342bca29bee..02e54f4840118a 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -180,6 +180,7 @@ jobs: reduce transform remap transform route transform + exclusive_route transform sample transform tag_cardinality_limit transform throttle transform diff --git a/Cargo.toml b/Cargo.toml index b7dcdc710a7161..5ab5981924ebd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -642,6 +642,7 @@ transforms-logs = [ "transforms-reduce", "transforms-remap", "transforms-route", + "transforms-exclusive-route", "transforms-sample", "transforms-throttle", ] @@ -668,6 +669,7 @@ transforms-pipelines = ["transforms-filter", "transforms-route"] transforms-reduce = ["transforms-impl-reduce"] transforms-remap = [] transforms-route = [] +transforms-exclusive-route = [] transforms-sample = ["transforms-impl-sample"] transforms-tag_cardinality_limit = ["dep:bloomy", "dep:hashbrown"] transforms-throttle = ["dep:governor"] diff --git a/changelog.d/21707_exclusive_route_transform.feature.md b/changelog.d/21707_exclusive_route_transform.feature.md new file mode 100644 index 00000000000000..6b4f8625c05c4b --- /dev/null +++ b/changelog.d/21707_exclusive_route_transform.feature.md @@ -0,0 +1,3 @@ +Introduce a new exclusive_route transform, which functions as a switch statement to route events based on user-defined conditions. + +authors: pront diff --git a/src/transforms/exclusive_route/config.rs b/src/transforms/exclusive_route/config.rs new file mode 100644 index 00000000000000..c5d1a0223b651c --- /dev/null +++ b/src/transforms/exclusive_route/config.rs @@ -0,0 +1,195 @@ +use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig}; +use crate::config::{ + DataType, GenerateConfig, Input, LogNamespace, OutputId, TransformConfig, TransformContext, + TransformOutput, +}; +use crate::schema; +use crate::sinks::prelude::configurable_component; +use crate::transforms::exclusive_route::transform::ExclusiveRoute; +use crate::transforms::Transform; +use std::hash::{Hash, Hasher}; +use vector_lib::config::clone_input_definitions; + +pub(super) const UNMATCHED_ROUTE: &str = "_unmatched"; + +/// Individual route configuration. +#[configurable_component] +#[derive(Clone, Debug)] +pub struct Route { + /// The name of the route is also the name of the transform port. + /// + /// The `_unmatched` name is reserved and thus cannot be used as route ID. + /// + /// Each route can then be referenced as an input by other components with the name + /// `.`. If an event doesn’t match any route, + /// it is sent to the `._unmatched` output. + pub name: String, + + /// Each condition represents a filter which is applied to each event. + pub condition: AnyCondition, +} + +impl Hash for Route { + fn hash(&self, state: &mut H) { + self.name.hash(state); + } +} + +impl PartialEq for Route { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + } +} + +impl Eq for Route {} + +/// Configuration for the `route` transform. +#[configurable_component(transform( + "exclusive_route", + "Split a stream of events into unique sub-streams based on user-supplied conditions." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct ExclusiveRouteConfig { + /// An array of named routes. The route names are expected to be unique. + #[configurable(metadata(docs::examples = "routes_example()"))] + pub routes: Vec, +} + +fn routes_example() -> Vec { + vec![ + Route { + name: "foo-and-bar-exist".to_owned(), + condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig { + source: "exists(.foo) && exists(.bar)".to_owned(), + ..Default::default() + })), + }, + Route { + name: "only-foo-exists".to_owned(), + condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig { + source: "exists(.foo)".to_owned(), + ..Default::default() + })), + }, + ] +} + +impl GenerateConfig for ExclusiveRouteConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + routes: routes_example(), + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "exclusive_route")] +impl TransformConfig for ExclusiveRouteConfig { + async fn build(&self, context: &TransformContext) -> crate::Result { + let route = ExclusiveRoute::new(self, context)?; + Ok(Transform::synchronous(route)) + } + + fn input(&self) -> Input { + Input::all() + } + + fn validate(&self, _: &schema::Definition) -> Result<(), Vec> { + let mut errors = Vec::new(); + + let mut counts = std::collections::HashMap::new(); + for route in &self.routes { + *counts.entry(route.name.clone()).or_insert(0) += 1; + } + + let duplicates: Vec = counts + .iter() + .filter(|&(_, &count)| count > 1) + .map(|(name, _)| name.clone()) + .collect(); + + if !duplicates.is_empty() { + errors.push(format!( + "Found routes with duplicate names: {:?}", + duplicates + )); + } + + if self + .routes + .iter() + .any(|route| route.name == UNMATCHED_ROUTE) + { + errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name.")); + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + input_definitions: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + let mut outputs: Vec<_> = self + .routes + .iter() + .map(|route| { + TransformOutput::new( + DataType::all_bits(), + clone_input_definitions(input_definitions), + ) + .with_port(route.name.clone()) + }) + .collect(); + outputs.push( + TransformOutput::new( + DataType::all_bits(), + clone_input_definitions(input_definitions), + ) + .with_port(UNMATCHED_ROUTE), + ); + outputs + } + + fn enable_concurrency(&self) -> bool { + true + } +} + +#[cfg(test)] +mod tests { + use super::ExclusiveRouteConfig; + use indoc::indoc; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn can_serialize_remap() { + // We need to serialize the config to check if a config has + // changed when reloading. + let config = serde_yaml::from_str::(indoc! {r#" + routes: + - name: a + condition: + type = "vrl" + source = '.message == "hello world"' + "#}) + .unwrap(); + + assert_eq!( + serde_json::to_string(&config).unwrap(), + r#"{"routes":[{"name":"a","condition":"type = \"vrl\" source = '.message == \"hello world\"'"}]}"# + ); + } +} diff --git a/src/transforms/exclusive_route/mod.rs b/src/transforms/exclusive_route/mod.rs new file mode 100644 index 00000000000000..bf6934908b0adb --- /dev/null +++ b/src/transforms/exclusive_route/mod.rs @@ -0,0 +1,4 @@ +pub mod config; +#[cfg(test)] +mod tests; +pub mod transform; diff --git a/src/transforms/exclusive_route/tests.rs b/src/transforms/exclusive_route/tests.rs new file mode 100644 index 00000000000000..8a636325994995 --- /dev/null +++ b/src/transforms/exclusive_route/tests.rs @@ -0,0 +1,97 @@ +use crate::config::{DataType, TransformOutput}; +use crate::event::{Event, LogEvent}; +use std::collections::HashMap; + +use indoc::indoc; +use vector_lib::transform::TransformOutputsBuf; + +use crate::transforms::exclusive_route::config::{ExclusiveRouteConfig, UNMATCHED_ROUTE}; +use crate::transforms::exclusive_route::transform::ExclusiveRoute; +use crate::transforms::SyncTransform; +use crate::{ + config::{build_unit_tests, ConfigBuilder}, + test_util::components::{init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS}, +}; + +fn get_outputs_buf() -> (Vec<&'static str>, TransformOutputsBuf) { + let names = vec!["a", "b", UNMATCHED_ROUTE]; + let buf = TransformOutputsBuf::new_with_capacity( + names + .iter() + .map(|output_name| { + TransformOutput::new(DataType::all_bits(), HashMap::new()) + .with_port(output_name.to_owned()) + }) + .collect(), + 1, + ); + (names, buf) +} + +#[test] +fn exclusive_routes() { + let config = serde_yaml::from_str::(indoc! {r#" + routes: + - name: a + condition: + type: vrl + source: '.service == "a"' + - name: b + condition: + type: vrl + source: '.service == "b"' + "#}) + .unwrap(); + + let mut transform = ExclusiveRoute::new(&config, &Default::default()).unwrap(); + + let (output_names, mut outputs) = get_outputs_buf(); + for service in ["a", "b", "c"] { + let event = Event::Log(LogEvent::from(btreemap! { + "service" => service + })); + transform.transform(event.clone(), &mut outputs); + for name in output_names.clone() { + let mut events: Vec<_> = outputs.drain_named(name).collect(); + if name == service || (name == UNMATCHED_ROUTE && service == "c") { + assert_eq!(events.len(), 1); + assert_eq!(events.pop().unwrap(), event); + } else { + assert!(events.is_empty()); + } + } + } +} + +#[tokio::test] +async fn route_metrics_with_output_tag() { + init_test(); + + let config: ConfigBuilder = serde_yaml::from_str(indoc! {r#" + transforms: + foo: + inputs: [] + type: "exclusive_route" + routes: + - name: first + condition: + type: "is_log" + + tests: + - name: "metric output" + input: + insert_at: "foo" + value: "none" + outputs: + - extract_from: "foo.first" + conditions: + - type: "vrl" + source: "true" + "#}) + .unwrap(); + + let mut tests = build_unit_tests(config).await.unwrap(); + assert!(tests.remove(0).run().await.errors.is_empty()); + // Check that metrics were emitted with output tag + COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]); +} diff --git a/src/transforms/exclusive_route/transform.rs b/src/transforms/exclusive_route/transform.rs new file mode 100644 index 00000000000000..56a85a248bea49 --- /dev/null +++ b/src/transforms/exclusive_route/transform.rs @@ -0,0 +1,51 @@ +use vector_lib::transform::SyncTransform; + +use crate::conditions::Condition; +use crate::transforms::exclusive_route::config::{ExclusiveRouteConfig, UNMATCHED_ROUTE}; +use crate::transforms::TransformOutputsBuf; +use crate::{config::TransformContext, event::Event}; + +#[derive(Clone)] +pub struct ResolvedRoute { + name: String, + condition: Condition, +} + +#[derive(Clone)] +pub struct ExclusiveRoute { + routes: Vec, +} + +impl ExclusiveRoute { + pub fn new(config: &ExclusiveRouteConfig, context: &TransformContext) -> crate::Result { + let resolved_routes = config + .routes + .iter() + .map(|route| { + let condition = route.condition.build(&context.enrichment_tables)?; + Ok(ResolvedRoute { + name: route.name.clone(), + condition, + }) + }) + .collect::>>()?; + + Ok(Self { + routes: resolved_routes, + }) + } +} + +impl SyncTransform for ExclusiveRoute { + fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) { + for route in &self.routes { + let (result, event) = route.condition.check(event.clone()); + if result { + output.push(Some(&route.name), event); + return; + } + } + + output.push(Some(UNMATCHED_ROUTE), event); + } +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index 601c8edf4ab6bf..ea3e9aa163afed 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -11,6 +11,8 @@ pub mod sample; pub mod aggregate; #[cfg(feature = "transforms-aws_ec2_metadata")] pub mod aws_ec2_metadata; +#[cfg(feature = "transforms-exclusive-route")] +mod exclusive_route; #[cfg(feature = "transforms-filter")] pub mod filter; #[cfg(feature = "transforms-log_to_metric")] diff --git a/website/content/en/docs/reference/configuration/transforms/exclusive_route.md b/website/content/en/docs/reference/configuration/transforms/exclusive_route.md new file mode 100644 index 00000000000000..4e7326d19ee31d --- /dev/null +++ b/website/content/en/docs/reference/configuration/transforms/exclusive_route.md @@ -0,0 +1,14 @@ +--- +title: Exclusive Route +description: Routes events from one or more streams to unique sub-streams based on a set of user-defined conditions. +component_kind: transform +layout: component +tags: [ "exclusive", "route", "swimlanes", "split", "component", "transform" ] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... + */}} diff --git a/website/content/en/highlights/2024-11-07-exclusive_route.md b/website/content/en/highlights/2024-11-07-exclusive_route.md new file mode 100644 index 00000000000000..11f37eec68a2c1 --- /dev/null +++ b/website/content/en/highlights/2024-11-07-exclusive_route.md @@ -0,0 +1,44 @@ +--- +date: "2024-11-07" +title: "Exclusive Route Transform" +description: "Introducing the exclusive route transform" +authors: [ "pront" ] +pr_numbers: [ 21707 ] +release: "0.43.0" +hide_on_release_notes: false +badges: + type: "new feature" + domains: [ "transforms" ] +--- + +### Functionality + +The Exclusive Route transform splits an event stream into unique sub-streams based on user-defined conditions. Each event will only be +routed to a single stream. This transforms complements the existing [Route transform][docs.transforms.route]. + +A visual representation: + +Vector + +### Config Example + +Let's see an example that demonstrates the above: + +```yaml +# Sources section omitted + +transforms: + transform0: + inputs: + - source0 + type: exclusive_route + routes: + - name: "foo" + condition: '.origin == "foo"' + - name: "bar" + condition: '.origin == "bar"' + +# Sinks section omitted +``` + +[docs.transforms.route]: https://vector.dev/docs/reference/configuration/transforms/route/ diff --git a/website/cue/reference/components.cue b/website/cue/reference/components.cue index df63346ad427c6..6e1317ea2585da 100644 --- a/website/cue/reference/components.cue +++ b/website/cue/reference/components.cue @@ -191,17 +191,18 @@ components: { } if Args.kind == "transform" { - aggregate?: #FeaturesAggregate - convert?: #FeaturesConvert - enrich?: #FeaturesEnrich - filter?: #FeaturesFilter - parse?: #FeaturesParse - program?: #FeaturesProgram - proxy?: #FeaturesProxy - reduce?: #FeaturesReduce - route?: #FeaturesRoute - sanitize?: #FeaturesSanitize - shape?: #FeaturesShape + aggregate?: #FeaturesAggregate + convert?: #FeaturesConvert + enrich?: #FeaturesEnrich + filter?: #FeaturesFilter + parse?: #FeaturesParse + program?: #FeaturesProgram + proxy?: #FeaturesProxy + reduce?: #FeaturesReduce + route?: #FeaturesRoute + exclusive_route?: #FeaturesExclusiveRoute + sanitize?: #FeaturesSanitize + shape?: #FeaturesShape } if Args.kind == "sink" { @@ -327,6 +328,8 @@ components: { #FeaturesRoute: {} + #FeaturesExclusiveRoute: {} + #FeaturesSanitize: {} #FeaturesShape: {} diff --git a/website/cue/reference/components/transforms/base/exclusive_route.cue b/website/cue/reference/components/transforms/base/exclusive_route.cue new file mode 100644 index 00000000000000..9260d83373d619 --- /dev/null +++ b/website/cue/reference/components/transforms/base/exclusive_route.cue @@ -0,0 +1,41 @@ +package metadata + +base: components: transforms: exclusive_route: configuration: routes: { + description: "An array of named routes. The route names are expected to be unique." + required: true + type: array: items: type: object: { + examples: [{ + condition: { + source: "exists(.foo) && exists(.bar)" + type: "vrl" + } + name: "foo-and-bar-exist" + }, { + condition: { + source: "exists(.foo)" + type: "vrl" + } + name: "only-foo-exists" + }] + options: { + condition: { + description: "Each condition represents a filter which is applied to each event." + required: true + type: condition: {} + } + name: { + description: """ + The name of the route is also the name of the transform port. + + The `_unmatched` name is reserved and thus cannot be used as route ID. + + Each route can then be referenced as an input by other components with the name + `.`. If an event doesn’t match any route, + it is sent to the `._unmatched` output. + """ + required: true + type: string: {} + } + } + } +} diff --git a/website/cue/reference/components/transforms/exclusive_route.cue b/website/cue/reference/components/transforms/exclusive_route.cue new file mode 100644 index 00000000000000..7b982f6aad6a17 --- /dev/null +++ b/website/cue/reference/components/transforms/exclusive_route.cue @@ -0,0 +1,101 @@ +package metadata + +components: transforms: exclusive_route: { + title: "Exclusive Route" + + description: """ + Routes events from one or more streams to unique sub-streams based on a set of user-defined conditions. + + Also, see the [Route](\(urls.vector_route_transform) transform for routing an event to multiple streams. + """ + + classes: { + commonly_used: false + development: "beta" + egress_method: "stream" + stateful: false + } + + features: { + exclusive_route: {} + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: base.components.transforms.exclusive_route.configuration + + input: { + logs: true + metrics: { + counter: true + distribution: true + gauge: true + histogram: true + set: true + summary: true + } + traces: true + } + + outputs: [ + { + name: "" + description: "Each route can be referenced as an input by other components with the name `.`." + }, + ] + + how_it_works: { + routing_to_multiple_components: { + title: "Routing to multiple components" + body: """ + An event can only be routed to a single output. + The following is an example of how you can create two exclusive routes (plus the implicitly created `_unmatched` route). + + ```yaml + transforms: + transform0: + inputs: + - source0 + type: exclusive_route + routes: + - name: "a" + condition: + type: vrl + source: .level == 1 + - name: "b" + condition: + type: vrl + # Note that the first condition is redundant. The previous route will always have precedence. + source: .level == 1 || .level == 2 + + tests: + - name: case-1 + inputs: + - type: log + insert_at: transform0 + log_fields: + level: 1 + - type: log + insert_at: transform0 + log_fields: + level: 2 + outputs: + - extract_from: transform0.a + conditions: + - type: vrl + source: | + assert!(.level == 1) + - extract_from: transform0.b + conditions: + - type: vrl + source: | + assert!(.level == 2) + ``` + """ + } + } +} diff --git a/website/cue/reference/components/transforms/route.cue b/website/cue/reference/components/transforms/route.cue index e56335b99f2d6b..fa45ee50d100d2 100644 --- a/website/cue/reference/components/transforms/route.cue +++ b/website/cue/reference/components/transforms/route.cue @@ -6,6 +6,9 @@ components: transforms: route: { description: """ Splits a stream of events into multiple sub-streams based on a set of conditions. + + Also, see the [Exclusive Route](\(urls.vector_exclusive_route_transform) transform for routing an event to + a single stream. """ classes: { diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index cc32b4ce52c119..8732f4e9a2b11f 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -602,6 +602,7 @@ urls: { vector_repo: "\(github)/vectordotdev/vector" vector_roles: "/docs/setup/deployment/roles" vector_route_transform: "/docs/reference/configuration/transforms/route" + vector_exclusive_route_transform: "/docs/reference/configuration/transforms/exclusive_route" vector_rpm_source_files: "\(vector_repo)/tree/master/distribution/rpm" vector_security_policy: "\(vector_repo)/security/policy" vector_semantic_yml: "\(vector_repo)/blob/master/.github/semantic.yml"