Skip to content

Commit

Permalink
feat(exclusive_route transform): implements new transform (#21707)
Browse files Browse the repository at this point in the history
* feat(exclusive_route transform): implements new transform

* chagnelog

* added release highlight

* validate now outputs duplicate names

* docs

* fix md format

* cue fixes

* add md file

* address feedback
  • Loading branch information
pront authored Nov 14, 2024
1 parent 3463c9f commit b969c01
Show file tree
Hide file tree
Showing 15 changed files with 573 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ jobs:
reduce transform
remap transform
route transform
exclusive_route transform
sample transform
tag_cardinality_limit transform
throttle transform
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ transforms-logs = [
"transforms-reduce",
"transforms-remap",
"transforms-route",
"transforms-exclusive-route",
"transforms-sample",
"transforms-throttle",
]
Expand All @@ -668,6 +669,7 @@ transforms-pipelines = ["transforms-filter", "transforms-route"]
transforms-reduce = ["transforms-impl-reduce"]
transforms-remap = []
transforms-route = []
transforms-exclusive-route = []
transforms-sample = ["transforms-impl-sample"]
transforms-tag_cardinality_limit = ["dep:bloomy", "dep:hashbrown"]
transforms-throttle = ["dep:governor"]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/21707_exclusive_route_transform.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Introduce a new exclusive_route transform, which functions as a switch statement to route events based on user-defined conditions.

authors: pront
195 changes: 195 additions & 0 deletions src/transforms/exclusive_route/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig};
use crate::config::{
DataType, GenerateConfig, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
TransformOutput,
};
use crate::schema;
use crate::sinks::prelude::configurable_component;
use crate::transforms::exclusive_route::transform::ExclusiveRoute;
use crate::transforms::Transform;
use std::hash::{Hash, Hasher};
use vector_lib::config::clone_input_definitions;

pub(super) const UNMATCHED_ROUTE: &str = "_unmatched";

/// Individual route configuration.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct Route {
/// The name of the route is also the name of the transform port.
///
/// The `_unmatched` name is reserved and thus cannot be used as route ID.
///
/// Each route can then be referenced as an input by other components with the name
/// `<transform_name>.<name>`. If an event doesn’t match any route,
/// it is sent to the `<transform_name>._unmatched` output.
pub name: String,

/// Each condition represents a filter which is applied to each event.
pub condition: AnyCondition,
}

impl Hash for Route {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
}
}

impl PartialEq for Route {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
}
}

impl Eq for Route {}

/// Configuration for the `route` transform.
#[configurable_component(transform(
"exclusive_route",
"Split a stream of events into unique sub-streams based on user-supplied conditions."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ExclusiveRouteConfig {
/// An array of named routes. The route names are expected to be unique.
#[configurable(metadata(docs::examples = "routes_example()"))]
pub routes: Vec<Route>,
}

fn routes_example() -> Vec<Route> {
vec![
Route {
name: "foo-and-bar-exist".to_owned(),
condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
source: "exists(.foo) && exists(.bar)".to_owned(),
..Default::default()
})),
},
Route {
name: "only-foo-exists".to_owned(),
condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
source: "exists(.foo)".to_owned(),
..Default::default()
})),
},
]
}

impl GenerateConfig for ExclusiveRouteConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
routes: routes_example(),
})
.unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "exclusive_route")]
impl TransformConfig for ExclusiveRouteConfig {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
let route = ExclusiveRoute::new(self, context)?;
Ok(Transform::synchronous(route))
}

fn input(&self) -> Input {
Input::all()
}

fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
let mut errors = Vec::new();

let mut counts = std::collections::HashMap::new();
for route in &self.routes {
*counts.entry(route.name.clone()).or_insert(0) += 1;
}

let duplicates: Vec<String> = counts
.iter()
.filter(|&(_, &count)| count > 1)
.map(|(name, _)| name.clone())
.collect();

if !duplicates.is_empty() {
errors.push(format!(
"Found routes with duplicate names: {:?}",
duplicates
));
}

if self
.routes
.iter()
.any(|route| route.name == UNMATCHED_ROUTE)
{
errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name."));
}

if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}

fn outputs(
&self,
_: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
let mut outputs: Vec<_> = self
.routes
.iter()
.map(|route| {
TransformOutput::new(
DataType::all_bits(),
clone_input_definitions(input_definitions),
)
.with_port(route.name.clone())
})
.collect();
outputs.push(
TransformOutput::new(
DataType::all_bits(),
clone_input_definitions(input_definitions),
)
.with_port(UNMATCHED_ROUTE),
);
outputs
}

fn enable_concurrency(&self) -> bool {
true
}
}

#[cfg(test)]
mod tests {
use super::ExclusiveRouteConfig;
use indoc::indoc;

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<ExclusiveRouteConfig>();
}

#[test]
fn can_serialize_remap() {
// We need to serialize the config to check if a config has
// changed when reloading.
let config = serde_yaml::from_str::<ExclusiveRouteConfig>(indoc! {r#"
routes:
- name: a
condition:
type = "vrl"
source = '.message == "hello world"'
"#})
.unwrap();

assert_eq!(
serde_json::to_string(&config).unwrap(),
r#"{"routes":[{"name":"a","condition":"type = \"vrl\" source = '.message == \"hello world\"'"}]}"#
);
}
}
4 changes: 4 additions & 0 deletions src/transforms/exclusive_route/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod config;
#[cfg(test)]
mod tests;
pub mod transform;
97 changes: 97 additions & 0 deletions src/transforms/exclusive_route/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use crate::config::{DataType, TransformOutput};
use crate::event::{Event, LogEvent};
use std::collections::HashMap;

use indoc::indoc;
use vector_lib::transform::TransformOutputsBuf;

use crate::transforms::exclusive_route::config::{ExclusiveRouteConfig, UNMATCHED_ROUTE};
use crate::transforms::exclusive_route::transform::ExclusiveRoute;
use crate::transforms::SyncTransform;
use crate::{
config::{build_unit_tests, ConfigBuilder},
test_util::components::{init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS},
};

fn get_outputs_buf() -> (Vec<&'static str>, TransformOutputsBuf) {
let names = vec!["a", "b", UNMATCHED_ROUTE];
let buf = TransformOutputsBuf::new_with_capacity(
names
.iter()
.map(|output_name| {
TransformOutput::new(DataType::all_bits(), HashMap::new())
.with_port(output_name.to_owned())
})
.collect(),
1,
);
(names, buf)
}

#[test]
fn exclusive_routes() {
let config = serde_yaml::from_str::<ExclusiveRouteConfig>(indoc! {r#"
routes:
- name: a
condition:
type: vrl
source: '.service == "a"'
- name: b
condition:
type: vrl
source: '.service == "b"'
"#})
.unwrap();

let mut transform = ExclusiveRoute::new(&config, &Default::default()).unwrap();

let (output_names, mut outputs) = get_outputs_buf();
for service in ["a", "b", "c"] {
let event = Event::Log(LogEvent::from(btreemap! {
"service" => service
}));
transform.transform(event.clone(), &mut outputs);
for name in output_names.clone() {
let mut events: Vec<_> = outputs.drain_named(name).collect();
if name == service || (name == UNMATCHED_ROUTE && service == "c") {
assert_eq!(events.len(), 1);
assert_eq!(events.pop().unwrap(), event);
} else {
assert!(events.is_empty());
}
}
}
}

#[tokio::test]
async fn route_metrics_with_output_tag() {
init_test();

let config: ConfigBuilder = serde_yaml::from_str(indoc! {r#"
transforms:
foo:
inputs: []
type: "exclusive_route"
routes:
- name: first
condition:
type: "is_log"
tests:
- name: "metric output"
input:
insert_at: "foo"
value: "none"
outputs:
- extract_from: "foo.first"
conditions:
- type: "vrl"
source: "true"
"#})
.unwrap();

let mut tests = build_unit_tests(config).await.unwrap();
assert!(tests.remove(0).run().await.errors.is_empty());
// Check that metrics were emitted with output tag
COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
}
51 changes: 51 additions & 0 deletions src/transforms/exclusive_route/transform.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use vector_lib::transform::SyncTransform;

use crate::conditions::Condition;
use crate::transforms::exclusive_route::config::{ExclusiveRouteConfig, UNMATCHED_ROUTE};
use crate::transforms::TransformOutputsBuf;
use crate::{config::TransformContext, event::Event};

#[derive(Clone)]
pub struct ResolvedRoute {
name: String,
condition: Condition,
}

#[derive(Clone)]
pub struct ExclusiveRoute {
routes: Vec<ResolvedRoute>,
}

impl ExclusiveRoute {
pub fn new(config: &ExclusiveRouteConfig, context: &TransformContext) -> crate::Result<Self> {
let resolved_routes = config
.routes
.iter()
.map(|route| {
let condition = route.condition.build(&context.enrichment_tables)?;
Ok(ResolvedRoute {
name: route.name.clone(),
condition,
})
})
.collect::<crate::Result<Vec<_>>>()?;

Ok(Self {
routes: resolved_routes,
})
}
}

impl SyncTransform for ExclusiveRoute {
fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
for route in &self.routes {
let (result, event) = route.condition.check(event.clone());
if result {
output.push(Some(&route.name), event);
return;
}
}

output.push(Some(UNMATCHED_ROUTE), event);
}
}
2 changes: 2 additions & 0 deletions src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub mod sample;
pub mod aggregate;
#[cfg(feature = "transforms-aws_ec2_metadata")]
pub mod aws_ec2_metadata;
#[cfg(feature = "transforms-exclusive-route")]
mod exclusive_route;
#[cfg(feature = "transforms-filter")]
pub mod filter;
#[cfg(feature = "transforms-log_to_metric")]
Expand Down
Loading

0 comments on commit b969c01

Please sign in to comment.