From 423da0ea28548731f64f11d2bb67b13f1c2633b3 Mon Sep 17 00:00:00 2001 From: Hisham Muhammad Date: Tue, 22 Oct 2024 10:46:44 -0300 Subject: [PATCH] feat(*): new data model, with nodes and ports --- Cargo.lock | 31 +- Cargo.toml | 2 + datakit.meta.json | 14 +- src/config.rs | 958 +++++++++++++++++++++++++++++++++++----- src/data.rs | 203 +++++---- src/debug.rs | 67 +-- src/dependency_graph.rs | 169 +++++-- src/filter.rs | 138 ++++-- src/nodes.rs | 129 +++++- src/nodes/call.rs | 36 +- src/nodes/jq.rs | 55 ++- src/nodes/response.rs | 30 +- src/nodes/template.rs | 22 +- 13 files changed, 1502 insertions(+), 352 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4350006..4e9675d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,7 @@ dependencies = [ name = "datakit" version = "0.1.1" dependencies = [ + "derivative", "handlebars", "jaq-core", "jaq-interpret", @@ -112,6 +113,17 @@ dependencies = [ "url", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -362,7 +374,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] [[package]] @@ -465,7 +477,7 @@ checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] [[package]] @@ -491,6 +503,17 @@ dependencies = [ "digest", ] +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.74" @@ -519,7 +542,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] [[package]] @@ -616,5 +639,5 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] diff --git a/Cargo.toml b/Cargo.toml index 4f70935..ab5cd26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,3 +22,5 @@ jaq-interpret = "1.2.1" jaq-parse = "1.0.2" jaq-core = "1.2.1" jaq-std = "1.2.1" +derivative = "2.2.0" + diff --git a/datakit.meta.json b/datakit.meta.json index 2f65edc..dda439b 100644 --- a/datakit.meta.json +++ b/datakit.meta.json @@ -11,9 +11,19 @@ "type": { "type": "string" }, "name": { "type": "string" }, "input": { "type": "string" }, - "inputs": { "type": "array", "items": { "type": "string" } }, + "inputs": { + "oneOf": [ + { "type": "array", "items": { "type": "string" } }, + { "type": "object", "additionalProperties": { "type": "string" } } + ] + }, "output": { "type": "string" }, - "outputs": { "type": "array", "items": { "type": "string" } } + "outputs": { + "oneOf": [ + { "type": "array", "items": { "type": "string" } }, + { "type": "object", "additionalProperties": { "type": "string" } } + ] + } } } } diff --git a/src/config.rs b/src/config.rs index db3b331..b5f2064 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,37 +1,120 @@ use crate::nodes; -use crate::nodes::{NodeConfig, NodeMap}; +use crate::nodes::{NodeConfig, NodeVec, PortConfig}; use crate::DependencyGraph; -use lazy_static::lazy_static; +use derivative::Derivative; use serde::de::{Error, MapAccess, Visitor}; use serde::{Deserialize, Deserializer}; -use serde_json::Value; +use serde_json::{Map, Value}; use serde_json_wasm::de; use std::collections::BTreeMap; -use std::collections::HashSet; -use std::fmt; - -lazy_static! { - static ref RESERVED_NODE_NAMES: HashSet<&'static str> = [ - "request_headers", - "request_body", - "service_request_headers", - "service_request_body", - "service_response_headers", - "service_response_body", - "response_headers", - "response_body", - ] - .iter() - .copied() - .collect(); +use std::fmt::{self, Formatter}; + +pub struct ImplicitNode { + name: String, + inputs: Vec, + outputs: Vec, +} + +impl ImplicitNode { + pub fn new(name: &str, inputs: &[&str], outputs: &[&str]) -> ImplicitNode { + ImplicitNode { + name: name.into(), + inputs: inputs.iter().map(|s| s.to_string()).collect(), + outputs: outputs.iter().map(|s| s.to_string()).collect(), + } + } +} + +#[derive(PartialEq, Debug)] +struct UserNodePort { + node: Option, + port: Option, } -pub struct UserNodeConfig { +impl std::fmt::Display for UserNodePort { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "{}.{}", + self.node.as_deref().unwrap_or(""), + self.port.as_deref().unwrap_or("") + ) + } +} + +#[derive(PartialEq, Debug)] +struct UserLink { + from: UserNodePort, + to: UserNodePort, +} + +#[derive(PartialEq, Debug)] +struct UserNodeDesc { node_type: String, name: String, +} + +#[derive(PartialEq, Debug)] +struct UserNodeConfig { + desc: UserNodeDesc, bt: BTreeMap, - inputs: Vec, - outputs: Vec, + links: Vec, + n_inputs: usize, + n_outputs: usize, + named_ins: Vec, + named_outs: Vec, +} + +impl UserLink { + pub fn new( + from_node: Option, + from_port: Option, + to_node: Option, + to_port: Option, + ) -> Self { + UserLink { + from: UserNodePort { + node: from_node, + port: from_port, + }, + to: UserNodePort { + node: to_node, + port: to_port, + }, + } + } + + pub fn new_reverse( + from_node: Option, + from_port: Option, + to_node: Option, + to_port: Option, + ) -> Self { + UserLink { + from: UserNodePort { + node: to_node, + port: to_port, + }, + to: UserNodePort { + node: from_node, + port: from_port, + }, + } + } +} + +fn parse_node_port(value: String) -> (Option, Option) { + let trim = value.trim().to_string(); + + if let Some(dot) = trim.find('.') { + let (node, port) = trim.split_at(dot); + ( + Some(node.trim().to_string()), + Some(port[1..].trim().to_string()), + ) + } else { + (Some(trim), None) + } } impl<'a> Deserialize<'a> for UserNodeConfig { @@ -55,8 +138,9 @@ impl<'a> Deserialize<'a> for UserNodeConfig { let mut bt = BTreeMap::new(); let mut typ: Option = None; let mut name: Option = None; - let mut inputs = Vec::new(); - let mut outputs = Vec::new(); + let mut links: Vec = Vec::new(); + let mut named_ins: Vec = Vec::new(); + let mut named_outs: Vec = Vec::new(); while let Some(key) = map.next_key::()? { match key.as_str() { "type" => { @@ -70,27 +154,27 @@ impl<'a> Deserialize<'a> for UserNodeConfig { } } "input" => { - if let Ok(serde_json::Value::String(value)) = map.next_value() { - inputs.push(value); + if let Ok(serde_json::Value::String(node_port)) = map.next_value() { + let (node, port) = parse_node_port(node_port); + links.push(UserLink::new(node, port, None, None)); } } "inputs" => { - if let Ok(values) = map.next_value() { - if let Ok(v) = serde_json::from_value::>(values) { - inputs = v; - } + if let Ok(v) = map.next_value::() { + decode_links(&mut links, v, &mut named_ins, UserLink::new) + .map_err(Error::custom::<&str>)?; } } "output" => { if let Ok(serde_json::Value::String(value)) = map.next_value() { - outputs.push(value); + let (node, port) = parse_node_port(value); + links.push(UserLink::new(None, None, node, port)); } } "outputs" => { - if let Ok(values) = map.next_value() { - if let Ok(v) = serde_json::from_value::>(values) { - outputs = v; - } + if let Ok(v) = map.next_value::() { + decode_links(&mut links, v, &mut named_outs, UserLink::new_reverse) + .map_err(Error::custom::<&str>)?; } } _ => { @@ -101,14 +185,30 @@ impl<'a> Deserialize<'a> for UserNodeConfig { } } + let name = name.unwrap_or_else(|| format!("{:p}", &bt)); + + let mut n_inputs = 0; + let mut n_outputs = 0; + for link in &mut links { + if link.to.node.is_none() { + link.to.node = Some(name.clone()); + n_inputs += 1; + } + if link.from.node.is_none() { + link.from.node = Some(name.clone()); + n_outputs += 1; + } + } + if let Some(node_type) = typ { - let name = name.unwrap_or_else(|| format!("{:p}", &bt)); Ok(UserNodeConfig { - node_type, - name, + desc: UserNodeDesc { node_type, name }, bt, - inputs, - outputs, + links, + n_inputs, + n_outputs, + named_ins, + named_outs, }) } else { Err(Error::missing_field("type")) @@ -120,97 +220,369 @@ impl<'a> Deserialize<'a> for UserNodeConfig { } } -#[derive(Deserialize, Default)] +fn decode_links( + links: &mut Vec, + value: Value, + named: &mut Vec, + ctor: impl Fn(Option, Option, Option, Option) -> UserLink, +) -> Result<(), &'static str> { + if value.is_object() { + if let Ok(map) = serde_json::from_value::>(value) { + for (my_port, v) in map { + named.push(my_port.clone()); + if let Ok(node_port) = serde_json::from_value::(v) { + let (node, port) = parse_node_port(node_port); + links.push(ctor(node, port, None, Some(my_port))); + } + } + } else { + return Err("invalid map"); + } + } else if value.is_array() { + if let Ok(vec) = serde_json::from_value::>(value) { + for v in vec { + if let Ok(node_port) = serde_json::from_value::(v) { + let (node, port) = parse_node_port(node_port); + links.push(ctor(node, port, None, None)); + } + } + } else { + return Err("invalid list"); + } + } else { + return Err("invalid object"); + } + Ok(()) +} + +#[derive(Deserialize, Default, PartialEq, Debug)] pub struct UserConfig { nodes: Vec, #[serde(default)] debug: bool, } +#[derive(Derivative)] +#[derivative(PartialEq, Debug)] struct NodeInfo { name: String, node_type: String, + #[derivative(PartialEq = "ignore")] + #[derivative(Debug = "ignore")] node_config: Box, } +#[derive(PartialEq, Debug)] pub struct Config { + n_nodes: usize, + n_implicits: usize, node_list: Vec, - node_names: Vec, graph: DependencyGraph, debug: bool, } -fn add_default_connections(unc: &UserNodeConfig, nc: &dyn NodeConfig, graph: &mut DependencyGraph) { - let name: &str = &unc.name; - if unc.inputs.is_empty() { +fn add_default_links( + name: &str, + n_inputs: usize, + n_outputs: usize, + links: &mut Vec, + nc: &dyn NodeConfig, +) { + if n_inputs == 0 { if let Some(default_inputs) = nc.default_inputs() { for input in &default_inputs { - graph.add(input, name) + links.push(UserLink { + from: UserNodePort { + node: Some(input.other_node.clone()), + port: Some(input.other_port.clone()), + }, + to: UserNodePort { + node: Some(name.into()), + port: Some(input.this_port.clone()), + }, + }); } } } - if unc.outputs.is_empty() { + if n_outputs == 0 { if let Some(default_outputs) = nc.default_outputs() { for output in &default_outputs { - graph.add(name, output) + links.push(UserLink { + from: UserNodePort { + node: Some(name.into()), + port: Some(output.this_port.clone()), + }, + to: UserNodePort { + node: Some(output.other_node.clone()), + port: Some(output.other_port.clone()), + }, + }); } } } } -impl Config { - pub fn new(config_bytes: Vec) -> Result { - match de::from_slice::(&config_bytes) { - Ok(user_config) => { - let mut node_list = Vec::new(); - let mut node_names = Vec::new(); - let mut graph: DependencyGraph = Default::default(); +fn push_ports(ports: &mut Vec>, pc: PortConfig, given: &Vec) -> bool { + let mut list = pc.defaults.unwrap_or_default(); + + if pc.user_defined_ports { + for port in given { + if !list.iter().any(|p| p == port) { + list.push(port.into()); + } + } + } - for unc in &user_config.nodes { - let name: &str = &unc.name; + ports.push(list); - if RESERVED_NODE_NAMES.contains(name) { - return Err(format!("cannot use reserved node name '{name}'")); - } + pc.user_defined_ports +} - node_names.push(name.to_string()); - for input in &unc.inputs { - graph.add(input, name); - } - for output in &unc.outputs { - graph.add(name, output); - } +fn has(xs: &[String], s: &str) -> bool { + xs.iter().any(|x| x == s) +} + +fn resolve_port_names( + link: &mut UserLink, + outs: &mut Vec, + user_outs: bool, + ins: &mut Vec, + user_ins: bool, + n_linked_inputs: usize, +) -> Result<(), String> { + let mut from_port = None; + let mut to_port = None; + match &link.from.port { + Some(port) => { + if !has(outs, port) { + if user_outs { + outs.push(port.into()); + } else { + return Err(format!("invalid output port name {port}")); } + } + } + None => { + // If out ports list has a first port declared (either explicitly + // or implicitly), use it + if let Some(&port) = outs.first().as_ref() { + from_port = Some(port.into()); + } else if user_outs { + let new_port = make_port_name(&link.to)?; - for unc in &user_config.nodes { - let inputs = graph.get_input_names(&unc.name); - match nodes::new_config(&unc.node_type, &unc.name, inputs, &unc.bt) { - Ok(nc) => { - add_default_connections(unc, &*nc, &mut graph); - - node_list.push(NodeInfo { - name: unc.name.to_string(), - node_type: unc.node_type.to_string(), - node_config: nc, - }); - } - Err(err) => { - return Err(err); - } - }; + // otherwise the if outs.first() would have returned it + assert!(!has(outs, &new_port)); + + from_port = Some(new_port.clone()); + outs.push(new_port); + } else { + return Err("node in link has no output ports".into()); + } + } + } + + match &link.to.port { + Some(port) => { + if !has(ins, port) { + if user_ins { + ins.push(port.into()); + } else { + return Err(format!("invalid input port name {port}")); + } + } + } + None => { + if user_ins { + let new_port = make_port_name(&link.from)?; + if !has(outs, &new_port) { + to_port = Some(new_port.clone()); + ins.push(new_port); + } else { + return Err(format!("duplicated input port {new_port}")); } + } else if let Some(&port) = ins.get(n_linked_inputs - 1).as_ref() { + to_port = Some(port.into()); + } else { + let n = ins.len(); + return Err(format!( + "too many inputs declared (node type supports {n} inputs)" + )); + } + } + } + + // assign in the end, so that the input and output resolution + // are not affected by the order of links when calling make_port_name + if from_port.is_some() { + link.from.port = from_port; + } + if to_port.is_some() { + link.to.port = to_port; + } + assert!(link.from.port.is_some()); + assert!(link.to.port.is_some()); + + Ok(()) +} + +fn make_port_name(np: &UserNodePort) -> Result { + Ok(match (&np.node, &np.port) { + (Some(n), Some(p)) => format!("{n}.{p}"), + (Some(n), None) => n.into(), + (None, _) => return Err("could not resolve a name".into()), + }) +} + +fn err_at_node(desc: &UserNodeDesc, e: &str) -> String { + let name = &desc.name; + let nt = &desc.node_type; + format!("in node `{name}` of type `{nt}`: {e}") +} + +fn get_link_str(o: &Option, _name: &str) -> Result { + o.as_ref() + .ok_or_else(|| "bad link definition in node {_name}".into()) + .cloned() +} + +fn convert_config( + mut user_config: UserConfig, + implicits: &[ImplicitNode], +) -> Result { + let p = implicits.len(); + let n = user_config.nodes.len() + p; + + let mut node_names: Vec = Vec::with_capacity(n); + let mut in_ports = Vec::with_capacity(n); + let mut out_ports = Vec::with_capacity(n); + let mut user_def_ins = vec![true; n]; + let mut user_def_outs = vec![true; n]; + let mut node_list = Vec::with_capacity(n); + + // This is performed in several loops to ensure that the resolution + // order for links does not depend on the order of the nodes given + // in the input file. - Ok(Config { - node_list, - node_names, - graph, - debug: user_config.debug, - }) + for (i, inode) in implicits.iter().enumerate() { + node_names.push(inode.name.clone()); + in_ports.push(inode.inputs.clone()); + out_ports.push(inode.outputs.clone()); + user_def_ins[i] = false; + user_def_outs[i] = false; + node_list.push(NodeInfo { + name: inode.name.clone(), + node_type: "implicit".into(), + node_config: Box::new(nodes::implicit::ImplicitConfig {}), + }); + } + + for (i, unc) in user_config.nodes.iter().enumerate() { + let name: &str = &unc.desc.name; + let nt: &str = &unc.desc.node_type; + + // at this point, node_names contains only the implicit entries + if has(&node_names, name) { + return Err(format!("cannot use reserved node name `{name}`")); + } + + if !nodes::is_valid_type(nt) { + return Err(format!("unknown node type `{nt}`")); + } + + let ins = nodes::default_input_ports(nt).unwrap(); + user_def_ins[i + p] = push_ports(&mut in_ports, ins, &unc.named_ins); + + let outs = nodes::default_output_ports(nt).unwrap(); + user_def_outs[i + p] = push_ports(&mut out_ports, outs, &unc.named_outs); + } + + for unc in &user_config.nodes { + let name = &unc.desc.name; + + if node_names.iter().any(|n| n == name) { + return Err(format!("multiple definitions of node `{name}`")); + } + + node_names.push(name.into()); + } + + let mut linked_inputs = vec![0; n]; + for unc in user_config.nodes.iter_mut() { + for link in &mut unc.links { + let s = node_position(&node_names, &link.from, &unc.desc)?; + let d = node_position(&node_names, &link.to, &unc.desc)?; + let outs = &mut out_ports[s]; + let u_outs = user_def_outs[s]; + let ins = &mut in_ports[d]; + let u_ins = user_def_ins[d]; + + linked_inputs[d] += 1; + + resolve_port_names(link, outs, u_outs, ins, u_ins, linked_inputs[d]) + .map_err(|e| err_at_node(&unc.desc, &e))?; + } + } + + for (u, unc) in user_config.nodes.iter_mut().enumerate() { + let i = u + p; + let ins = &mut in_ports[i]; + let outs = &mut out_ports[i]; + let name = &unc.desc.name; + let desc = &unc.desc; + match nodes::new_config(&desc.node_type, &desc.name, ins, outs, &unc.bt) { + Ok(nc) => { + add_default_links(name, unc.n_inputs, unc.n_outputs, &mut unc.links, &*nc); + + node_list.push(NodeInfo { + name: name.to_string(), + node_type: desc.node_type.to_string(), + node_config: nc, + }); } - Err(err) => Err(format!( - "failed parsing configuration: {}: {err}", - String::from_utf8(config_bytes).unwrap() - )), + Err(err) => return Err(err), + }; + } + + let mut graph = DependencyGraph::new(node_names, in_ports, out_ports); + + for unc in &user_config.nodes { + let name = &unc.desc.name; + for link in &unc.links { + graph.add( + &get_link_str(&link.from.node, name)?, + &get_link_str(&link.from.port, name)?, + &get_link_str(&link.to.node, name)?, + &get_link_str(&link.to.port, name)?, + )?; + } + } + + Ok(Config { + n_nodes: n, + n_implicits: p, + node_list, + graph, + debug: user_config.debug, + }) +} + +fn node_position( + node_names: &[String], + np: &UserNodePort, + desc: &UserNodeDesc, +) -> Result { + node_names + .iter() + .position(|name: &String| Some(name) == np.node.as_ref()) + .ok_or_else(|| err_at_node(desc, &format!("unknown node in link: {}", np))) +} + +impl Config { + pub fn new(config_bytes: Vec, implicits: &[ImplicitNode]) -> Result { + match de::from_slice::(&config_bytes) { + Ok(user_config) => convert_config(user_config, implicits) + .map_err(|err| format!("failed checking configuration: {err}")), + Err(err) => Err(format!("failed parsing configuration: {err}",)), } } @@ -218,8 +590,12 @@ impl Config { self.debug } - pub fn get_node_names(&self) -> &Vec { - &self.node_names + pub fn node_count(&self) -> usize { + self.n_nodes + } + + pub fn number_of_implicits(&self) -> usize { + self.n_implicits } pub fn node_types(&self) -> impl Iterator { @@ -232,19 +608,13 @@ impl Config { &self.graph } - pub fn build_nodes(&self) -> NodeMap { - let mut nodes = NodeMap::new(); + pub fn build_nodes(&self) -> NodeVec { + let mut nodes = NodeVec::with_capacity(self.node_list.len()); for info in &self.node_list { - let name = &info.name; - match nodes::new_node(&info.node_type, &*info.node_config) { - Ok(node) => { - nodes.insert(name.to_string(), node); - } - Err(err) => { - log::error!("{err}"); - } + Ok(node) => nodes.push(node), + Err(err) => log::error!("{err}"), } } @@ -259,3 +629,379 @@ pub fn get_config_value serde::Deserialize<'de>>( bt.get(key) .and_then(|v| serde_json::from_value(v.clone()).ok()) } + +#[cfg(test)] +mod test { + use super::*; + use serde_json::json; + use std::any::Any; + + fn deserialize_user_config(cfg: &str) -> UserConfig { + de::from_slice::(cfg.as_bytes()).unwrap() + } + + #[test] + fn deserialize_empty_nodes() { + let uc = deserialize_user_config( + r#"{ + "nodes": [] + }"#, + ); + assert_eq!( + uc, + UserConfig { + nodes: vec![], + debug: false, + } + ); + } + + #[test] + fn deserialize_complete_example() { + let uc = deserialize_user_config( + r#"{ + "nodes": [ + { + "name": "jq1", + "type": "jq", + "input": "request.headers", + "jq": "{ \"x-bar\": $request_headers[\"x-foo\"] }" + }, + { + "name": "mycall", + "type": "call", + "input": "jq1", + "url": "http://example.com" + }, + { + "name": "jq2", + "type": "jq", + "inputs": { + "$mycall": "mycall", + "$request": "request.body" + }, + "jq": "{ \"bee\": $mycall.bee, \"boo\": $request.boo }" + } + ] + }"#, + ); + assert_eq!( + uc, + UserConfig { + nodes: vec![ + UserNodeConfig { + desc: UserNodeDesc { + node_type: "jq".into(), + name: "jq1".into(), + }, + bt: BTreeMap::from([( + "jq".into(), + json!("{ \"x-bar\": $request_headers[\"x-foo\"] }") + )]), + links: vec![UserLink { + from: UserNodePort { + node: Some("request".into()), + port: Some("headers".into()) + }, + to: UserNodePort { + node: Some("jq1".into()), + port: None + } + }], + n_inputs: 1, + n_outputs: 0, + named_ins: vec![], + named_outs: vec![] + }, + UserNodeConfig { + desc: UserNodeDesc { + node_type: "call".into(), + name: "mycall".into() + }, + bt: BTreeMap::from([("url".to_string(), json!("http://example.com"))]), + links: vec![UserLink { + from: UserNodePort { + node: Some("jq1".into()), + port: None + }, + to: UserNodePort { + node: Some("mycall".into()), + port: None + } + }], + n_inputs: 1, + n_outputs: 0, + named_ins: vec![], + named_outs: vec![] + }, + UserNodeConfig { + desc: UserNodeDesc { + node_type: "jq".into(), + name: "jq2".into() + }, + bt: BTreeMap::from([( + "jq".to_string(), + json!("{ \"bee\": $mycall.bee, \"boo\": $request.boo }") + )]), + links: vec![ + UserLink { + from: UserNodePort { + node: Some("mycall".into()), + port: None + }, + to: UserNodePort { + node: Some("jq2".into()), + port: Some("$mycall".into()) + } + }, + UserLink { + from: UserNodePort { + node: Some("request".into()), + port: Some("body".into()) + }, + to: UserNodePort { + node: Some("jq2".into()), + port: Some("$request".into()) + } + } + ], + n_inputs: 2, + n_outputs: 0, + named_ins: vec!["$mycall".into(), "$request".into()], + named_outs: vec![] + } + ], + debug: false + } + ); + } + + #[test] + fn test_parse_node_port() { + let cases = vec![ + ("", (Some(""), None)), + (" ", (Some(""), None)), + (".", (Some(""), Some(""))), + (". ", (Some(""), Some(""))), + (" . ", (Some(""), Some(""))), + (".foo", (Some(""), Some("foo"))), + (".foo.bar", (Some(""), Some("foo.bar"))), + ("..foo.bar", (Some(""), Some(".foo.bar"))), + (". .foo.bar", (Some(""), Some(".foo.bar"))), + ("f.bar", (Some("f"), Some("bar"))), + ("foo", (Some("foo"), None)), + ("foo.", (Some("foo"), Some(""))), + ("foo.b", (Some("foo"), Some("b"))), + ("foo.b ", (Some("foo"), Some("b"))), + ("foo.bar", (Some("foo"), Some("bar"))), + ("foo . bar", (Some("foo"), Some("bar"))), + ("foo..baz", (Some("foo"), Some(".baz"))), + ("foo.bar.", (Some("foo"), Some("bar."))), + ("foo.bar..", (Some("foo"), Some("bar.."))), + ("foo.bar.baz", (Some("foo"), Some("bar.baz"))), + ("foo.bar baz", (Some("foo"), Some("bar baz"))), + ("foo bar.baz bla", (Some("foo bar"), Some("baz bla"))), + (" foo . bar.baz ", (Some("foo"), Some("bar.baz"))), + ]; + for (node_port, pair) in cases { + assert_eq!( + parse_node_port(node_port.to_owned()), + (pair.0.map(str::to_owned), pair.1.map(str::to_owned)) + ); + } + } + + fn accept_config(cfg: &str) -> Config { + let result = Config::new(cfg.as_bytes().to_vec(), &[]); + + result.unwrap() + } + + fn reject_config_with(cfg: &str, message: &str) { + let result = Config::new(cfg.as_bytes().to_vec(), &[]); + + let err = result.unwrap_err(); + assert_eq!(err, message); + } + + #[test] + fn config_no_json() { + reject_config_with( + "", + "failed parsing configuration: EOF while parsing a JSON value.", + ) + } + + #[test] + fn config_bad_json() { + reject_config_with( + "{", + "failed parsing configuration: EOF while parsing an object.", + ) + } + + #[test] + fn config_empty_json() { + reject_config_with("{}", "failed parsing configuration: missing field `nodes`") + } + + #[test] + fn config_empty_nodes() { + accept_config( + r#"{ + "nodes": [] + }"#, + ); + } + + #[test] + fn config_missing_type() { + reject_config_with( + r#"{ + "nodes": [ + { + "name": "MY_NODE" + } + ] + }"#, + "failed parsing configuration: missing field `type`", + ) + } + + #[test] + fn config_invalid_type() { + reject_config_with( + r#"{ + "nodes": [ + { + "name": "MY_NODE", + "type": "INVALID" + } + ] + }"#, + "failed checking configuration: unknown node type `INVALID`", + ) + } + + struct IgnoreConfig {} + impl NodeConfig for IgnoreConfig { + fn as_any(&self) -> &dyn Any { + self + } + } + + #[test] + fn convert_complete_example() { + let uc = deserialize_user_config( + r#"{ + "nodes": [ + { + "name": "jq1", + "type": "jq", + "input": "request.headers", + "jq": "{ \"x-bar\": $request_headers[\"x-foo\"] }" + }, + { + "name": "mycall", + "type": "call", + "input": "jq1", + "url": "http://example.com" + }, + { + "name": "jq2", + "type": "jq", + "inputs": { + "$mycall": "mycall", + "$request": "request.body" + }, + "jq": "{ \"bee\": $mycall.bee, \"boo\": $request.boo }" + } + ] + }"#, + ); + + nodes::register_node("call", Box::new(nodes::call::CallFactory {})); + nodes::register_node("jq", Box::new(nodes::jq::JqFactory {})); + + let implicits = vec![ + ImplicitNode::new("request", &[], &["body", "headers"]), + ImplicitNode::new("service_request", &["body", "headers"], &[]), + ImplicitNode::new("response", &["body", "headers"], &[]), + ImplicitNode::new("service_response", &[], &["body", "headers"]), + ]; + + let config = convert_config(uc, &implicits).unwrap(); + assert!(!config.debug); + assert_eq!(config.n_nodes, 7); + assert_eq!(config.n_implicits, 4); + assert_eq!( + config.node_list, + vec![ + NodeInfo { + name: "request".into(), + node_type: "implicit".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "service_request".into(), + node_type: "implicit".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "response".into(), + node_type: "implicit".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "service_response".into(), + node_type: "implicit".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "jq1".into(), + node_type: "jq".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "mycall".into(), + node_type: "call".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "jq2".into(), + node_type: "jq".into(), + node_config: Box::new(IgnoreConfig {}), + }, + ] + ); + let input_lists: &[&[Option<(usize, usize)>]] = &[ + &[], + &[None, None], + &[None, None], + &[], + &[Some((0, 1))], + &[Some((4, 0)), None, None], + &[Some((5, 0)), Some((0, 0))], + ]; + for (i, &input_list) in input_lists.iter().enumerate() { + let given: Vec<_> = input_list.iter().collect(); + let computed: Vec<_> = config.graph.each_input(i).collect(); + assert_eq!(given, computed); + } + + let output_lists: &[&[&[(usize, usize)]]] = &[ + &[&[(6, 1)], &[(4, 0)]], + &[], + &[], + &[&[], &[]], + &[&[(5, 0)]], + &[&[(6, 0)], &[]], + &[], + ]; + for (i, &output_list) in output_lists.iter().enumerate() { + let given: Vec<_> = output_list.iter().collect(); + let computed: Vec<_> = config.graph.each_output(i).collect(); + assert_eq!(given, computed); + } + } +} diff --git a/src/data.rs b/src/data.rs index bb6ca69..afe5648 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeMap; - use crate::dependency_graph::DependencyGraph; use crate::payload::Payload; @@ -21,107 +19,164 @@ pub struct Input<'a> { #[derive(Debug)] pub enum State { Waiting(u32), - Done(Option), - Fail(Option), + Done(Vec>), + Fail(Vec>), } -#[derive(Default)] pub struct Data { graph: DependencyGraph, - states: BTreeMap, + states: Vec>, +} + +fn set_port( + ports: &mut [Option], + port: usize, + payload: Payload, +) -> Result<(), &'static str> { + match ports.get(port) { + Some(_) => Err("cannot overwrite a payload"), + None => { + ports[port] = Some(payload); + Ok(()) + } + } +} + +fn get_port(ports: &[Option], port: usize) -> Option<&Payload> { + match ports.get(port) { + Some(Some(ref payload)) => Some(payload), + _ => None, + } } impl Data { pub fn new(graph: DependencyGraph) -> Data { - Data { - graph, - states: Default::default(), + let n = graph.number_of_nodes(); + let mut states = Vec::with_capacity(n); + states.resize_with(n, Default::default); + Data { graph, states } + } + + pub fn get_node_name(&self, i: usize) -> &str { + self.graph.get_node_name(i).expect("valid index") + } + + pub fn set(&mut self, node: usize, state: State) { + self.states[node] = Some(state); + } + + pub fn fill_port( + &mut self, + node: usize, + port: usize, + payload: Payload, + ) -> Result<(), &'static str> { + match &mut self.states[node] { + None => { + let mut ports: Vec> = + Vec::with_capacity(self.graph.number_of_outputs(node)); + ports[port] = Some(payload); + let state = State::Done(ports); + self.states[node] = Some(state); + Ok(()) + } + Some(State::Waiting(_)) => Err("cannot force payload on a waiting node"), + Some(State::Done(ports)) => set_port(ports, port, payload), + Some(State::Fail(ports)) => set_port(ports, port, payload), } } - pub fn set(&mut self, name: &str, state: State) { - self.states.insert(name.to_string(), state); + pub fn fetch_port(&self, node: usize, port: usize) -> Option<&Payload> { + match self.graph.get_provider(node, port) { + Some((n, p)) => match self.states.get(n).unwrap() { + Some(State::Waiting(_)) => None, + Some(State::Done(ports)) => get_port(ports, p), + Some(State::Fail(ports)) => get_port(ports, p), + None => None, + }, + None => None, + } } - fn can_trigger(&self, name: &str, waiting: Option) -> bool { - // If node is Done, avoid producing inputs - // and re-triggering its execution. - if let Some(state) = self.states.get(name) { - match state { - State::Done(_) => { - return false; - } + fn can_trigger(&self, i: usize, waiting: Option) -> bool { + // This is intentionally written with all of the match arms + // stated explicitly (instead of using _ catch-alls), + // so that the trigger logic and all its states + // are considered by the reader. + match &self.states[i] { + // state was never created, trigger + None => true, + Some(state) => match state { + // never retrigger Done + State::Done(_) => false, + // never retrigger Fail + State::Fail(_) => false, State::Waiting(w) => match &waiting { - Some(id) => { - if w != id { - return false; - } - } - None => return false, + // we're waiting on the right id, allow triggering + Some(id) if w == id => true, + // waiting on something else, skip + Some(_) => false, + // not called from a wait state + None => false, }, - State::Fail(_) => { - return false; - } - } + }, } + } - // Check that all inputs have payloads available - for input in self.graph.each_input(name) { - let val = self.states.get(input); - match val { - Some(State::Done(_)) => {} - _ => { - return false; + fn for_each_input<'a, T>( + &'a self, + i: usize, + f: impl for<'b> Fn(Option<&'a Payload>, &'b mut T), + mut t: T, + ) -> Option { + for input in self.graph.each_input(i) { + // if input port is connected in the graph + match *input { + Some((n, p)) => { + // check if other node is Done + match &self.states[n] { + Some(State::Done(ports)) => { + // check if port has payload available + match &ports[p] { + // ok, has payload + Some(payload) => f(Some(payload), &mut t), + // no payload available + None => return None, + } + } + Some(State::Waiting(_)) => return None, + Some(State::Fail(_)) => return None, + None => return None, + } } - }; + None => f(None, &mut t), // ok, port is not connected + } } - true + Some(t) } pub fn get_inputs_for( &self, - name: &str, + node: usize, waiting: Option, ) -> Option>> { - if !self.can_trigger(name, waiting) { - return None; - } - - // If so, allocate the vector with the result. - let mut vec: Vec> = Vec::new(); - for input in self.graph.each_input(name) { - if let Some(State::Done(p)) = self.states.get(input) { - vec.push(p.as_ref()); - } - } - - Some(vec) - } - - /// If the node is triggerable, that is, it has all its required - /// inputs available to trigger (i.e. none of its inputs are in a - /// `Waiting` state), then return the payload of the first input that - /// is in a `Done state. - /// - /// Note that by returning an `Option<&Payload>` this makes no - /// distinction between the node being not triggerable or the - /// node being triggerable via a `Done(None)` input. - /// - /// This is not an issue because this function is intended for use - /// with the implicit nodes (`response_body`, etc.) which are - /// handled as special cases directly by the filter. - pub fn first_input_for(&self, name: &str, waiting: Option) -> Option<&Payload> { - if !self.can_trigger(name, waiting) { + if !self.can_trigger(node, waiting) { return None; } - for input in self.graph.each_input(name) { - if let Some(State::Done(p)) = self.states.get(input) { - return p.as_ref(); - } - } + // Check first that all connected inputs are ready + self.for_each_input(node, |_, _| (), &mut ())?; - None + // If so, allocate the vector with the result. + let n = self.graph.number_of_inputs(node); + self.for_each_input( + node, + |payload, v: &mut Vec>| match payload { + Some(p) => v.push(Some(p)), + None => v.push(None), + }, + Vec::with_capacity(n), + ) } } diff --git a/src/debug.rs b/src/debug.rs index 73bfa1f..a95d12e 100644 --- a/src/debug.rs +++ b/src/debug.rs @@ -23,11 +23,16 @@ struct RunOperation { action: RunMode, } +#[derive(Serialize)] +struct PortValue { + data_type: String, + value: Option, +} + struct SetOperation { node_name: String, - data_type: String, status: DataMode, - value: Option, + values: Vec, } enum Operation { @@ -52,17 +57,26 @@ impl State { } } -fn payload_to_op_info(p: &Option, default_type: &str) -> (String, Option) { - if let Some(payload) = p { - let dt = payload.content_type().unwrap_or(default_type).to_string(); - - match payload.to_json() { - Ok(v) => (dt, Some(v)), - Err(e) => ("fail".to_string(), Some(serde_json::json!(e))), - } - } else { - ("none".to_string(), None) - } +fn payloads_to_values(payloads: &[Option], default_type: &str) -> Vec { + payloads + .iter() + .map(|p| match p { + Some(payload) => match payload.to_json() { + Ok(v) => PortValue { + data_type: payload.content_type().unwrap_or(default_type).to_string(), + value: Some(v), + }, + Err(e) => PortValue { + data_type: "fail".into(), + value: Some(serde_json::json!(e)), + }, + }, + None => PortValue { + data_type: "none".into(), + value: None, + }, + }) + .collect() } impl Debug { @@ -82,17 +96,14 @@ impl Debug { pub fn set_data(&mut self, name: &str, state: &State) { if self.trace { - let (data_type, value) = match state { - State::Done(p) => payload_to_op_info(p, "raw"), - State::Waiting(_) => ("waiting".to_string(), None), - State::Fail(p) => payload_to_op_info(p, "fail"), - }; - self.operations.push(Operation::Set(SetOperation { node_name: name.to_string(), - data_type, status: state.to_data_mode(), - value, + values: match state { + State::Waiting(_) => vec![], + State::Done(p) => payloads_to_values(p, "raw"), + State::Fail(p) => payloads_to_values(p, "fail"), + }, })); } } @@ -135,7 +146,7 @@ impl Debug { #[serde(skip_serializing_if = "Option::is_none")] r#type: Option<&'a str>, #[serde(skip_serializing_if = "Option::is_none")] - value: Option<&'a Value>, + values: Option<&'a Vec>, } let mut actions: Vec = vec![]; @@ -147,28 +158,28 @@ impl Debug { RunMode::Run => "run", RunMode::Resume => "resume", }, - name: &run.node_name, r#type: Some(&run.node_type), - value: None, + name: &run.node_name, + values: None, }, Operation::Set(set) => match set.status { DataMode::Done => TraceAction { action: "value", name: &set.node_name, - r#type: Some(&set.data_type), - value: set.value.as_ref(), + r#type: None, + values: Some(&set.values), }, DataMode::Waiting => TraceAction { action: "wait", name: &set.node_name, r#type: None, - value: None, + values: None, }, DataMode::Fail => TraceAction { action: "fail", name: &set.node_name, r#type: None, - value: set.value.as_ref(), + values: Some(&set.values), }, }, }); diff --git a/src/dependency_graph.rs b/src/dependency_graph.rs index f08aac8..fa1db9c 100644 --- a/src/dependency_graph.rs +++ b/src/dependency_graph.rs @@ -1,55 +1,148 @@ -use core::slice::Iter; -use std::collections::BTreeMap; - -#[derive(Default, Clone)] +#[derive(Clone, PartialEq, Debug)] pub struct DependencyGraph { - dependents: BTreeMap>, - providers: BTreeMap>, - empty: Vec, + node_names: Vec, + input_names: Vec>, + output_names: Vec>, + dependents: Vec>>, + providers: Vec>>, } -fn add_to(map: &mut BTreeMap>, key: &str, value: &str) { - match map.get_mut(key) { - Some(key_items) => { - let v = value.to_string(); - if !key_items.contains(&v) { - key_items.push(v); - } - } - None => { - map.insert(key.to_string(), vec![value.to_string()]); - } - }; +pub fn find( + node: &str, + port: &str, + node_names: &[String], + port_names: &[Vec], +) -> (usize, usize) { + let n = node_names + .iter() + .position(|x| x == node) + .expect("node registered in node_names"); + let p = port_names + .get(n) + .expect("valid node index") + .iter() + .position(|x| x == port) + .expect("port registered in port_names"); + (n, p) } impl DependencyGraph { - pub fn add(&mut self, src: &str, dst: &str) { - add_to(&mut self.dependents, src, dst); - add_to(&mut self.providers, dst, src); + pub fn new( + node_names: Vec, + input_names: Vec>, + output_names: Vec>, + ) -> DependencyGraph { + let n = node_names.len(); + let mut dependents = Vec::with_capacity(n); + let mut providers = Vec::with_capacity(n); + for ports in &input_names { + providers.push(vec![None; ports.len()]); + } + for ports in &output_names { + let np = ports.len(); + let mut lists = Vec::with_capacity(np); + lists.resize_with(np, Vec::new); + dependents.push(lists); + } + DependencyGraph { + node_names, + input_names, + output_names, + dependents, + providers, + } } - pub fn has_dependents(&self, name: &str) -> bool { - self.dependents.contains_key(name) + pub fn get_node_name(&self, i: usize) -> Option<&str> { + self.node_names.get(i).map(|o| o.as_ref()) } - pub fn has_providers(&self, name: &str) -> bool { - self.providers.contains_key(name) + pub fn number_of_nodes(&self) -> usize { + self.node_names.len() } - pub fn get_input_names(&self, name: &str) -> &Vec { - if let Some(items) = self.providers.get(name) { - items - } else { - &self.empty - } + pub fn number_of_outputs(&self, node: usize) -> usize { + self.output_names[node].len() + } + + pub fn number_of_inputs(&self, node: usize) -> usize { + self.input_names[node].len() } - pub fn each_input(&self, name: &str) -> Iter { - if let Some(items) = self.providers.get(name) { - items.iter() - } else { - // FIXME is there a better way to do this? - self.empty.iter() + fn add_dependent(&mut self, node: usize, port: usize, entry: (usize, usize)) { + let node_list = &mut self.dependents; + let port_list = node_list.get_mut(node).expect("valid node index"); + let entries_list = port_list.get_mut(port).expect("valid port index"); + entries_list.push(entry); + } + + fn add_provider( + &mut self, + node: usize, + port: usize, + entry: (usize, usize), + ) -> Result<(), String> { + let node_list = &mut self.providers; + let port_list = node_list.get_mut(node).expect("valid node index"); + match *port_list.get(port).expect("valid port index") { + Some((other_n, other_p)) => self.err_already_connected(node, port, other_n, other_p), + None => { + port_list[port] = Some(entry); + Ok(()) + } } } + + fn err_already_connected( + &self, + n: usize, + p: usize, + oth_n: usize, + oth_p: usize, + ) -> Result<(), String> { + let this_node = self.node_names.get(n).expect("valid node"); + let this_port = self.output_names[n].get(p).expect("valid port"); + let other_node = self.node_names.get(oth_n).expect("valid node"); + let other_port = self.output_names[oth_n].get(oth_p).expect("valid port"); + Err(format!( + "{this_node}.{this_port} is already connected to {other_node}.{other_port}" + )) + } + + pub fn add( + &mut self, + src_node: &str, + src_port: &str, + dst_node: &str, + dst_port: &str, + ) -> Result<(), String> { + let (sn, sp) = find(src_node, src_port, &self.node_names, &self.output_names); + let (dn, dp) = find(dst_node, dst_port, &self.node_names, &self.input_names); + self.add_dependent(sn, sp, (dn, dp)); + self.add_provider(dn, dp, (sn, sp)) + } + + pub fn has_dependents(&self, node: &str, port: &str) -> bool { + let (n, p) = find(node, port, &self.node_names, &self.output_names); + !self.dependents[n][p].is_empty() + } + + pub fn has_providers(&self, node: &str, port: &str) -> bool { + let (n, p) = find(node, port, &self.node_names, &self.input_names); + self.providers[n][p].is_some() + } + + pub fn get_provider(&self, node: usize, port: usize) -> Option<(usize, usize)> { + self.providers[node][port] + } + + pub fn each_input(&self, node: usize) -> std::slice::Iter> { + self.providers[node].iter() + } + + /// used in tests only + #[allow(dead_code)] + pub fn each_output(&self, node: usize) -> std::slice::Iter> { + self.dependents[node].iter() + } } diff --git a/src/filter.rs b/src/filter.rs index 2b1a36d..2d9ffc6 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,3 +1,4 @@ +use lazy_static::lazy_static; use proxy_wasm::{traits::*, types::*}; use std::rc::Rc; @@ -8,12 +9,40 @@ mod dependency_graph; mod nodes; mod payload; -use crate::config::Config; +use crate::config::{Config, ImplicitNode}; use crate::data::{Data, Input, Phase, Phase::*, State}; use crate::debug::{Debug, RunMode}; use crate::dependency_graph::DependencyGraph; -use crate::nodes::{Node, NodeMap}; +use crate::nodes::{Node, NodeVec}; use crate::payload::Payload; +use crate::ImplicitNodeId::*; + +// ----------------------------------------------------------------------------- +// Implicit nodes +// ----------------------------------------------------------------------------- + +#[derive(Copy, Clone)] +enum ImplicitNodeId { + Request = 0, + ServiceRequest = 1, + Response = 2, + ServiceResponse = 3, +} + +#[derive(Copy, Clone)] +enum ImplicitPortId { + Body = 0, + Headers = 1, +} + +lazy_static! { + static ref IMPLICIT_NODES: Vec = vec![ + ImplicitNode::new("request", &[], &["body", "headers"]), + ImplicitNode::new("service_request", &["body", "headers"], &[]), + ImplicitNode::new("response", &["body", "headers"], &[]), + ImplicitNode::new("service_response", &[], &["body", "headers"]), + ]; +} // ----------------------------------------------------------------------------- // Root Context @@ -28,7 +57,7 @@ impl Context for DataKitFilterRootContext {} impl RootContext for DataKitFilterRootContext { fn on_configure(&mut self, _config_size: usize) -> bool { match self.get_plugin_configuration() { - Some(config_bytes) => match Config::new(config_bytes) { + Some(config_bytes) => match Config::new(config_bytes, &IMPLICIT_NODES) { Ok(config) => { self.config = Some(Rc::new(config)); true @@ -62,14 +91,14 @@ impl RootContext for DataKitFilterRootContext { // to avoid cloning every time? let data = Data::new(graph.clone()); - let do_request_headers = graph.has_dependents("request_headers"); - let do_request_body = graph.has_dependents("request_body"); - let do_service_request_headers = graph.has_providers("service_request_headers"); - let do_service_request_body = graph.has_providers("service_request_body"); - let do_service_response_headers = graph.has_dependents("service_response_headers"); - let do_service_response_body = graph.has_dependents("service_response_body"); - let do_response_headers = graph.has_providers("response_headers"); - let do_response_body = graph.has_providers("response_body"); + let do_request_headers = graph.has_dependents("request", "headers"); + let do_request_body = graph.has_dependents("request", "body"); + let do_service_request_headers = graph.has_providers("service_request", "headers"); + let do_service_request_body = graph.has_providers("service_request", "body"); + let do_service_response_headers = graph.has_dependents("service_response", "headers"); + let do_service_response_body = graph.has_dependents("service_response", "body"); + let do_response_headers = graph.has_providers("response", "headers"); + let do_response_body = graph.has_providers("response", "body"); Some(Box::new(DataKitFilter { config, @@ -95,7 +124,7 @@ impl RootContext for DataKitFilterRootContext { pub struct DataKitFilter { config: Rc, - nodes: NodeMap, + nodes: NodeVec, data: Data, debug: Option, failed: bool, @@ -161,16 +190,27 @@ impl DataKitFilter { ); } - fn set_data(&mut self, name: &str, state: State) { - if let Some(ref mut debug) = self.debug { - debug.set_data(name, &state); - } - self.data.set(name, state); + fn set_headers_data(&mut self, node: ImplicitNodeId, vec: Vec<(String, String)>) { + let payload = payload::from_pwm_headers(vec); + self.data + .fill_port(node as usize, ImplicitPortId::Headers as usize, payload) + .unwrap(); } - fn set_headers_data(&mut self, vec: Vec<(String, String)>, name: &str) { - let payload = payload::from_pwm_headers(vec); - self.set_data(name, State::Done(Some(payload))); + fn set_body_data(&mut self, node: ImplicitNodeId, payload: Payload) { + self.data + .fill_port(node as usize, ImplicitPortId::Body as usize, payload) + .unwrap(); + } + + fn get_headers_data(&self, node: ImplicitNodeId) -> Option<&Payload> { + self.data + .fetch_port(node as usize, ImplicitPortId::Headers as usize) + } + + fn get_body_data(&self, node: ImplicitNodeId) -> Option<&Payload> { + self.data + .fetch_port(node as usize, ImplicitPortId::Body as usize) } fn run_nodes(&mut self, phase: Phase) -> Action { @@ -181,15 +221,18 @@ impl DataKitFilter { debug_is_tracing = debug.is_tracing(); } + let from = self.config.number_of_implicits(); + let to = self.config.node_count(); + while !self.failed { let mut any_ran = false; - for name in self.config.get_node_names() { + for i in from..to { let node: &dyn Node = self .nodes - .get(name) - .expect("self.nodes doesn't match self.node_names") + .get(i) + .expect("self.nodes doesn't match node_count") .as_ref(); - if let Some(inputs) = self.data.get_inputs_for(name, None) { + if let Some(inputs) = self.data.get_inputs_for(i, None) { any_ran = true; let input = Input { @@ -199,6 +242,7 @@ impl DataKitFilter { let state = node.run(self as &dyn HttpContext, &input); if let Some(ref mut debug) = self.debug { + let name = self.data.get_node_name(i); debug.run(name, &inputs, &state, RunMode::Run); } @@ -215,7 +259,7 @@ impl DataKitFilter { } } - self.data.set(name, state); + self.data.set(i, state); } } if !any_ran { @@ -228,7 +272,7 @@ impl DataKitFilter { fn set_service_request_headers(&mut self) { if self.do_service_request_headers { - if let Some(payload) = self.data.first_input_for("service_request_headers", None) { + if let Some(payload) = self.get_headers_data(ServiceRequest) { let headers = payload::to_pwm_headers(Some(payload)); self.set_http_request_headers(headers); self.do_service_request_headers = false; @@ -238,7 +282,7 @@ impl DataKitFilter { fn set_service_request_body(&mut self) { if self.do_service_request_body { - if let Some(payload) = self.data.first_input_for("service_request_body", None) { + if let Some(payload) = self.get_body_data(ServiceRequest) { if let Ok(bytes) = payload.to_bytes() { self.set_http_request_body(0, bytes.len(), &bytes); } @@ -258,13 +302,16 @@ impl Context for DataKitFilter { ) { log::debug!("DataKitFilter: on http call response, id = {:?}", token_id); - for name in self.config.get_node_names() { + let from = self.config.number_of_implicits(); + let to = self.config.node_count(); + + for i in from..to { let node: &dyn Node = self .nodes - .get(name) - .expect("self.nodes doesn't match self.node_names") + .get(i) + .expect("self.nodes doesn't match node_count") .as_ref(); - if let Some(inputs) = self.data.get_inputs_for(name, Some(token_id)) { + if let Some(inputs) = self.data.get_inputs_for(i, Some(token_id)) { let input = Input { data: &inputs, phase: HttpCallResponse, @@ -272,10 +319,11 @@ impl Context for DataKitFilter { let state = node.resume(self, &input); if let Some(ref mut debug) = self.debug { + let name = self.data.get_node_name(i); debug.run(name, &inputs, &state, RunMode::Resume); } - self.data.set(name, state); + self.data.set(i, state); break; } } @@ -297,7 +345,7 @@ impl HttpContext for DataKitFilter { if self.do_request_headers { let vec = self.get_http_request_headers(); - self.set_headers_data(vec, "request_headers"); + self.set_headers_data(Request, vec); } let action = self.run_nodes(HttpRequestHeaders); @@ -316,8 +364,9 @@ impl HttpContext for DataKitFilter { if eof && self.do_request_body { if let Some(bytes) = self.get_http_request_body(0, body_size) { let content_type = self.get_http_request_header("Content-Type"); - let body_payload = Payload::from_bytes(bytes, content_type.as_deref()); - self.set_data("request_body", State::Done(body_payload)); + if let Some(payload) = Payload::from_bytes(bytes, content_type.as_deref()) { + self.set_body_data(Request, payload); + } } } @@ -332,20 +381,20 @@ impl HttpContext for DataKitFilter { fn on_http_response_headers(&mut self, _nheaders: usize, _eof: bool) -> Action { if self.do_service_response_headers { let vec = self.get_http_response_headers(); - self.set_headers_data(vec, "service_response_headers"); + self.set_headers_data(ServiceResponse, vec); } let action = self.run_nodes(HttpResponseHeaders); if self.do_response_headers { - if let Some(payload) = self.data.first_input_for("response_headers", None) { + if let Some(payload) = self.get_headers_data(Response) { let headers = payload::to_pwm_headers(Some(payload)); self.set_http_response_headers(headers); } } if self.do_response_body { - if let Some(payload) = self.data.first_input_for("response_body", None) { + if let Some(payload) = self.get_body_data(Response) { let content_length = payload.len().map(|n| n.to_string()); self.set_http_response_header("Content-Length", content_length.as_deref()); self.set_http_response_header("Content-Type", payload.content_type()); @@ -370,15 +419,16 @@ impl HttpContext for DataKitFilter { if eof && self.do_service_response_body { if let Some(bytes) = self.get_http_response_body(0, body_size) { let content_type = self.get_http_response_header("Content-Type"); - let payload = Payload::from_bytes(bytes, content_type.as_deref()); - self.set_data("service_response_body", State::Done(payload)); + if let Some(payload) = Payload::from_bytes(bytes, content_type.as_deref()) { + self.set_body_data(ServiceResponse, payload); + } } } let action = self.run_nodes(HttpResponseBody); if self.do_response_body { - if let Some(payload) = self.data.first_input_for("response_body", None) { + if let Some(payload) = self.get_body_data(Response) { if let Ok(bytes) = payload.to_bytes() { self.set_http_response_body(0, bytes.len(), &bytes); } else { @@ -387,8 +437,9 @@ impl HttpContext for DataKitFilter { } else if let Some(debug) = &self.debug { if let Some(bytes) = self.get_http_response_body(0, body_size) { let content_type = debug.response_body_content_type(); - let payload = Payload::from_bytes(bytes, content_type.as_deref()); - self.set_data("response_body", State::Done(payload)); + if let Some(payload) = Payload::from_bytes(bytes, content_type.as_deref()) { + self.set_body_data(Response, payload); + } } } } @@ -402,6 +453,7 @@ impl HttpContext for DataKitFilter { } proxy_wasm::main! {{ + nodes::register_node("implicit", Box::new(nodes::implicit::ImplicitFactory {})); nodes::register_node("template", Box::new(nodes::template::TemplateFactory {})); nodes::register_node("call", Box::new(nodes::call::CallFactory {})); nodes::register_node("response", Box::new(nodes::response::ResponseFactory {})); diff --git a/src/nodes.rs b/src/nodes.rs index d302f1e..8694b88 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -11,26 +11,53 @@ pub mod jq; pub mod response; pub mod template; -pub type NodeMap = BTreeMap>; +pub type NodeVec = Vec>; + +#[derive(Clone)] +pub struct PortConfig { + pub defaults: Option>, + pub user_defined_ports: bool, +} + +impl PortConfig { + fn names(list: &[&str]) -> Option> { + Some(list.iter().map(|&s| str::to_owned(s)).collect()) + } +} + +impl Default for PortConfig { + fn default() -> Self { + PortConfig { + defaults: None, + user_defined_ports: true, + } + } +} pub trait Node { fn run(&self, _ctx: &dyn HttpContext, _input: &Input) -> State { - Done(None) + Done(vec![None]) } fn resume(&self, _ctx: &dyn HttpContext, _input: &Input) -> State { - Done(None) + Done(vec![None]) } } +pub struct NodeDefaultLink { + pub this_port: String, + pub other_node: String, + pub other_port: String, +} + pub trait NodeConfig { fn as_any(&self) -> &dyn Any; - fn default_inputs(&self) -> Option> { + fn default_inputs(&self) -> Option> { None } - fn default_outputs(&self) -> Option> { + fn default_outputs(&self) -> Option> { None } } @@ -40,10 +67,15 @@ pub trait NodeFactory: Send { &self, name: &str, inputs: &[String], + outputs: &[String], bt: &BTreeMap, ) -> Result, String>; fn new_node(&self, config: &dyn NodeConfig) -> Box; + + fn default_input_ports(&self) -> PortConfig; + + fn default_output_ports(&self) -> PortConfig; } type NodeTypeMap = BTreeMap>; @@ -53,31 +85,90 @@ fn node_types() -> &'static Mutex { NODE_TYPES.get_or_init(|| Mutex::new(BTreeMap::new())) } -pub fn register_node(name: &str, factory: Box) -> bool { - node_types() - .lock() - .unwrap() - .insert(String::from(name), factory); - true +pub fn register_node(name: &str, factory: Box) { + node_types().lock().unwrap().insert(name.into(), factory); +} + +fn with_node_type(node_type: &str, f: impl Fn(&Box) -> T) -> Option +where + T: Sized, +{ + node_types().lock().unwrap().get(node_type).map(f) +} + +pub fn is_valid_type(node_type: &str) -> bool { + with_node_type(node_type, |_| true).unwrap_or(false) +} + +pub fn default_input_ports(node_type: &str) -> Option { + with_node_type(node_type, |nf| nf.default_input_ports()) +} + +pub fn default_output_ports(node_type: &str) -> Option { + with_node_type(node_type, |nf| nf.default_output_ports()) } pub fn new_config( node_type: &str, name: &str, inputs: &[String], + outputs: &[String], bt: &BTreeMap, ) -> Result, String> { - if let Some(nf) = node_types().lock().unwrap().get(node_type) { - nf.new_config(name, inputs, bt) - } else { - Err(format!("no such node type: {node_type}")) + match with_node_type(node_type, |nf| nf.new_config(name, inputs, outputs, bt)) { + Some(Ok(ok)) => Ok(ok), + Some(Err(e)) => Err(e), + None => Err(format!("no such node type: {node_type}")), } } pub fn new_node(node_type: &str, config: &dyn NodeConfig) -> Result, String> { - if let Some(nf) = node_types().lock().unwrap().get(node_type) { - Ok(nf.new_node(config)) - } else { - Err(format!("no such node type: {node_type}")) + with_node_type(node_type, |nf| nf.new_node(config)) + .ok_or(format!("no such node type: {node_type}")) +} + +pub mod implicit { + use super::*; + + #[derive(Clone)] + pub struct Implicit {} + + impl Node for Implicit {} + + pub struct ImplicitFactory {} + + #[derive(Debug)] + pub struct ImplicitConfig {} + + impl NodeConfig for ImplicitConfig { + fn as_any(&self) -> &dyn Any { + self + } + } + impl NodeFactory for ImplicitFactory { + fn default_input_ports(&self) -> PortConfig { + Default::default() + } + + fn default_output_ports(&self) -> PortConfig { + Default::default() + } + + fn new_config( + &self, + _name: &str, + _inputs: &[String], + _outputs: &[String], + _bt: &BTreeMap, + ) -> Result, String> { + Ok(Box::new(ImplicitConfig {})) + } + + fn new_node(&self, config: &dyn NodeConfig) -> Box { + match config.as_any().downcast_ref::() { + Some(_cc) => Box::new(Implicit {}), + None => panic!("incompatible NodeConfig"), + } + } } } diff --git a/src/nodes/call.rs b/src/nodes/call.rs index 785597d..7d6953d 100644 --- a/src/nodes/call.rs +++ b/src/nodes/call.rs @@ -8,7 +8,7 @@ use url::Url; use crate::config::get_config_value; use crate::data::{Input, State, State::*}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::nodes::{Node, NodeConfig, NodeFactory, PortConfig}; use crate::payload; use crate::payload::Payload; @@ -44,7 +44,7 @@ impl Node for Call { Ok(u) => u, Err(err) => { log::error!("call: failed parsing URL from 'url' field: {err}"); - return Done(None); + return Done(vec![]); } }; @@ -52,7 +52,7 @@ impl Node for Call { Some(h) => h, None => { log::error!("call: failed getting host from URL"); - return Done(None); + return Done(vec![]); } }; @@ -63,7 +63,7 @@ impl Node for Call { let body_slice = match payload::to_pwm_body(*body) { Ok(slice) => slice, - Err(e) => return Fail(Some(Payload::Error(e))), + Err(e) => return Fail(vec![Some(Payload::Error(e))]), }; let trailers = vec![]; @@ -87,14 +87,18 @@ impl Node for Call { log::debug!("call: dispatch call id: {:?}", id); Waiting(id) } - Err(status) => Fail(Some(Payload::Error(format!("error: {:?}", status)))), + Err(status) => Fail(vec![Some(Payload::Error(format!("error: {:?}", status)))]), } } fn resume(&self, ctx: &dyn HttpContext, _inputs: &Input) -> State { log::debug!("call: resume"); - let r = if let Some(body) = ctx.get_http_call_response_body(0, usize::MAX) { + let headers = Some(payload::from_pwm_headers( + ctx.get_http_call_response_headers(), + )); + + let body = if let Some(body) = ctx.get_http_call_response_body(0, usize::MAX) { let content_type = ctx.get_http_call_response_header("Content-Type"); Payload::from_bytes(body, content_type.as_deref()) @@ -102,20 +106,34 @@ impl Node for Call { None }; - // TODO once we have multiple outputs, - // also return headers and produce a Fail() status on HTTP >= 400 + // TODO only produce an output if it is connected + // TODO produce a Fail() status on HTTP >= 400 - Done(r) + Done(vec![body, headers]) } } pub struct CallFactory {} impl NodeFactory for CallFactory { + fn default_input_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers", "query"]), + user_defined_ports: false, + } + } + fn default_output_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers"]), + user_defined_ports: false, + } + } + fn new_config( &self, _name: &str, _inputs: &[String], + _outputs: &[String], bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(CallConfig { diff --git a/src/nodes/jq.rs b/src/nodes/jq.rs index aec5b97..df80f3d 100644 --- a/src/nodes/jq.rs +++ b/src/nodes/jq.rs @@ -8,13 +8,14 @@ use std::collections::BTreeMap; use crate::config::get_config_value; use crate::data::{Input, State}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::nodes::{Node, NodeConfig, NodeFactory, PortConfig}; use crate::payload::Payload; #[derive(Clone, Debug)] pub struct JqConfig { jq: String, inputs: Vec, + _outputs: Vec, // TODO: implement multiple outputs } impl NodeConfig for JqConfig { @@ -69,12 +70,12 @@ impl Errors { impl From for State { fn from(val: Errors) -> Self { - State::Fail(Some(Payload::Error(if val.is_empty() { + State::Fail(vec![Some(Payload::Error(if val.is_empty() { // should be unreachable "unknown jq error".to_string() } else { val.0.join(", ") - }))) + }))]) } } @@ -178,22 +179,19 @@ impl Jq { impl Node for Jq { fn run(&self, _ctx: &dyn HttpContext, input: &Input) -> State { match self.exec(input.data) { - Ok(mut results) => { - State::Done(match results.len() { + Ok(results) => { + match results.len() { // empty - 0 => None, - - // single - 1 => { - let Some(item) = results.pop() else { - unreachable!(); - }; - Some(Payload::Json(item)) - } - - // more than one, return as an array - _ => Some(Payload::Json(results.into())), - }) + 0 => State::Done(vec![None]), + + // one or more + _ => State::Done( + results + .into_iter() + .map(|item| Some(Payload::Json(item))) + .collect(), + ), + } } Err(errs) => errs.into(), } @@ -202,16 +200,35 @@ impl Node for Jq { pub struct JqFactory {} +fn sanitize_jq_inputs(inputs: &[String]) -> Vec { + // TODO: this is a minimal implementation. + // Ideally we need to validate input names into valid jq variables + inputs + .iter() + .map(|input| input.replace('.', "_").replace('$', "")) + .collect() +} + impl NodeFactory for JqFactory { + fn default_input_ports(&self) -> PortConfig { + Default::default() + } + + fn default_output_ports(&self) -> PortConfig { + Default::default() + } + fn new_config( &self, _name: &str, inputs: &[String], + outputs: &[String], bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(JqConfig { jq: get_config_value(bt, "jq").unwrap_or(".".to_string()), - inputs: inputs.to_vec(), + inputs: sanitize_jq_inputs(inputs), + _outputs: outputs.to_vec(), })) } diff --git a/src/nodes/response.rs b/src/nodes/response.rs index 024fe45..afd2b74 100644 --- a/src/nodes/response.rs +++ b/src/nodes/response.rs @@ -7,7 +7,7 @@ use std::sync::atomic::Ordering::Relaxed; use crate::config::get_config_value; use crate::data::{Input, Phase, State, State::*}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::nodes::{Node, NodeConfig, NodeDefaultLink, NodeFactory, PortConfig}; use crate::payload; use crate::payload::Payload; @@ -33,8 +33,19 @@ impl NodeConfig for ResponseConfig { self } - fn default_outputs(&self) -> Option> { - Some(vec!["response_body".to_string()]) + fn default_outputs(&self) -> Option> { + Some(vec![ + NodeDefaultLink { + this_port: "body".into(), + other_node: "response".into(), + other_port: "body".into(), + }, + NodeDefaultLink { + this_port: "headers".into(), + other_node: "response".into(), + other_port: "headers".into(), + }, + ]) } } @@ -80,7 +91,7 @@ impl Node for Response { let body_slice = match payload::to_pwm_body(body) { Ok(slice) => slice, - Err(e) => return Fail(Some(Payload::Error(e))), + Err(e) => return Fail(vec![Some(Payload::Error(e))]), }; if input.phase == Phase::HttpResponseBody { @@ -96,17 +107,26 @@ impl Node for Response { ctx.send_http_response(status, headers_vec, body_slice.as_deref()); } - Done(None) + Done(vec![None]) } } pub struct ResponseFactory {} impl NodeFactory for ResponseFactory { + fn default_input_ports(&self) -> PortConfig { + Default::default() + } + + fn default_output_ports(&self) -> PortConfig { + Default::default() + } + fn new_config( &self, name: &str, _inputs: &[String], + _outputs: &[String], bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(ResponseConfig { diff --git a/src/nodes/template.rs b/src/nodes/template.rs index 45927f3..c16f24f 100644 --- a/src/nodes/template.rs +++ b/src/nodes/template.rs @@ -6,7 +6,7 @@ use std::collections::BTreeMap; use crate::config::get_config_value; use crate::data::{Input, State}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::nodes::{Node, NodeConfig, NodeFactory, PortConfig}; use crate::payload::Payload; #[derive(Clone, Debug)] @@ -83,13 +83,13 @@ impl Node for Template<'_> { Ok(output) => { log::debug!("output: {output}"); match Payload::from_bytes(output.into(), Some(&self.config.content_type)) { - p @ Some(Payload::Error(_)) => State::Fail(p), - p => State::Done(p), + p @ Some(Payload::Error(_)) => State::Fail(vec![p]), + p => State::Done(vec![p]), } } - Err(err) => State::Fail(Some(Payload::Error(format!( + Err(err) => State::Fail(vec![Some(Payload::Error(format!( "error rendering template: {err}" - )))), + )))]), } } } @@ -97,10 +97,22 @@ impl Node for Template<'_> { pub struct TemplateFactory {} impl NodeFactory for TemplateFactory { + fn default_input_ports(&self) -> PortConfig { + Default::default() + } + + fn default_output_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["output"]), + user_defined_ports: false, + } + } + fn new_config( &self, _name: &str, inputs: &[String], + _outputs: &[String], bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(TemplateConfig {