diff --git a/Cargo.lock b/Cargo.lock index 4350006..4e9675d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,7 @@ dependencies = [ name = "datakit" version = "0.1.1" dependencies = [ + "derivative", "handlebars", "jaq-core", "jaq-interpret", @@ -112,6 +113,17 @@ dependencies = [ "url", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "digest" version = "0.10.7" @@ -362,7 +374,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] [[package]] @@ -465,7 +477,7 @@ checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] [[package]] @@ -491,6 +503,17 @@ dependencies = [ "digest", ] +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.74" @@ -519,7 +542,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] [[package]] @@ -616,5 +639,5 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.74", ] diff --git a/Cargo.toml b/Cargo.toml index 4f70935..ab5cd26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,3 +22,5 @@ jaq-interpret = "1.2.1" jaq-parse = "1.0.2" jaq-core = "1.2.1" jaq-std = "1.2.1" +derivative = "2.2.0" + diff --git a/datakit.meta.json b/datakit.meta.json index 2f65edc..a7c62b3 100644 --- a/datakit.meta.json +++ b/datakit.meta.json @@ -11,9 +11,21 @@ "type": { "type": "string" 
}, "name": { "type": "string" }, "input": { "type": "string" }, - "inputs": { "type": "array", "items": { "type": "string" } }, + "inputs": { + "oneOf": [ + { "type": "array", "items": { "type": "string" } }, + { "type": "array", "items": { "type": "object", "additionalProperties": { "type": "string" } } }, + { "type": "object", "additionalProperties": { "type": "string" } } + ] + }, "output": { "type": "string" }, - "outputs": { "type": "array", "items": { "type": "string" } } + "outputs": { + "oneOf": [ + { "type": "array", "items": { "type": "string" } }, + { "type": "array", "items": { "type": "object", "additionalProperties": { "type": "string" } } }, + { "type": "object", "additionalProperties": { "type": "string" } } + ] + } } } } diff --git a/docs/datakit.md b/docs/datakit.md index 6fe47ce..74dd66c 100644 --- a/docs/datakit.md +++ b/docs/datakit.md @@ -16,41 +16,148 @@ The data types are based on those of [serde-json], so representable value types ## The execution model +Nodes can have input ports and output ports. +Input ports consume data. Output ports produce data. + +You can link one node's output port to another node's input port. +An input port can receive at most one link, that is, data can only arrive +into an input via one other node. Therefore, there are no race conditions. + +An output port can be linked to multiple nodes. Therefore, one node can +provide data to several other nodes. + Each node triggers at most once. -A node only triggers when all its inputs are available. +A node only triggers when data is available to all its connected input ports; +that is, only when all nodes connected to its inputs have finished +executing. 
## Node types The following node types are implemented: -* `call`: an HTTP dispatch call -* `template`: application of a string template -* `response`: trigger a direct response, rather than forwarding a proxied response +**Node type** | **Input ports** | **Output ports** | **Supported attributes** +--------------------:|:-----------------:|:-----------------:|:----------------------------- +`call` | `body`, `headers` | `body`, `headers` | `url`, `method`, `timeout` +`jq` | user-defined | user-defined | `jq` +`handlebars` | user-defined | `output` | `template`, `content_type` +`exit` | `body`, `headers` | | `status` + +### `call` node type + +An HTTP dispatch call. + +#### Input ports: + +* `body`: body to use in the dispatch request. +* `headers`: headers to use in the dispatch request. + +#### Output ports: + +* `body`: body returned as the dispatch response. +* `headers`: headers returned as the dispatch response. + +#### Supported attributes: + +* `url`: the URL to use when dispatching. +* `method`: the HTTP method (default is `GET`). +* `timeout`: the dispatch timeout, in seconds (default is 60). + +### `jq` node type + +Execution of a JQ script for processing JSON. The JQ script is processed +using the [jaq] implementation of the JQ language. + +#### Input ports: + +User-defined. Each input port declared by the user will correspond to a +variable in the JQ execution context. A user can declare the name of the port +explicitly, which is the name of the variable. If a port does not have a given +name, it will get a default name based on the peer node and port to which it +is connected, and the name will be normalized into a valid variable name (e.g. +by replacing `.` with `_`). + +#### Output ports: + +User-defined. When the JQ script produces a JSON value, that is made available +in the first output port of the node. If the JQ script produces multiple JSON +values, each value will be routed to a separate output port. 
+ +#### Supported attributes: + +* `jq`: the JQ script to execute when the node is triggered. + +### `handlebars` node type + +Application of a [Handlebars] template on a raw string, useful for producing +arbitrary non-JSON content types. + +#### Input ports: + +User-defined. Each input port declared by the user will correspond to a +variable in the Handlebars execution context. A user can declare the name of +the port explicitly, which is the name of the variable. If a port does not +have a given name, it will get a default name based on the peer node and port +to which it is connected, and the name will be normalized into a valid +variable name (e.g. by replacing `.` with `_`). + +#### Output ports: + +* `output`: the rendered template. The output payload will be in raw string + format, unless an alternative `content_type` triggers a conversion. + +#### Supported attributes: + +* `template`: the Handlebars template to apply when the node is triggered. +* `content_type`: if set to a MIME type that matches one of DataKit's + supported payload types, such as `application/json`, the output payload will + be converted to that format, making its contents available for further + processing by other nodes (default is `text/plain`, which produces a raw + string). + +### `exit` node type + +Trigger an early exit that produces a direct response, rather than forwarding +a proxied response. + +#### Input ports: + +* `body`: body to use in the early-exit response. +* `headers`: headers to use in the early-exit response. + +#### Output ports: + +None. + +#### Supported attributes: + +* `status`: the HTTP status code to use in the early-exit response (default is + 200). ## Implicit nodes -DataKit defines a number of implicit nodes that can be used as inputs or outputs without being -explicitly declared. These reserved node names cannot be used for user-defined nodes. 
These are: - -**Name** | **Usage** | **Description** ----------------------------:|:--------------:|:------------------ -`request_headers` | as input only | headers from the incoming request -`request_body` | as input only | body of the incoming request -`service_request_headers` | as output only | headers to be sent to the service being proxied to -`service_request_body` | as output only | body to be sent to the service being proxied to -`service_response_headers` | as input only | headers from the response sent by the service being proxied to -`service_response_body` | as input only | body of the response sent by the service being proxied to -`response_headers` | as output only | headers to be sent as a response to the incoming request -`response_body` | as output only | body to be sent as a response to the incoming request - -The `_headers` nodes produce maps from header names to their values. +DataKit defines a number of implicit nodes that can be used without being +explicitly declared. These reserved node names cannot be used for user-defined +nodes. These are: + +**Node** | **Input ports** | **Output ports** | **Description** +--------------------:|:-----------------:|:-----------------:|:------------------ +`request` | | `body`, `headers` | the incoming request +`service_request` | `body`, `headers` | | request sent to the service being proxied to +`service_response` | | `body`, `headers` | response sent by the service being proxied to +`response` | `body`, `headers` | | response to be sent to the incoming request + +The `headers` ports produce and consume maps from header names to their values. Keys are header names are normalized to lowercase. Values are strings if there is a single instance of a header, or arrays of strings if there are multiple instances of the same header. -The `_body` nodes produce either raw strings or JSON objects, depending on their corresponding -`Content-Type` values. 
+The `body` output ports produce either raw strings or JSON objects, +depending on their corresponding `Content-Type` values. + +Likewise, the `body` input ports accept either raw strings or JSON objects, +and both their `Content-Type` and `Content-Length` are automatically adjusted, +according to the type and size of the incoming data. ## Debugging @@ -69,3 +176,5 @@ as normal. Any other value will enable debug tracing. --- [serde-json]: https://docs.rs/serde_json/latest/serde_json/ +[Handlebars]: https://docs.rs/handlebars/latest/handlebars/ +[jaq]: https://lib.rs/crates/jaq diff --git a/examples/call_headers/config/demo.yml b/examples/call_headers/config/demo.yml new file mode 100644 index 0000000..4194b8b --- /dev/null +++ b/examples/call_headers/config/demo.yml @@ -0,0 +1,33 @@ +_format_version: "3.0" +services: +- url: http://127.0.0.1:8001 + name: my-service + routes: + - name: my-route + paths: + - / + strip_path: true + filter_chains: + - filters: + - name: datakit + config: + debug: true + nodes: + - name: MY_HEADERS + type: jq + inputs: + - req: request.headers + jq: | + { + "X-My-Call-Header": $req.apikey // "default value" + } + - name: CALL + type: call + inputs: + - headers: MY_HEADERS + url: https://httpbin.konghq.com/anything + - name: EXIT + type: exit + inputs: + - body: CALL.body + status: 200 diff --git a/test/demo.sh b/examples/call_headers/demo.sh similarity index 76% rename from test/demo.sh rename to examples/call_headers/demo.sh index 05b5508..1bd4a5e 100755 --- a/test/demo.sh +++ b/examples/call_headers/demo.sh @@ -26,7 +26,7 @@ fi message "Building the filter using cargo..." ( - cd .. + cd ../.. cargo build --target=wasm32-wasip1 --release || exit 1 ) || exit 1 @@ -34,8 +34,8 @@ message "Building the filter using cargo..." 
mkdir -p wasm -cp -a ../target/wasm32-wasip1/release/*.wasm wasm/ -cp ../*.meta.json wasm/ +cp -a ../../target/wasm32-wasip1/release/*.wasm wasm/ +cp ../../*.meta.json wasm/ script_dir=$(dirname $(realpath $0)) @@ -46,18 +46,11 @@ message "Setting up the Kong Gateway container..." docker stop $DEMO_KONG_CONTAINER docker rm $DEMO_KONG_CONTAINER -# Config trick to access localhost in a local Docker test, -# in case you want to edit your config/demo.yml to target -# a localhost server rather than httpbin.org: -# -# access_localhost="--add-host=host.docker.internal:$(ip -j address | jq -r '[ .[] | select(.ifname | test("^[ew]")) | .addr_info[] | select(.family == "inet") | .local ][0]')" -access_localhost="" - docker run -d --name "$DEMO_KONG_CONTAINER" \ $access_localhost \ -v "$script_dir/config:/kong/config/" \ -v "$script_dir/wasm:/wasm" \ - -e "KONG_LOG_LEVEL=info" \ + -e "KONG_LOG_LEVEL=debug" \ -e "KONG_DATABASE=off" \ -e "KONG_DECLARATIVE_CONFIG=/kong/config/demo.yml" \ -e "KONG_NGINX_WASM_SHM_KV_DATAKIT=12m" \ @@ -85,15 +78,12 @@ sleep 5 message "Now let's send a request to see the filter in effect:" -http :8000/anything -http :8000/anything -http :8000/anything -http :8000/anything -http :8000/anything -http :8000/anything +http :8000/ +http :8000/ apikey:mykey +http :8000/ apikey:mykey x-datakit-debug-trace:1 message "Finishing up!" 
-docker stop $DEMO_KONG_CONTAINER +#docker stop $DEMO_KONG_CONTAINER #docker rm $DEMO_KONG_CONTAINER diff --git a/examples/call_headers/reconfigure.sh b/examples/call_headers/reconfigure.sh new file mode 100755 index 0000000..88fe3f7 --- /dev/null +++ b/examples/call_headers/reconfigure.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +http :8001/config config=@config/demo.yml + diff --git a/examples/call_headers/reset.sh b/examples/call_headers/reset.sh new file mode 100755 index 0000000..37de6e2 --- /dev/null +++ b/examples/call_headers/reset.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +rm -rf wasm diff --git a/examples/facts/config/demo.yml b/examples/facts/config/demo.yml new file mode 100644 index 0000000..da8c514 --- /dev/null +++ b/examples/facts/config/demo.yml @@ -0,0 +1,36 @@ +_format_version: "3.0" +services: +- url: http://127.0.0.1:8001/ + name: my-service + routes: + - name: my-route + paths: + - / + strip_path: true + filter_chains: + - filters: + - name: datakit + config: + debug: true + nodes: + - name: CAT_FACT + type: call + url: https://catfact.ninja/fact + - name: CHUCK_NORRIS_FACT + type: call + url: https://api.chucknorris.io/jokes/random + - name: JOIN + type: jq + inputs: + - cat: CAT_FACT.body + - chuck: CHUCK_NORRIS_FACT.body + jq: | + { + "cat_fact": $cat.fact, + "chuck_norris_fact": $chuck.value + } + - name: EXIT + type: exit + inputs: + - body: JOIN + status: 200 diff --git a/examples/facts/demo.sh b/examples/facts/demo.sh new file mode 100755 index 0000000..fa86a27 --- /dev/null +++ b/examples/facts/demo.sh @@ -0,0 +1,87 @@ +#!/usr/bin/env bash +set -x + +DEMO_KONG_CONTAINER="${DEMO_KONG_CONTAINER:-kong-wasm}" +DEMO_KONG_IMAGE="${DEMO_KONG_IMAGE:-kong/kong:nightly}" + +function message() { + set +x + echo "----------------------------------------------------------------------" + echo $1 + echo "----------------------------------------------------------------------" + set -x +} + 
+################################################################################ + +if [[ "$1" == "stop" ]] +then + docker stop $DEMO_KONG_CONTAINER + docker rm $DEMO_KONG_CONTAINER + exit 0 +fi + +### Build filter ############################################################### + +message "Building the filter using cargo..." + +( + cd ../.. + cargo build --target=wasm32-wasip1 --release || exit 1 +) || exit 1 + +### Copy filter to wasm/ ####################################################### + +mkdir -p wasm + +cp -a ../../target/wasm32-wasip1/release/*.wasm wasm/ +cp ../../*.meta.json wasm/ + +script_dir=$(dirname $(realpath $0)) + +### Start container ############################################################ + +message "Setting up the Kong Gateway container..." + +docker stop $DEMO_KONG_CONTAINER +docker rm $DEMO_KONG_CONTAINER + +docker run -d --name "$DEMO_KONG_CONTAINER" \ + $access_localhost \ + -v "$script_dir/config:/kong/config/" \ + -v "$script_dir/wasm:/wasm" \ + -e "KONG_LOG_LEVEL=debug" \ + -e "KONG_DATABASE=off" \ + -e "KONG_DECLARATIVE_CONFIG=/kong/config/demo.yml" \ + -e "KONG_NGINX_WASM_SHM_KV_DATAKIT=12m" \ + -e "KONG_NGINX_HTTP_PROXY_WASM_ISOLATION=none" \ + -e "KONG_NGINX_WORKER_PROCESSES=2" \ + -e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \ + -e "KONG_PROXY_ERROR_LOG=/dev/stderr" \ + -e "KONG_WASM=on" \ + -e "KONG_WASM_FILTERS_PATH=/wasm" \ + -p 8000:8000 \ + -p 8443:8443 \ + -p 8001:8001 \ + -p 8444:8444 \ + "$DEMO_KONG_IMAGE" + +### Show configuration ######################################################### + +message "This is the configuration loaded into Kong:" + +cat config/demo.yml + +sleep 5 + +### Issue requests ############################################################# + +message "Now let's send a request to see the filter in effect:" + +http :8000/ + +message "Finishing up!" 
+ +#docker stop $DEMO_KONG_CONTAINER +#docker rm $DEMO_KONG_CONTAINER + diff --git a/examples/facts/reconfigure.sh b/examples/facts/reconfigure.sh new file mode 100755 index 0000000..88fe3f7 --- /dev/null +++ b/examples/facts/reconfigure.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +http :8001/config config=@config/demo.yml + diff --git a/examples/facts/reset.sh b/examples/facts/reset.sh new file mode 100755 index 0000000..37de6e2 --- /dev/null +++ b/examples/facts/reset.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +rm -rf wasm diff --git a/examples/request_headers/config/demo.yml b/examples/request_headers/config/demo.yml new file mode 100644 index 0000000..01d3144 --- /dev/null +++ b/examples/request_headers/config/demo.yml @@ -0,0 +1,28 @@ +_format_version: "3.0" +services: +- url: http://httpbin.konghq.com + name: my-service + routes: + - name: my-route + paths: + - / + strip_path: true + filter_chains: + - filters: + - name: datakit + config: + debug: true + nodes: + - name: JOIN + type: jq + inputs: + - req: request.headers + jq: | + { + "key": $req.foo + } + - name: EXIT + type: exit + inputs: + - body: JOIN + status: 200 diff --git a/examples/request_headers/demo.sh b/examples/request_headers/demo.sh new file mode 100755 index 0000000..2be17fa --- /dev/null +++ b/examples/request_headers/demo.sh @@ -0,0 +1,87 @@ +#!/usr/bin/env bash +set -x + +DEMO_KONG_CONTAINER="${DEMO_KONG_CONTAINER:-kong-wasm}" +DEMO_KONG_IMAGE="${DEMO_KONG_IMAGE:-kong/kong:nightly}" + +function message() { + set +x + echo "----------------------------------------------------------------------" + echo $1 + echo "----------------------------------------------------------------------" + set -x +} + +################################################################################ + +if [[ "$1" == "stop" ]] +then + docker stop $DEMO_KONG_CONTAINER + docker rm $DEMO_KONG_CONTAINER + exit 0 +fi + +### Build filter ############################################################### + +message "Building 
the filter using cargo..." + +( + cd ../.. + cargo build --target=wasm32-wasip1 --release || exit 1 +) || exit 1 + +### Copy filter to wasm/ ####################################################### + +mkdir -p wasm + +cp -a ../../target/wasm32-wasip1/release/*.wasm wasm/ +cp ../../*.meta.json wasm/ + +script_dir=$(dirname $(realpath $0)) + +### Start container ############################################################ + +message "Setting up the Kong Gateway container..." + +docker stop $DEMO_KONG_CONTAINER +docker rm $DEMO_KONG_CONTAINER + +docker run -d --name "$DEMO_KONG_CONTAINER" \ + $access_localhost \ + -v "$script_dir/config:/kong/config/" \ + -v "$script_dir/wasm:/wasm" \ + -e "KONG_LOG_LEVEL=debug" \ + -e "KONG_DATABASE=off" \ + -e "KONG_DECLARATIVE_CONFIG=/kong/config/demo.yml" \ + -e "KONG_NGINX_WASM_SHM_KV_DATAKIT=12m" \ + -e "KONG_NGINX_HTTP_PROXY_WASM_ISOLATION=none" \ + -e "KONG_NGINX_WORKER_PROCESSES=2" \ + -e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \ + -e "KONG_PROXY_ERROR_LOG=/dev/stderr" \ + -e "KONG_WASM=on" \ + -e "KONG_WASM_FILTERS_PATH=/wasm" \ + -p 8000:8000 \ + -p 8443:8443 \ + -p 8001:8001 \ + -p 8444:8444 \ + "$DEMO_KONG_IMAGE" + +### Show configuration ######################################################### + +message "This is the configuration loaded into Kong:" + +cat config/demo.yml + +sleep 5 + +### Issue requests ############################################################# + +message "Now let's send a request to see the filter in effect:" + +http :8000/ foo:value + +message "Finishing up!" 
+ +#docker stop $DEMO_KONG_CONTAINER +#docker rm $DEMO_KONG_CONTAINER + diff --git a/examples/request_headers/reconfigure.sh b/examples/request_headers/reconfigure.sh new file mode 100755 index 0000000..88fe3f7 --- /dev/null +++ b/examples/request_headers/reconfigure.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +http :8001/config config=@config/demo.yml + diff --git a/examples/request_headers/reset.sh b/examples/request_headers/reset.sh new file mode 100755 index 0000000..37de6e2 --- /dev/null +++ b/examples/request_headers/reset.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +rm -rf wasm diff --git a/examples/service_request/config/demo.yml b/examples/service_request/config/demo.yml new file mode 100644 index 0000000..298c35d --- /dev/null +++ b/examples/service_request/config/demo.yml @@ -0,0 +1,41 @@ +_format_version: "3.0" + +services: +- name: demo + url: http://httpbin.konghq.com + routes: + - name: my-route + paths: + - /anything + strip_path: false + methods: + - GET + - POST + filter_chains: + - filters: + - name: datakit + config: + debug: true + nodes: + - name: FIRST + type: call + url: https://api.zippopotam.us/br/93000-000 + - name: MY_HEADERS + type: jq + inputs: + - first: FIRST.body + output: service_request.headers + jq: | + { + "X-Hello": "World", + "X-Foo": "Bar", + "X-Country": $first.country + } + - name: MY_BODY + type: handlebars + content_type: text/plain + inputs: + - first: FIRST.body + output: service_request.body + template: | + Coordinates for {{ first.places.0.[place name] }}, {{ first.places.0.state }}, {{ first.country }} are ({{ first.places.0.latitude }}, {{ first.places.0.longitude }}). 
diff --git a/examples/service_request/demo.sh b/examples/service_request/demo.sh new file mode 100755 index 0000000..f02c5dd --- /dev/null +++ b/examples/service_request/demo.sh @@ -0,0 +1,87 @@ +#!/usr/bin/env bash +set -x + +DEMO_KONG_CONTAINER="${DEMO_KONG_CONTAINER:-kong-wasm}" +DEMO_KONG_IMAGE="${DEMO_KONG_IMAGE:-kong/kong:nightly}" + +function message() { + set +x + echo "----------------------------------------------------------------------" + echo $1 + echo "----------------------------------------------------------------------" + set -x +} + +################################################################################ + +if [[ "$1" == "stop" ]] +then + docker stop $DEMO_KONG_CONTAINER + docker rm $DEMO_KONG_CONTAINER + exit 0 +fi + +### Build filter ############################################################### + +message "Building the filter using cargo..." + +( + cd ../.. + cargo build --target=wasm32-wasip1 --release || exit 1 +) || exit 1 + +### Copy filter to wasm/ ####################################################### + +mkdir -p wasm + +cp -a ../../target/wasm32-wasip1/release/*.wasm wasm/ +cp ../../*.meta.json wasm/ + +script_dir=$(dirname $(realpath $0)) + +### Start container ############################################################ + +message "Setting up the Kong Gateway container..." 
+ +docker stop $DEMO_KONG_CONTAINER +docker rm $DEMO_KONG_CONTAINER + +docker run -d --name "$DEMO_KONG_CONTAINER" \ + $access_localhost \ + -v "$script_dir/config:/kong/config/" \ + -v "$script_dir/wasm:/wasm" \ + -e "KONG_LOG_LEVEL=debug" \ + -e "KONG_DATABASE=off" \ + -e "KONG_DECLARATIVE_CONFIG=/kong/config/demo.yml" \ + -e "KONG_NGINX_WASM_SHM_KV_DATAKIT=12m" \ + -e "KONG_NGINX_HTTP_PROXY_WASM_ISOLATION=none" \ + -e "KONG_NGINX_WORKER_PROCESSES=2" \ + -e "KONG_PROXY_ACCESS_LOG=/dev/stdout" \ + -e "KONG_PROXY_ERROR_LOG=/dev/stderr" \ + -e "KONG_WASM=on" \ + -e "KONG_WASM_FILTERS_PATH=/wasm" \ + -p 8000:8000 \ + -p 8443:8443 \ + -p 8001:8001 \ + -p 8444:8444 \ + "$DEMO_KONG_IMAGE" + +### Show configuration ######################################################### + +message "This is the configuration loaded into Kong:" + +cat config/demo.yml + +sleep 5 + +### Issue requests ############################################################# + +message "Now let's send a request to see the filter in effect:" + +http POST :8000/anything foo=bar + +message "Finishing up!" 
+ +#docker stop $DEMO_KONG_CONTAINER +#docker rm $DEMO_KONG_CONTAINER + diff --git a/examples/service_request/reconfigure.sh b/examples/service_request/reconfigure.sh new file mode 100755 index 0000000..88fe3f7 --- /dev/null +++ b/examples/service_request/reconfigure.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash + +http :8001/config config=@config/demo.yml + diff --git a/examples/service_request/reset.sh b/examples/service_request/reset.sh new file mode 100755 index 0000000..37de6e2 --- /dev/null +++ b/examples/service_request/reset.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +rm -rf wasm diff --git a/src/config.rs b/src/config.rs index db3b331..36ffccf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,37 +1,225 @@ use crate::nodes; -use crate::nodes::{NodeConfig, NodeMap}; +use crate::nodes::{NodeConfig, NodeVec}; use crate::DependencyGraph; -use lazy_static::lazy_static; +use derivative::Derivative; use serde::de::{Error, MapAccess, Visitor}; use serde::{Deserialize, Deserializer}; use serde_json::Value; use serde_json_wasm::de; +use std::cmp::Ordering; use std::collections::BTreeMap; -use std::collections::HashSet; -use std::fmt; - -lazy_static! 
{ - static ref RESERVED_NODE_NAMES: HashSet<&'static str> = [ - "request_headers", - "request_body", - "service_request_headers", - "service_request_body", - "service_response_headers", - "service_response_body", - "response_headers", - "response_body", - ] - .iter() - .copied() - .collect(); +use std::fmt::{self, Formatter}; + +pub struct ImplicitNode { + name: String, + node_type: String, +} + +impl ImplicitNode { + pub fn new(name: &str, node_type: &str) -> ImplicitNode { + ImplicitNode { + name: name.into(), + node_type: node_type.into(), + } + } +} + +#[derive(PartialEq, Debug)] +struct UserNodePort { + node: Option, + port: Option, +} + +impl std::fmt::Display for UserNodePort { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "{}.{}", + self.node.as_deref().unwrap_or(""), + self.port.as_deref().unwrap_or("") + ) + } +} + +#[derive(PartialEq, Debug)] +struct UserLink { + from: UserNodePort, + to: UserNodePort, } -pub struct UserNodeConfig { +#[derive(PartialEq, Debug)] +struct UserNodeDesc { node_type: String, name: String, +} + +#[derive(PartialEq, Debug)] +struct UserNodeConfig { + desc: UserNodeDesc, bt: BTreeMap, - inputs: Vec, - outputs: Vec, + links: Vec, + n_inputs: usize, + n_outputs: usize, + named_ins: Vec, + named_outs: Vec, +} + +impl UserLink { + pub fn new( + from_node: Option, + from_port: Option, + to_node: Option, + to_port: Option, + ) -> Self { + UserLink { + from: UserNodePort { + node: from_node, + port: from_port, + }, + to: UserNodePort { + node: to_node, + port: to_port, + }, + } + } + + pub fn new_reverse( + from_node: Option, + from_port: Option, + to_node: Option, + to_port: Option, + ) -> Self { + UserLink { + from: UserNodePort { + node: to_node, + port: to_port, + }, + to: UserNodePort { + node: from_node, + port: from_port, + }, + } + } + + fn accept_port_name(port: &String, ports: &mut Vec, user: bool) -> bool { + if ports.contains(port) { + true + } else if user { + ports.push(port.into()); + true + } 
else { + false + } + } + + fn get_or_create_output( + np: &UserNodePort, + outs: &mut Vec, + user: bool, + ) -> Result { + // If out ports list has a first port declared + // (either explicitly or implicitly), use it + if let Some(&port) = outs.first().as_ref() { + Ok(port.into()) + } else if user { + let new_port = make_port_name(np)?; + + // otherwise the if outs.first() would have returned it + assert!(!outs.contains(&new_port)); + + outs.push(new_port.clone()); + Ok(new_port) + } else { + Err("node in link has no output ports".into()) + } + } + + fn create_or_get_input( + np: &UserNodePort, + ins: &mut Vec, + user: bool, + n: usize, + ) -> Result { + if user { + let new_port = make_port_name(np)?; + if ins.contains(&new_port) { + return Err(format!("duplicated input port {new_port}")); + } + ins.push(new_port.clone()); + Ok(new_port.clone()) + } else if let Some(&port) = ins.get(n - 1).as_ref() { + Ok(port.into()) + } else { + Err(format!( + "too many inputs declared (node type supports {} inputs)", + ins.len() + )) + } + } + + fn resolve_port_names( + self: &mut UserLink, + src: &mut PortInfo, + dst: &mut PortInfo, + n_ins: usize, + ) -> Result<(), String> { + let mut from_port = None; + let mut to_port = None; + + let outs = &mut src.outs; + let user_outs = src.user_outs; + let ins = &mut dst.ins; + let user_ins = dst.user_ins; + + match &self.from.port { + Some(port) => { + if !Self::accept_port_name(port, outs, user_outs) { + return Err(format!("invalid output port name {port}")); + } + } + None => { + from_port = Some(Self::get_or_create_output(&self.to, outs, user_outs)?); + } + } + + match &self.to.port { + Some(port) => { + if !Self::accept_port_name(port, ins, user_ins) { + return Err(format!("invalid input port name {port}")); + } + } + None => { + to_port = Some(Self::create_or_get_input(&self.from, ins, user_ins, n_ins)?); + } + } + + // assign in the end, so that the input and output resolution + // are not affected by the order of links when 
calling make_port_name + if from_port.is_some() { + self.from.port = from_port; + } + if to_port.is_some() { + self.to.port = to_port; + } + assert!(self.from.port.is_some()); + assert!(self.to.port.is_some()); + + Ok(()) + } +} + +fn parse_node_port(value: String) -> (Option, Option) { + let trim = value.trim().to_string(); + + if let Some(dot) = trim.find('.') { + let (node, port) = trim.split_at(dot); + ( + Some(node.trim().to_string()), + Some(port[1..].trim().to_string()), + ) + } else { + (Some(trim), None) + } } impl<'a> Deserialize<'a> for UserNodeConfig { @@ -55,8 +243,9 @@ impl<'a> Deserialize<'a> for UserNodeConfig { let mut bt = BTreeMap::new(); let mut typ: Option = None; let mut name: Option = None; - let mut inputs = Vec::new(); - let mut outputs = Vec::new(); + let mut links: Vec = Vec::new(); + let mut named_ins: Vec = Vec::new(); + let mut named_outs: Vec = Vec::new(); while let Some(key) = map.next_key::()? { match key.as_str() { "type" => { @@ -70,27 +259,27 @@ impl<'a> Deserialize<'a> for UserNodeConfig { } } "input" => { - if let Ok(serde_json::Value::String(value)) = map.next_value() { - inputs.push(value); + if let Ok(serde_json::Value::String(node_port)) = map.next_value() { + let (node, port) = parse_node_port(node_port); + links.push(UserLink::new(node, port, None, None)); } } "inputs" => { - if let Ok(values) = map.next_value() { - if let Ok(v) = serde_json::from_value::>(values) { - inputs = v; - } + if let Ok(v) = map.next_value::() { + read_links(&mut links, v, &mut named_ins, &UserLink::new) + .map_err(Error::custom::<&str>)?; } } "output" => { if let Ok(serde_json::Value::String(value)) = map.next_value() { - outputs.push(value); + let (node, port) = parse_node_port(value); + links.push(UserLink::new(None, None, node, port)); } } "outputs" => { - if let Ok(values) = map.next_value() { - if let Ok(v) = serde_json::from_value::>(values) { - outputs = v; - } + if let Ok(v) = map.next_value::() { + read_links(&mut links, v, &mut 
named_outs, &UserLink::new_reverse) + .map_err(Error::custom::<&str>)?; } } _ => { @@ -101,14 +290,30 @@ impl<'a> Deserialize<'a> for UserNodeConfig { } } + let name = name.unwrap_or_else(|| format!("{:p}", &bt)); + + let mut n_inputs = 0; + let mut n_outputs = 0; + for link in &mut links { + if link.to.node.is_none() { + link.to.node = Some(name.clone()); + n_inputs += 1; + } + if link.from.node.is_none() { + link.from.node = Some(name.clone()); + n_outputs += 1; + } + } + if let Some(node_type) = typ { - let name = name.unwrap_or_else(|| format!("{:p}", &bt)); Ok(UserNodeConfig { - node_type, - name, + desc: UserNodeDesc { node_type, name }, bt, - inputs, - outputs, + links, + n_inputs, + n_outputs, + named_ins, + named_outs, }) } else { Err(Error::missing_field("type")) @@ -120,97 +325,321 @@ impl<'a> Deserialize<'a> for UserNodeConfig { } } -#[derive(Deserialize, Default)] +fn read_links( + links: &mut Vec, + value: Value, + named: &mut Vec, + ctor: &impl Fn(Option, Option, Option, Option) -> UserLink, +) -> Result<(), &'static str> { + match value { + Value::Object(map) => { + for (my_port, v) in map { + named.push(my_port.clone()); + + let Value::String(node_port) = v else { + return Err("invalid map value"); + }; + + let (node, port) = parse_node_port(node_port); + links.push(ctor(node, port, None, Some(my_port))); + } + } + + Value::Array(vec) => { + for v in vec { + match v { + Value::Object(map) => { + read_links(links, map.into(), named, ctor)?; + } + + Value::String(node_port) => { + let (node, port) = parse_node_port(node_port); + links.push(ctor(node, port, None, None)); + } + + _ => { + return Err("invalid list value"); + } + } + } + } + + _ => return Err("invalid object"), + } + Ok(()) +} + +#[derive(Deserialize, Default, PartialEq, Debug)] pub struct UserConfig { nodes: Vec, #[serde(default)] debug: bool, } +#[derive(Derivative)] +#[derivative(PartialEq, Debug)] struct NodeInfo { name: String, node_type: String, + #[derivative(PartialEq = 
"ignore")] + #[derivative(Debug = "ignore")] node_config: Box, } +#[derive(PartialEq, Debug)] pub struct Config { + n_nodes: usize, + n_implicits: usize, node_list: Vec, - node_names: Vec, graph: DependencyGraph, debug: bool, } -fn add_default_connections(unc: &UserNodeConfig, nc: &dyn NodeConfig, graph: &mut DependencyGraph) { - let name: &str = &unc.name; - if unc.inputs.is_empty() { +struct PortInfo { + ins: Vec, + outs: Vec, + user_ins: bool, + user_outs: bool, +} + +fn add_default_links( + name: &str, + n_inputs: usize, + n_outputs: usize, + links: &mut Vec, + nc: &dyn NodeConfig, +) { + if n_inputs == 0 { if let Some(default_inputs) = nc.default_inputs() { for input in &default_inputs { - graph.add(input, name) + links.push(UserLink { + from: UserNodePort { + node: Some(input.other_node.clone()), + port: Some(input.other_port.clone()), + }, + to: UserNodePort { + node: Some(name.into()), + port: Some(input.this_port.clone()), + }, + }); } } } - if unc.outputs.is_empty() { + if n_outputs == 0 { if let Some(default_outputs) = nc.default_outputs() { for output in &default_outputs { - graph.add(name, output) + links.push(UserLink { + from: UserNodePort { + node: Some(name.into()), + port: Some(output.this_port.clone()), + }, + to: UserNodePort { + node: Some(output.other_node.clone()), + port: Some(output.other_port.clone()), + }, + }); } } } } -impl Config { - pub fn new(config_bytes: Vec) -> Result { - match de::from_slice::(&config_bytes) { - Ok(user_config) => { - let mut node_list = Vec::new(); - let mut node_names = Vec::new(); - let mut graph: DependencyGraph = Default::default(); +fn make_port_name(np: &UserNodePort) -> Result { + Ok(match (&np.node, &np.port) { + (Some(n), Some(p)) => format!("{n}.{p}"), + (Some(n), None) => n.into(), + (None, _) => return Err("could not resolve a name".into()), + }) +} - for unc in &user_config.nodes { - let name: &str = &unc.name; +fn err_at_node(desc: &UserNodeDesc, e: &str) -> String { + let name = &desc.name; + let 
nt = &desc.node_type; + format!("in node `{name}` of type `{nt}`: {e}") +} - if RESERVED_NODE_NAMES.contains(name) { - return Err(format!("cannot use reserved node name '{name}'")); - } +fn get_link_str(o: &Option, _name: &str) -> Result { + o.as_ref() + .ok_or_else(|| "bad link definition in node {_name}".into()) + .cloned() +} - node_names.push(name.to_string()); - for input in &unc.inputs { - graph.add(input, name); - } - for output in &unc.outputs { - graph.add(name, output); - } - } +impl PortInfo { + fn new(node_type: &str, named_ins: &[String], named_outs: &[String]) -> Self { + let ins_pc = nodes::default_input_ports(node_type).unwrap(); + let outs_pc = nodes::default_output_ports(node_type).unwrap(); + PortInfo { + user_ins: ins_pc.user_defined_ports, + user_outs: outs_pc.user_defined_ports, + ins: ins_pc.into_port_list(named_ins), + outs: outs_pc.into_port_list(named_outs), + } + } +} - for unc in &user_config.nodes { - let inputs = graph.get_input_names(&unc.name); - match nodes::new_config(&unc.node_type, &unc.name, inputs, &unc.bt) { - Ok(nc) => { - add_default_connections(unc, &*nc, &mut graph); - - node_list.push(NodeInfo { - name: unc.name.to_string(), - node_type: unc.node_type.to_string(), - node_config: nc, - }); - } - Err(err) => { - return Err(err); - } - }; - } +fn node_position(node_names: &[String], np: &UserNodePort) -> Result { + node_names + .iter() + .position(|name: &String| Some(name) == np.node.as_ref()) + .ok_or_else(|| format!("unknown node in link: {}", np)) +} + +fn get_source_dest_ports( + port_list: &mut [PortInfo], + s: usize, + d: usize, +) -> Result<(&mut PortInfo, &mut PortInfo), &'static str> { + match s.cmp(&d) { + Ordering::Less => { + let (ss, ds) = port_list.split_at_mut(s + 1); + Ok((&mut ss[s], &mut ds[d - (s + 1)])) + } + Ordering::Greater => { + let (ds, ss) = port_list.split_at_mut(d + 1); + Ok((&mut ss[s - (d + 1)], &mut ds[d])) + } + Ordering::Equal => Err("node cannot connect to itself"), + } +} + +fn 
fixup_missing_port_names( + unc: &mut UserNodeConfig, + node_names: &[String], + port_list: &mut [PortInfo], + linked_inputs: &mut [usize], +) -> Result<(), String> { + for link in &mut unc.links { + let s = node_position(node_names, &link.from)?; + let d = node_position(node_names, &link.to)?; + let (src, dst) = get_source_dest_ports(port_list, s, d)?; + + linked_inputs[d] += 1; - Ok(Config { - node_list, - node_names, - graph, - debug: user_config.debug, - }) + link.resolve_port_names(src, dst, linked_inputs[d])?; + } + Ok(()) +} + +fn make_node_info(unc: &mut UserNodeConfig, port_info: &PortInfo) -> Result { + let name = &unc.desc.name; + let node_type = &unc.desc.node_type; + + let nc = nodes::new_config(node_type, name, &port_info.ins, &port_info.outs, &unc.bt)?; + + add_default_links(name, unc.n_inputs, unc.n_outputs, &mut unc.links, &*nc); + + Ok(NodeInfo { + name: name.to_string(), + node_type: node_type.to_string(), + node_config: nc, + }) +} + +fn into_name_lists(ports: Vec) -> (Vec>, Vec>) { + let n = ports.len(); + let mut input_names = Vec::with_capacity(n); + let mut output_names = Vec::with_capacity(n); + for pi in ports.into_iter() { + input_names.push(pi.ins); + output_names.push(pi.outs); + } + (input_names, output_names) +} + +impl UserConfig { + fn into_config(mut self, implicits: &[ImplicitNode]) -> Result { + let p = implicits.len(); + let n = self.nodes.len() + p; + + let mut node_names: Vec = Vec::with_capacity(n); + let mut nodes = Vec::with_capacity(n); + let mut ports = Vec::with_capacity(n); + + // This is performed in several loops to ensure that the resolution + // order for links does not depend on the order of the nodes given + // in the input file. 
+ + for inode in implicits.iter() { + node_names.push(inode.name.clone()); + nodes.push(NodeInfo { + name: inode.name.clone(), + node_type: inode.node_type.clone(), + node_config: Box::new(nodes::implicit::ImplicitConfig {}), + }); + ports.push(PortInfo::new(&inode.node_type, &[], &[])); + } + + for unc in &self.nodes { + let desc = &unc.desc; + let name = &desc.name; + let node_type = &desc.node_type; + + // at this point, node_names contains only the implicit entries + if node_names.iter().any(|n| n == name) { + return Err(err_at_node(desc, "cannot use reserved node name")); } - Err(err) => Err(format!( - "failed parsing configuration: {}: {err}", - String::from_utf8(config_bytes).unwrap() - )), + + if !nodes::is_valid_type(node_type) { + return Err(err_at_node(desc, "unknown node type")); + } + + ports.push(PortInfo::new(node_type, &unc.named_ins, &unc.named_outs)); + } + + for unc in &self.nodes { + let name = &unc.desc.name; + + if node_names.contains(name) { + return Err(format!("multiple definitions of node `{name}`")); + } + + node_names.push(name.into()); + } + + let mut linked_inputs = vec![0; node_names.len()]; + for unc in self.nodes.iter_mut() { + fixup_missing_port_names(unc, &node_names, &mut ports, &mut linked_inputs) + .map_err(|e| err_at_node(&unc.desc, &e))?; + } + + // Now that all user-given links are resolved, + // we can create the user-given nodes + // (which may add default links of their own into implicit nodes) + for (u, unc) in self.nodes.iter_mut().enumerate() { + nodes.push(make_node_info(unc, &ports[u + p]).map_err(|e| err_at_node(&unc.desc, &e))?); + } + + let (input_names, output_names) = into_name_lists(ports); + let mut graph = DependencyGraph::new(node_names, input_names, output_names); + + for unc in &self.nodes { + let name = &unc.desc.name; + for link in &unc.links { + graph.add( + &get_link_str(&link.from.node, name)?, + &get_link_str(&link.from.port, name)?, + &get_link_str(&link.to.node, name)?, + 
&get_link_str(&link.to.port, name)?, + )?; + } + } + + Ok(Config { + n_nodes: n, + n_implicits: p, + node_list: nodes, + graph, + debug: self.debug, + }) + } +} + +impl Config { + pub fn new(config_bytes: Vec, implicits: &[ImplicitNode]) -> Result { + match de::from_slice::(&config_bytes) { + Ok(user_config) => user_config + .into_config(implicits) + .map_err(|err| format!("failed checking configuration: {err}")), + Err(err) => Err(format!("failed parsing configuration: {err}")), } } @@ -218,8 +647,12 @@ impl Config { self.debug } - pub fn get_node_names(&self) -> &Vec { - &self.node_names + pub fn node_count(&self) -> usize { + self.n_nodes + } + + pub fn number_of_implicits(&self) -> usize { + self.n_implicits } pub fn node_types(&self) -> impl Iterator { @@ -232,19 +665,13 @@ impl Config { &self.graph } - pub fn build_nodes(&self) -> NodeMap { - let mut nodes = NodeMap::new(); + pub fn build_nodes(&self) -> NodeVec { + let mut nodes = NodeVec::with_capacity(self.node_list.len()); for info in &self.node_list { - let name = &info.name; - match nodes::new_node(&info.node_type, &*info.node_config) { - Ok(node) => { - nodes.insert(name.to_string(), node); - } - Err(err) => { - log::error!("{err}"); - } + Ok(node) => nodes.push(node), + Err(err) => log::error!("{err}"), } } @@ -259,3 +686,425 @@ pub fn get_config_value serde::Deserialize<'de>>( bt.get(key) .and_then(|v| serde_json::from_value(v.clone()).ok()) } + +#[cfg(test)] +mod test { + use super::*; + use serde_json::json; + use std::any::Any; + + fn deserialize_user_config(cfg: &str) -> UserConfig { + de::from_slice::(cfg.as_bytes()).unwrap() + } + + #[test] + fn deserialize_empty_nodes() { + let uc = deserialize_user_config( + r#"{ + "nodes": [] + }"#, + ); + assert_eq!( + uc, + UserConfig { + nodes: vec![], + debug: false, + } + ); + } + + #[test] + fn deserialize_complete_example() { + let uc = deserialize_user_config( + r#"{ + "nodes": [ + { + "name": "jq1", + "type": "jq", + "input": "request.headers", + 
"jq": "{ \"x-bar\": $request_headers[\"x-foo\"] }" + }, + { + "name": "mycall", + "type": "call", + "input": "jq1", + "url": "http://example.com" + }, + { + "name": "jq2", + "type": "jq", + "inputs": { + "$mycall": "mycall", + "$request": "request.body" + }, + "jq": "{ \"bee\": $mycall.bee, \"boo\": $request.boo }" + } + ] + }"#, + ); + assert_eq!( + uc, + UserConfig { + nodes: vec![ + UserNodeConfig { + desc: UserNodeDesc { + node_type: "jq".into(), + name: "jq1".into(), + }, + bt: BTreeMap::from([( + "jq".into(), + json!("{ \"x-bar\": $request_headers[\"x-foo\"] }") + )]), + links: vec![UserLink { + from: UserNodePort { + node: Some("request".into()), + port: Some("headers".into()) + }, + to: UserNodePort { + node: Some("jq1".into()), + port: None + } + }], + n_inputs: 1, + n_outputs: 0, + named_ins: vec![], + named_outs: vec![] + }, + UserNodeConfig { + desc: UserNodeDesc { + node_type: "call".into(), + name: "mycall".into() + }, + bt: BTreeMap::from([("url".to_string(), json!("http://example.com"))]), + links: vec![UserLink { + from: UserNodePort { + node: Some("jq1".into()), + port: None + }, + to: UserNodePort { + node: Some("mycall".into()), + port: None + } + }], + n_inputs: 1, + n_outputs: 0, + named_ins: vec![], + named_outs: vec![] + }, + UserNodeConfig { + desc: UserNodeDesc { + node_type: "jq".into(), + name: "jq2".into() + }, + bt: BTreeMap::from([( + "jq".to_string(), + json!("{ \"bee\": $mycall.bee, \"boo\": $request.boo }") + )]), + links: vec![ + UserLink { + from: UserNodePort { + node: Some("mycall".into()), + port: None + }, + to: UserNodePort { + node: Some("jq2".into()), + port: Some("$mycall".into()) + } + }, + UserLink { + from: UserNodePort { + node: Some("request".into()), + port: Some("body".into()) + }, + to: UserNodePort { + node: Some("jq2".into()), + port: Some("$request".into()) + } + } + ], + n_inputs: 2, + n_outputs: 0, + named_ins: vec!["$mycall".into(), "$request".into()], + named_outs: vec![] + } + ], + debug: false + } + ); + 
} + + #[test] + fn test_parse_node_port() { + let cases = vec![ + ("", (Some(""), None)), + (" ", (Some(""), None)), + (".", (Some(""), Some(""))), + (". ", (Some(""), Some(""))), + (" . ", (Some(""), Some(""))), + (".foo", (Some(""), Some("foo"))), + (".foo.bar", (Some(""), Some("foo.bar"))), + ("..foo.bar", (Some(""), Some(".foo.bar"))), + (". .foo.bar", (Some(""), Some(".foo.bar"))), + ("f.bar", (Some("f"), Some("bar"))), + ("foo", (Some("foo"), None)), + ("foo.", (Some("foo"), Some(""))), + ("foo.b", (Some("foo"), Some("b"))), + ("foo.b ", (Some("foo"), Some("b"))), + ("foo.bar", (Some("foo"), Some("bar"))), + ("foo . bar", (Some("foo"), Some("bar"))), + ("foo..baz", (Some("foo"), Some(".baz"))), + ("foo.bar.", (Some("foo"), Some("bar."))), + ("foo.bar..", (Some("foo"), Some("bar.."))), + ("foo.bar.baz", (Some("foo"), Some("bar.baz"))), + ("foo.bar baz", (Some("foo"), Some("bar baz"))), + ("foo bar.baz bla", (Some("foo bar"), Some("baz bla"))), + (" foo . bar.baz ", (Some("foo"), Some("bar.baz"))), + ]; + for (node_port, pair) in cases { + assert_eq!( + parse_node_port(node_port.to_owned()), + (pair.0.map(str::to_owned), pair.1.map(str::to_owned)) + ); + } + } + + fn accept_config(cfg: &str) -> Config { + let result = Config::new(cfg.as_bytes().to_vec(), &[]); + + result.unwrap() + } + + fn reject_config_with(cfg: &str, message: &str) { + nodes::register_node("source", Box::new(nodes::implicit::SourceFactory {})); + nodes::register_node("sink", Box::new(nodes::implicit::SinkFactory {})); + let implicits = vec![ + ImplicitNode::new("request", "source"), + ImplicitNode::new("service_request", "sink"), + ImplicitNode::new("service_response", "source"), + ImplicitNode::new("response", "sink"), + ]; + + let result = Config::new(cfg.as_bytes().to_vec(), &implicits); + + let err = result.unwrap_err(); + assert_eq!(err, message); + } + + #[test] + fn config_no_json() { + reject_config_with( + "", + "failed parsing configuration: EOF while parsing a JSON value.", + ) + 
} + + #[test] + fn config_bad_json() { + reject_config_with( + "{", + "failed parsing configuration: EOF while parsing an object.", + ) + } + + #[test] + fn config_empty_json() { + reject_config_with("{}", "failed parsing configuration: missing field `nodes`") + } + + #[test] + fn config_empty_nodes() { + accept_config( + r#"{ + "nodes": [] + }"#, + ); + } + + #[test] + fn config_missing_type() { + reject_config_with( + r#"{ + "nodes": [ + { + "name": "MY_NODE" + } + ] + }"#, + "failed parsing configuration: missing field `type`", + ) + } + + #[test] + fn config_invalid_type() { + reject_config_with( + r#"{ + "nodes": [ + { + "name": "MY_NODE", + "type": "INVALID" + } + ] + }"#, + "failed checking configuration: in node `MY_NODE` of type `INVALID`: unknown node type", + ) + } + + #[test] + fn config_invalid_name() { + nodes::register_node("jq", Box::new(nodes::jq::JqFactory {})); + reject_config_with( + r#"{ + "nodes": [ + { + "name": "response", + "type": "jq" + } + ] + }"#, + "failed checking configuration: in node `response` of type `jq`: cannot use reserved node name", + ) + } + + #[test] + fn config_invalid_loop() { + nodes::register_node("jq", Box::new(nodes::jq::JqFactory {})); + reject_config_with( + r#"{ + "nodes": [ + { + "name": "MY_NODE", + "type": "jq", + "inputs": { + "input": "MY_NODE" + } + } + ] + }"#, + "failed checking configuration: in node `MY_NODE` of type `jq`: node cannot connect to itself", + ) + } + + struct IgnoreConfig {} + impl NodeConfig for IgnoreConfig { + fn as_any(&self) -> &dyn Any { + self + } + } + + #[test] + fn convert_complete_example() { + let uc = deserialize_user_config( + r#"{ + "nodes": [ + { + "name": "jq1", + "type": "jq", + "input": "request.headers", + "jq": "{ \"x-bar\": $request_headers[\"x-foo\"] }" + }, + { + "name": "mycall", + "type": "call", + "input": "jq1", + "url": "http://example.com" + }, + { + "name": "jq2", + "type": "jq", + "inputs": { + "$mycall": "mycall", + "$request": "request.body" + }, + "jq": "{ 
\"bee\": $mycall.bee, \"boo\": $request.boo }" + } + ] + }"#, + ); + + nodes::register_node("source", Box::new(nodes::implicit::SourceFactory {})); + nodes::register_node("sink", Box::new(nodes::implicit::SinkFactory {})); + nodes::register_node("call", Box::new(nodes::call::CallFactory {})); + nodes::register_node("jq", Box::new(nodes::jq::JqFactory {})); + + let implicits = vec![ + ImplicitNode::new("request", "source"), + ImplicitNode::new("service_request", "sink"), + ImplicitNode::new("service_response", "source"), + ImplicitNode::new("response", "sink"), + ]; + + let config = uc.into_config(&implicits).unwrap(); + assert!(!config.debug); + assert_eq!(config.n_nodes, 7); + assert_eq!(config.n_implicits, 4); + assert_eq!( + config.node_list, + vec![ + NodeInfo { + name: "request".into(), + node_type: "source".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "service_request".into(), + node_type: "sink".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "service_response".into(), + node_type: "source".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "response".into(), + node_type: "sink".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "jq1".into(), + node_type: "jq".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "mycall".into(), + node_type: "call".into(), + node_config: Box::new(IgnoreConfig {}), + }, + NodeInfo { + name: "jq2".into(), + node_type: "jq".into(), + node_config: Box::new(IgnoreConfig {}), + }, + ] + ); + let input_lists: &[&[Option<(usize, usize)>]] = &[ + &[], + &[None, None, None], + &[], + &[None, None, None], + &[Some((0, 1))], + &[Some((4, 0)), None, None], + &[Some((5, 0)), Some((0, 0))], + ]; + for (i, &input_list) in input_lists.iter().enumerate() { + let given: Vec<_> = input_list.iter().collect(); + let computed: Vec<_> = config.graph.each_input(i).collect(); + assert_eq!(given, computed); + } + + let 
output_lists: &[&[&[(usize, usize)]]] = &[ + &[&[(6, 1)], &[(4, 0)]], + &[&[], &[]], + &[&[], &[]], + &[&[], &[]], + &[&[(5, 0)]], + &[&[(6, 0)], &[]], + &[], + ]; + for (i, &output_list) in output_lists.iter().enumerate() { + let given: Vec<_> = output_list.iter().collect(); + let computed: Vec<_> = config.graph.each_output(i).collect(); + assert_eq!(given, computed); + } + } +} diff --git a/src/data.rs b/src/data.rs index bc7d69b..bd18e44 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,7 +1,5 @@ -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; - use crate::dependency_graph::DependencyGraph; +use crate::payload::Payload; #[allow(clippy::enum_variant_names)] #[derive(PartialEq, Clone, Copy)] @@ -19,272 +17,177 @@ pub struct Input<'a> { } #[derive(Debug)] -pub enum Payload { - Raw(Vec), - Json(serde_json::Value), - Error(String), +pub enum State { + Waiting(u32), + Done(Vec>), + Fail(Vec>), } -impl Payload { - pub fn content_type(&self) -> Option<&str> { - match &self { - Payload::Json(_) => Some("application/json"), - _ => None, - } - } +pub struct Data { + graph: DependencyGraph, + states: Vec>, +} - pub fn from_bytes(bytes: Vec, content_type: Option<&str>) -> Option { - match content_type { - Some(ct) => { - if ct.contains("application/json") { - match serde_json::from_slice(&bytes) { - Ok(v) => Some(Payload::Json(v)), - Err(e) => Some(Payload::Error(e.to_string())), - } - } else { - Some(Payload::Raw(bytes)) - } - } - _ => None, +fn set_port( + ports: &mut [Option], + port: usize, + payload: Payload, +) -> Result<(), &'static str> { + match &ports[port] { + Some(_) => Err("cannot overwrite a payload"), + None => { + ports[port] = Some(payload); + Ok(()) } } +} - pub fn to_json(&self) -> Result { - match &self { - Payload::Json(value) => Ok(value.clone()), - Payload::Raw(vec) => match std::str::from_utf8(vec) { - Ok(s) => serde_json::to_value(s).map_err(|e| e.to_string()), - Err(e) => Err(e.to_string()), - }, - Payload::Error(e) => 
Err(e.clone()), - } - } +fn default_vec(n: usize) -> Vec +where + T: Default, +{ + let mut vec = Vec::with_capacity(n); + vec.resize_with(n, Default::default); + vec +} - pub fn to_bytes(&self) -> Result, String> { - match &self { - Payload::Json(value) => match serde_json::to_string(value) { - Ok(s) => Ok(s.into_bytes()), - Err(e) => Err(e.to_string()), - }, - Payload::Raw(s) => Ok(s.clone()), // it would be nice to be able to avoid this copy - Payload::Error(e) => Err(e.clone()), - } +impl Data { + pub fn new(graph: DependencyGraph) -> Data { + let n = graph.number_of_nodes(); + let states = default_vec(n); + Data { graph, states } } - pub fn len(&self) -> Option { - match &self { - Payload::Json(_) => None, - Payload::Raw(s) => Some(s.len()), - Payload::Error(e) => Some(e.len()), - } + pub fn get_node_name(&self, i: usize) -> &str { + self.graph.get_node_name(i).expect("valid index") } - pub fn to_pwm_headers(&self) -> Vec<(&str, &str)> { - match &self { - Payload::Json(value) => { - let mut vec: Vec<(&str, &str)> = vec![]; - if let serde_json::Value::Object(map) = value { - for (k, entry) in map { - match entry { - serde_json::Value::Array(vs) => { - for v in vs { - if let serde_json::Value::String(s) = v { - vec.push((k, s)); - } - } - } - - // accept string values as well - serde_json::Value::String(s) => { - vec.push((k, s)); - } - - _ => {} - } - } - } + pub fn set(&mut self, node: usize, state: State) { + self.states[node] = Some(state); + } - vec - } - _ => { - // TODO - log::debug!("NYI: converting payload into headers vector"); - vec![] + pub fn fill_port( + &mut self, + node: usize, + port: usize, + payload: Payload, + ) -> Result<(), &'static str> { + match &mut self.states[node] { + None => { + let mut ports: Vec> = + default_vec(self.graph.number_of_outputs(node)); + ports[port] = Some(payload); + let state = State::Done(ports); + self.states[node] = Some(state); + Ok(()) } + Some(State::Waiting(_)) => Err("cannot force payload on a waiting node"), 
+ Some(State::Done(ports)) => set_port(ports, port, payload), + Some(State::Fail(ports)) => set_port(ports, port, payload), } } -} - -#[derive(Serialize, Deserialize, Debug)] -#[serde(untagged)] -enum StringOrVec { - String(String), - Vec(Vec), -} -pub fn from_pwm_headers(vec: Vec<(String, String)>) -> Payload { - let mut map = BTreeMap::new(); - for (k, v) in vec { - let lk = k.to_lowercase(); - if let Some(vs) = map.get_mut(&lk) { - match vs { - StringOrVec::String(s) => { - let ss = s.to_string(); - map.insert(lk, StringOrVec::Vec(vec![ss, v])); - } - StringOrVec::Vec(vs) => { - vs.push(v); - } - }; - } else { - map.insert(lk, StringOrVec::String(v)); + pub fn get_state(&self, node: usize) -> Result<&State, &'static str> { + match &self.states[node] { + None => Err("fill_port must have created a state"), + Some(state) => Ok(state), } } - let value = serde_json::to_value(map).expect("serializable map"); - Payload::Json(value) -} - -pub fn to_pwm_headers(payload: Option<&Payload>) -> Vec<(&str, &str)> { - payload.map_or_else(Vec::new, |p| p.to_pwm_headers()) -} - -/// To use this result in proxy-wasm calls as an Option<&[u8]>, use: -/// `data::to_pwm_body(p).as_deref()`. 
-pub fn to_pwm_body(payload: Option<&Payload>) -> Result>, String> { - match payload { - Some(p) => match p.to_bytes() { - Ok(b) => Ok(Some(Vec::into_boxed_slice(b))), - Err(e) => Err(e), - }, - None => Ok(None), - } -} - -#[derive(Debug)] -pub enum State { - Waiting(u32), - Done(Option), - Fail(Option), -} - -#[derive(Default)] -pub struct Data { - graph: DependencyGraph, - states: BTreeMap, -} - -impl Data { - pub fn new(graph: DependencyGraph) -> Data { - Data { - graph, - states: Default::default(), + pub fn fetch_port(&self, node: usize, port: usize) -> Option<&Payload> { + match self.graph.get_provider(node, port) { + Some((n, p)) => match self.states.get(n).unwrap() { + Some(State::Waiting(_)) => None, + Some(State::Done(ports)) | Some(State::Fail(ports)) => match ports.get(p) { + Some(Some(ref payload)) => Some(payload), + Some(None) => None, + None => None, + }, + None => None, + }, + None => None, } } - pub fn set(&mut self, name: &str, state: State) { - self.states.insert(name.to_string(), state); + fn can_trigger(&self, i: usize, waiting: Option) -> bool { + // This is intentionally written with all of the match arms + // stated explicitly (instead of using _ catch-alls), + // so that the trigger logic and all its states + // are considered by the reader. + match &self.states[i] { + // state was never created, trigger + None => true, + Some(state) => match state { + // never retrigger Done + State::Done(_) => false, + // never retrigger Fail + State::Fail(_) => false, + State::Waiting(w) => match &waiting { + // we're waiting on the right id, allow triggering + Some(id) if w == id => true, + // waiting on something else, skip + Some(_) => false, + // not called from a wait state + None => false, + }, + }, + } } - fn can_trigger(&self, name: &str, waiting: Option) -> bool { - // If node is Done, avoid producing inputs - // and re-triggering its execution. 
- if let Some(state) = self.states.get(name) { - match state { - State::Done(_) => { - return false; - } - State::Waiting(w) => match &waiting { - Some(id) => { - if w != id { - return false; + fn for_each_input<'a, T>( + &'a self, + i: usize, + f: impl for<'b> Fn(Option<&'a Payload>, &'b mut T), + mut t: T, + ) -> Option { + for input in self.graph.each_input(i) { + // if input port is connected in the graph + match *input { + Some((n, p)) => { + // check if other node is Done + match &self.states[n] { + Some(State::Done(ports)) => { + // check if port has payload available + match &ports[p] { + // ok, has payload + Some(payload) => f(Some(payload), &mut t), + // no payload available + None => return None, + } } + Some(State::Waiting(_)) => return None, + Some(State::Fail(_)) => return None, + None => return None, } - None => return false, - }, - State::Fail(_) => { - return false; } + None => f(None, &mut t), // ok, port is not connected } } - // Check that all inputs have payloads available - for input in self.graph.each_input(name) { - let val = self.states.get(input); - match val { - Some(State::Done(_)) => {} - _ => { - return false; - } - }; - } - - true + Some(t) } pub fn get_inputs_for( &self, - name: &str, + node: usize, waiting: Option, ) -> Option>> { - if !self.can_trigger(name, waiting) { + if !self.can_trigger(node, waiting) { return None; } - // If so, allocate the vector with the result. - let mut vec: Vec> = Vec::new(); - for input in self.graph.each_input(name) { - if let Some(State::Done(p)) = self.states.get(input) { - vec.push(p.as_ref()); - } - } - - Some(vec) - } - - /// If the node is triggerable, that is, it has all its required - /// inputs available to trigger (i.e. none of its inputs are in a - /// `Waiting` state), then return the payload of the first input that - /// is in a `Done state. 
- /// - /// Note that by returning an `Option<&Payload>` this makes no - /// distinction between the node being not triggerable or the - /// node being triggerable via a `Done(None)` input. - /// - /// This is not an issue because this function is intended for use - /// with the implicit nodes (`response_body`, etc.) which are - /// handled as special cases directly by the filter. - pub fn first_input_for(&self, name: &str, waiting: Option) -> Option<&Payload> { - if !self.can_trigger(name, waiting) { - return None; - } + // Check first that all connected inputs are ready + self.for_each_input(node, |_, _| (), &mut ())?; - for input in self.graph.each_input(name) { - if let Some(State::Done(p)) = self.states.get(input) { - return p.as_ref(); - } - } - - None + // If so, allocate the vector with the result. + let n = self.graph.number_of_inputs(node); + self.for_each_input( + node, + |payload, v: &mut Vec>| match payload { + Some(p) => v.push(Some(p)), + None => v.push(None), + }, + Vec::with_capacity(n), + ) } } - -#[derive(Serialize)] -struct ErrorMessage<'a> { - message: &'a str, - #[serde(skip_serializing_if = "Option::is_none")] - request_id: Option, -} - -pub fn to_json_error_body(message: &str, request_id: Option>) -> String { - serde_json::to_value(ErrorMessage { - message, - request_id: match request_id { - Some(vec) => std::str::from_utf8(&vec).map(|v| v.to_string()).ok(), - None => None, - }, - }) - .ok() - .map(|v| v.to_string()) - .expect("JSON error object") -} diff --git a/src/debug.rs b/src/debug.rs index efcbeaa..a95d12e 100644 --- a/src/debug.rs +++ b/src/debug.rs @@ -1,5 +1,7 @@ use crate::config::Config; -use crate::data::{Payload, State}; +use crate::data::State; +use crate::payload::Payload; + use serde::Serialize; use serde_json::Value; use std::collections::HashMap; @@ -21,11 +23,16 @@ struct RunOperation { action: RunMode, } +#[derive(Serialize)] +struct PortValue { + data_type: String, + value: Option, +} + struct SetOperation { node_name: 
String, - data_type: String, status: DataMode, - value: Option, + values: Vec, } enum Operation { @@ -50,17 +57,26 @@ impl State { } } -fn payload_to_op_info(p: &Option, default_type: &str) -> (String, Option) { - if let Some(payload) = p { - let dt = payload.content_type().unwrap_or(default_type).to_string(); - - match payload.to_json() { - Ok(v) => (dt, Some(v)), - Err(e) => ("fail".to_string(), Some(serde_json::json!(e))), - } - } else { - ("none".to_string(), None) - } +fn payloads_to_values(payloads: &[Option], default_type: &str) -> Vec { + payloads + .iter() + .map(|p| match p { + Some(payload) => match payload.to_json() { + Ok(v) => PortValue { + data_type: payload.content_type().unwrap_or(default_type).to_string(), + value: Some(v), + }, + Err(e) => PortValue { + data_type: "fail".into(), + value: Some(serde_json::json!(e)), + }, + }, + None => PortValue { + data_type: "none".into(), + value: None, + }, + }) + .collect() } impl Debug { @@ -80,17 +96,14 @@ impl Debug { pub fn set_data(&mut self, name: &str, state: &State) { if self.trace { - let (data_type, value) = match state { - State::Done(p) => payload_to_op_info(p, "raw"), - State::Waiting(_) => ("waiting".to_string(), None), - State::Fail(p) => payload_to_op_info(p, "fail"), - }; - self.operations.push(Operation::Set(SetOperation { node_name: name.to_string(), - data_type, status: state.to_data_mode(), - value, + values: match state { + State::Waiting(_) => vec![], + State::Done(p) => payloads_to_values(p, "raw"), + State::Fail(p) => payloads_to_values(p, "fail"), + }, })); } } @@ -133,7 +146,7 @@ impl Debug { #[serde(skip_serializing_if = "Option::is_none")] r#type: Option<&'a str>, #[serde(skip_serializing_if = "Option::is_none")] - value: Option<&'a Value>, + values: Option<&'a Vec>, } let mut actions: Vec = vec![]; @@ -145,28 +158,28 @@ impl Debug { RunMode::Run => "run", RunMode::Resume => "resume", }, - name: &run.node_name, r#type: Some(&run.node_type), - value: None, + name: &run.node_name, + 
values: None, }, Operation::Set(set) => match set.status { DataMode::Done => TraceAction { action: "value", name: &set.node_name, - r#type: Some(&set.data_type), - value: set.value.as_ref(), + r#type: None, + values: Some(&set.values), }, DataMode::Waiting => TraceAction { action: "wait", name: &set.node_name, r#type: None, - value: None, + values: None, }, DataMode::Fail => TraceAction { action: "fail", name: &set.node_name, r#type: None, - value: set.value.as_ref(), + values: Some(&set.values), }, }, }); diff --git a/src/dependency_graph.rs b/src/dependency_graph.rs index f08aac8..96f59be 100644 --- a/src/dependency_graph.rs +++ b/src/dependency_graph.rs @@ -1,55 +1,146 @@ -use core::slice::Iter; -use std::collections::BTreeMap; - -#[derive(Default, Clone)] +#[derive(Clone, PartialEq, Debug)] pub struct DependencyGraph { - dependents: BTreeMap>, - providers: BTreeMap>, - empty: Vec, + node_names: Vec, + input_names: Vec>, + output_names: Vec>, + dependents: Vec>>, + providers: Vec>>, } -fn add_to(map: &mut BTreeMap>, key: &str, value: &str) { - match map.get_mut(key) { - Some(key_items) => { - let v = value.to_string(); - if !key_items.contains(&v) { - key_items.push(v); - } - } - None => { - map.insert(key.to_string(), vec![value.to_string()]); - } - }; +pub fn find( + node: &str, + port: &str, + node_names: &[String], + port_names: &[Vec], +) -> (usize, usize) { + let n = node_names + .iter() + .position(|x| x == node) + .expect("node registered in node_names"); + let p = port_names + .get(n) + .expect("valid node index") + .iter() + .position(|x| x == port) + .expect("port registered in port_names"); + (n, p) } impl DependencyGraph { - pub fn add(&mut self, src: &str, dst: &str) { - add_to(&mut self.dependents, src, dst); - add_to(&mut self.providers, dst, src); + pub fn new( + node_names: Vec, + input_names: Vec>, + output_names: Vec>, + ) -> DependencyGraph { + let n = node_names.len(); + let mut dependents = Vec::with_capacity(n); + let mut providers = 
Vec::with_capacity(n); + for ports in &input_names { + providers.push(vec![None; ports.len()]); + } + for ports in &output_names { + let np = ports.len(); + let mut lists = Vec::with_capacity(np); + lists.resize_with(np, Vec::new); + dependents.push(lists); + } + DependencyGraph { + node_names, + input_names, + output_names, + dependents, + providers, + } } - pub fn has_dependents(&self, name: &str) -> bool { - self.dependents.contains_key(name) + pub fn get_node_name(&self, i: usize) -> Option<&str> { + self.node_names.get(i).map(|o| o.as_ref()) } - pub fn has_providers(&self, name: &str) -> bool { - self.providers.contains_key(name) + pub fn number_of_nodes(&self) -> usize { + self.node_names.len() } - pub fn get_input_names(&self, name: &str) -> &Vec { - if let Some(items) = self.providers.get(name) { - items - } else { - &self.empty - } + pub fn number_of_outputs(&self, node: usize) -> usize { + self.output_names[node].len() + } + + pub fn number_of_inputs(&self, node: usize) -> usize { + self.input_names[node].len() } - pub fn each_input(&self, name: &str) -> Iter { - if let Some(items) = self.providers.get(name) { - items.iter() - } else { - // FIXME is there a better way to do this? 
- self.empty.iter() + fn add_dependent(&mut self, node: usize, port: usize, entry: (usize, usize)) { + let node_list = &mut self.dependents; + let port_list = node_list.get_mut(node).expect("valid node index"); + let entries_list = port_list.get_mut(port).expect("valid port index"); + entries_list.push(entry); + } + + fn add_provider( + &mut self, + node: usize, + port: usize, + entry: (usize, usize), + ) -> Result<(), String> { + let node_list = &mut self.providers; + let port_list = node_list.get_mut(node).expect("valid node index"); + match *port_list.get(port).expect("valid port index") { + Some((other_n, other_p)) => self.err_already_connected(node, port, other_n, other_p), + None => { + port_list[port] = Some(entry); + Ok(()) + } } } + + fn err_already_connected( + &self, + n: usize, + p: usize, + oth_n: usize, + oth_p: usize, + ) -> Result<(), String> { + let this_node = self.node_names.get(n).expect("valid node"); + let this_port = self.output_names[n].get(p).expect("valid port"); + let other_node = self.node_names.get(oth_n).expect("valid node"); + let other_port = self.output_names[oth_n].get(oth_p).expect("valid port"); + Err(format!( + "{this_node}.{this_port} is already connected to {other_node}.{other_port}" + )) + } + + pub fn add( + &mut self, + src_node: &str, + src_port: &str, + dst_node: &str, + dst_port: &str, + ) -> Result<(), String> { + let (sn, sp) = find(src_node, src_port, &self.node_names, &self.output_names); + let (dn, dp) = find(dst_node, dst_port, &self.node_names, &self.input_names); + self.add_dependent(sn, sp, (dn, dp)); + self.add_provider(dn, dp, (sn, sp)) + } + + pub fn has_dependents(&self, node: usize, port: usize) -> bool { + !self.dependents[node][port].is_empty() + } + + pub fn has_providers(&self, node: usize, port: usize) -> bool { + self.providers[node][port].is_some() + } + + pub fn get_provider(&self, node: usize, port: usize) -> Option<(usize, usize)> { + self.providers[node][port] + } + + pub fn each_input(&self, 
node: usize) -> std::slice::Iter> { + self.providers[node].iter() + } + + /// used in tests only + #[allow(dead_code)] + pub fn each_output(&self, node: usize) -> std::slice::Iter> { + self.dependents[node].iter() + } } diff --git a/src/filter.rs b/src/filter.rs index 4821cda..6648b91 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,3 +1,4 @@ +use lazy_static::lazy_static; use proxy_wasm::{traits::*, types::*}; use std::rc::Rc; @@ -6,12 +7,55 @@ mod data; mod debug; mod dependency_graph; mod nodes; +mod payload; -use crate::config::Config; -use crate::data::{Data, Input, Payload, Phase, Phase::*, State}; +use crate::config::{Config, ImplicitNode}; +use crate::data::{Data, Input, Phase, Phase::*, State}; use crate::debug::{Debug, RunMode}; use crate::dependency_graph::DependencyGraph; -use crate::nodes::{Node, NodeMap}; +use crate::nodes::{Node, NodeVec}; +use crate::payload::Payload; +use crate::ImplicitNodeId::*; +use crate::ImplicitPortId::*; + +// ----------------------------------------------------------------------------- +// Implicit nodes +// ----------------------------------------------------------------------------- + +#[derive(Copy, Clone)] +enum ImplicitNodeId { + Request = 0, + ServiceRequest = 1, + ServiceResponse = 2, + Response = 3, +} + +impl From for usize { + fn from(n: ImplicitNodeId) -> Self { + n as usize + } +} + +#[derive(Copy, Clone)] +enum ImplicitPortId { + Body = 0, + Headers = 1, +} + +impl From for usize { + fn from(p: ImplicitPortId) -> Self { + p as usize + } +} + +lazy_static! 
{ + static ref IMPLICIT_NODES: Vec = vec![ + ImplicitNode::new("request", "source"), + ImplicitNode::new("service_request", "sink"), + ImplicitNode::new("service_response", "source"), + ImplicitNode::new("response", "sink"), + ]; +} // ----------------------------------------------------------------------------- // Root Context @@ -26,7 +70,7 @@ impl Context for DataKitFilterRootContext {} impl RootContext for DataKitFilterRootContext { fn on_configure(&mut self, _config_size: usize) -> bool { match self.get_plugin_configuration() { - Some(config_bytes) => match Config::new(config_bytes) { + Some(config_bytes) => match Config::new(config_bytes, &IMPLICIT_NODES) { Ok(config) => { self.config = Some(Rc::new(config)); true @@ -60,14 +104,18 @@ impl RootContext for DataKitFilterRootContext { // to avoid cloning every time? let data = Data::new(graph.clone()); - let do_request_headers = graph.has_dependents("request_headers"); - let do_request_body = graph.has_dependents("request_body"); - let do_service_request_headers = graph.has_providers("service_request_headers"); - let do_service_request_body = graph.has_providers("service_request_body"); - let do_service_response_headers = graph.has_dependents("service_response_headers"); - let do_service_response_body = graph.has_dependents("service_response_body"); - let do_response_headers = graph.has_providers("response_headers"); - let do_response_body = graph.has_providers("response_body"); + let do_request_headers = graph.has_dependents(Request.into(), Headers.into()); + let do_request_body = graph.has_dependents(Request.into(), Body.into()); + + let do_service_request_headers = graph.has_providers(ServiceRequest.into(), Headers.into()); + let do_service_request_body = graph.has_providers(ServiceRequest.into(), Body.into()); + + let do_service_response_headers = + graph.has_dependents(ServiceResponse.into(), Headers.into()); + let do_service_response_body = graph.has_dependents(ServiceResponse.into(), Body.into()); + + let 
do_response_headers = graph.has_providers(Response.into(), Headers.into()); + let do_response_body = graph.has_providers(Response.into(), Body.into()); Some(Box::new(DataKitFilter { config, @@ -93,7 +141,7 @@ impl RootContext for DataKitFilterRootContext { pub struct DataKitFilter { config: Rc, - nodes: NodeMap, + nodes: NodeVec, data: Data, debug: Option, failed: bool, @@ -148,7 +196,7 @@ impl DataKitFilter { } fn send_default_fail_response(&self) { - let body = data::to_json_error_body( + let body = payload::to_json_error_body( "An unexpected error ocurred", self.get_property(vec!["ngx", "kong_request_id"]), ); @@ -159,16 +207,36 @@ impl DataKitFilter { ); } - fn set_data(&mut self, name: &str, state: State) { - if let Some(ref mut debug) = self.debug { - debug.set_data(name, &state); + fn set_implicit_data(&mut self, node: ImplicitNodeId, port: ImplicitPortId, payload: Payload) { + let r = self.data.fill_port(node.into(), port.into(), payload); + match r { + Ok(()) => { + if let Some(debug) = &mut self.debug { + let name = self.data.get_node_name(node.into()); + if let Ok(state) = self.data.get_state(node.into()) { + debug.set_data(name, state); + } + } + } + Err(e) => panic!("error setting implicit node data: {e}"), } - self.data.set(name, state); } - fn set_headers_data(&mut self, vec: Vec<(String, String)>, name: &str) { - let payload = data::from_pwm_headers(vec); - self.set_data(name, State::Done(Some(payload))); + fn set_headers_data(&mut self, node: ImplicitNodeId, vec: Vec<(String, String)>) { + let payload = payload::from_pwm_headers(vec); + self.set_implicit_data(node, Headers, payload); + } + + fn set_body_data(&mut self, node: ImplicitNodeId, payload: Payload) { + self.set_implicit_data(node, Body, payload); + } + + fn get_headers_data(&self, node: ImplicitNodeId) -> Option<&Payload> { + self.data.fetch_port(node.into(), Headers.into()) + } + + fn get_body_data(&self, node: ImplicitNodeId) -> Option<&Payload> { + self.data.fetch_port(node.into(), 
Body.into()) } fn run_nodes(&mut self, phase: Phase) -> Action { @@ -179,15 +247,18 @@ impl DataKitFilter { debug_is_tracing = debug.is_tracing(); } + let from = self.config.number_of_implicits(); + let to = self.config.node_count(); + while !self.failed { let mut any_ran = false; - for name in self.config.get_node_names() { + for i in from..to { let node: &dyn Node = self .nodes - .get(name) - .expect("self.nodes doesn't match self.node_names") + .get(i) + .expect("self.nodes doesn't match node_count") .as_ref(); - if let Some(inputs) = self.data.get_inputs_for(name, None) { + if let Some(inputs) = self.data.get_inputs_for(i, None) { any_ran = true; let input = Input { @@ -197,6 +268,7 @@ impl DataKitFilter { let state = node.run(self as &dyn HttpContext, &input); if let Some(ref mut debug) = self.debug { + let name = self.data.get_node_name(i); debug.run(name, &inputs, &state, RunMode::Run); } @@ -213,7 +285,7 @@ impl DataKitFilter { } } - self.data.set(name, state); + self.data.set(i, state); } } if !any_ran { @@ -226,17 +298,41 @@ impl DataKitFilter { fn set_service_request_headers(&mut self) { if self.do_service_request_headers { - if let Some(payload) = self.data.first_input_for("service_request_headers", None) { - let headers = data::to_pwm_headers(Some(payload)); + if let Some(payload) = self.get_headers_data(ServiceRequest) { + let headers = payload::to_pwm_headers(Some(payload)); self.set_http_request_headers(headers); self.do_service_request_headers = false; } } } + fn set_content_headers( + &self, + node: ImplicitNodeId, + set_header: impl Fn(&DataKitFilter, &str, Option<&str>), + ) { + if let Some(payload) = self.get_body_data(node) { + if let Some(content_type) = payload.content_type() { + set_header(self, "Content-Type", Some(content_type)); + } + if let Some(content_length) = payload.len().map(|n| n.to_string()) { + set_header(self, "Content-Length", Some(&content_length)); + } else { + set_header(self, "Content-Length", Some("")); // FIXME: why 
doesn't None work? + } + } + set_header(self, "Content-Encoding", None); + } + + fn prep_service_request_body(&mut self) { + if self.do_service_request_body { + self.set_content_headers(ServiceRequest, |s, k, v| s.set_http_request_header(k, v)); + } + } + fn set_service_request_body(&mut self) { if self.do_service_request_body { - if let Some(payload) = self.data.first_input_for("service_request_body", None) { + if let Some(payload) = self.get_body_data(ServiceRequest) { if let Ok(bytes) = payload.to_bytes() { self.set_http_request_body(0, bytes.len(), &bytes); } @@ -256,13 +352,16 @@ impl Context for DataKitFilter { ) { log::debug!("DataKitFilter: on http call response, id = {:?}", token_id); - for name in self.config.get_node_names() { + let from = self.config.number_of_implicits(); + let to = self.config.node_count(); + + for i in from..to { let node: &dyn Node = self .nodes - .get(name) - .expect("self.nodes doesn't match self.node_names") + .get(i) + .expect("self.nodes doesn't match node_count") .as_ref(); - if let Some(inputs) = self.data.get_inputs_for(name, Some(token_id)) { + if let Some(inputs) = self.data.get_inputs_for(i, Some(token_id)) { let input = Input { data: &inputs, phase: HttpCallResponse, @@ -270,10 +369,11 @@ impl Context for DataKitFilter { let state = node.resume(self, &input); if let Some(ref mut debug) = self.debug { + let name = self.data.get_node_name(i); debug.run(name, &inputs, &state, RunMode::Resume); } - self.data.set(name, state); + self.data.set(i, state); break; } } @@ -281,7 +381,7 @@ impl Context for DataKitFilter { self.run_nodes(HttpCallResponse); self.set_service_request_headers(); - // self.set_service_request_body(); + self.prep_service_request_body(); self.resume_http_request(); } @@ -295,7 +395,7 @@ impl HttpContext for DataKitFilter { if self.do_request_headers { let vec = self.get_http_request_headers(); - self.set_headers_data(vec, "request_headers"); + self.set_headers_data(Request, vec); } let action = 
self.run_nodes(HttpRequestHeaders); @@ -304,9 +404,10 @@ impl HttpContext for DataKitFilter { && self.get_http_request_header("Transfer-Encoding").is_none() { self.set_service_request_headers(); - // self.set_service_request_body(); } + self.prep_service_request_body(); + action } @@ -314,8 +415,9 @@ impl HttpContext for DataKitFilter { if eof && self.do_request_body { if let Some(bytes) = self.get_http_request_body(0, body_size) { let content_type = self.get_http_request_header("Content-Type"); - let body_payload = Payload::from_bytes(bytes, content_type.as_deref()); - self.set_data("request_body", State::Done(body_payload)); + if let Some(payload) = Payload::from_bytes(bytes, content_type.as_deref()) { + self.set_body_data(Request, payload); + } } } @@ -330,27 +432,20 @@ impl HttpContext for DataKitFilter { fn on_http_response_headers(&mut self, _nheaders: usize, _eof: bool) -> Action { if self.do_service_response_headers { let vec = self.get_http_response_headers(); - self.set_headers_data(vec, "service_response_headers"); + self.set_headers_data(ServiceResponse, vec); } let action = self.run_nodes(HttpResponseHeaders); if self.do_response_headers { - if let Some(payload) = self.data.first_input_for("response_headers", None) { - let headers = data::to_pwm_headers(Some(payload)); + if let Some(payload) = self.get_headers_data(Response) { + let headers = payload::to_pwm_headers(Some(payload)); self.set_http_response_headers(headers); } } if self.do_response_body { - if let Some(payload) = self.data.first_input_for("response_body", None) { - let content_length = payload.len().map(|n| n.to_string()); - self.set_http_response_header("Content-Length", content_length.as_deref()); - self.set_http_response_header("Content-Type", payload.content_type()); - } else { - self.set_http_response_header("Content-Length", None); - } - self.set_http_response_header("Content-Encoding", None); + self.set_content_headers(Response, |s, k, v| s.set_http_response_header(k, v)); } if 
self.debug.is_some() { @@ -368,15 +463,16 @@ impl HttpContext for DataKitFilter { if eof && self.do_service_response_body { if let Some(bytes) = self.get_http_response_body(0, body_size) { let content_type = self.get_http_response_header("Content-Type"); - let payload = Payload::from_bytes(bytes, content_type.as_deref()); - self.set_data("service_response_body", State::Done(payload)); + if let Some(payload) = Payload::from_bytes(bytes, content_type.as_deref()) { + self.set_body_data(ServiceResponse, payload); + } } } let action = self.run_nodes(HttpResponseBody); if self.do_response_body { - if let Some(payload) = self.data.first_input_for("response_body", None) { + if let Some(payload) = self.get_body_data(Response) { if let Ok(bytes) = payload.to_bytes() { self.set_http_response_body(0, bytes.len(), &bytes); } else { @@ -385,8 +481,9 @@ impl HttpContext for DataKitFilter { } else if let Some(debug) = &self.debug { if let Some(bytes) = self.get_http_response_body(0, body_size) { let content_type = debug.response_body_content_type(); - let payload = Payload::from_bytes(bytes, content_type.as_deref()); - self.set_data("response_body", State::Done(payload)); + if let Some(payload) = Payload::from_bytes(bytes, content_type.as_deref()) { + self.set_body_data(Response, payload); + } } } } @@ -400,9 +497,11 @@ impl HttpContext for DataKitFilter { } proxy_wasm::main! 
{{ - nodes::register_node("template", Box::new(nodes::template::TemplateFactory {})); + nodes::register_node("source", Box::new(nodes::implicit::SourceFactory {})); + nodes::register_node("sink", Box::new(nodes::implicit::SinkFactory {})); + nodes::register_node("handlebars", Box::new(nodes::handlebars::HandlebarsFactory {})); nodes::register_node("call", Box::new(nodes::call::CallFactory {})); - nodes::register_node("response", Box::new(nodes::response::ResponseFactory {})); + nodes::register_node("exit", Box::new(nodes::exit::ExitFactory {})); nodes::register_node("jq", Box::new(nodes::jq::JqFactory {})); proxy_wasm::set_log_level(LogLevel::Debug); @@ -412,3 +511,8 @@ proxy_wasm::main! {{ }) }); }} + +// interesting test to try out: +// multiple callouts at once with different settings: http 1.0, 1.1, chunked encoding, content-length + +// test with bad responses diff --git a/src/nodes.rs b/src/nodes.rs index d302f1e..11dbef9 100644 --- a/src/nodes.rs +++ b/src/nodes.rs @@ -7,30 +7,64 @@ use std::sync::{Mutex, OnceLock}; use crate::data::{Input, State, State::*}; pub mod call; +pub mod exit; +pub mod handlebars; pub mod jq; -pub mod response; -pub mod template; -pub type NodeMap = BTreeMap>; +pub type NodeVec = Vec>; + +#[derive(Clone, Debug)] +pub struct PortConfig { + pub defaults: Option>, + pub user_defined_ports: bool, +} + +impl PortConfig { + fn names(list: &[&str]) -> Option> { + Some(list.iter().map(|&s| str::to_owned(s)).collect()) + } + + /// Combine defaults and user-given ports + /// into the final ordered list of ports. 
+ pub fn into_port_list(self: PortConfig, given: &[String]) -> Vec { + let mut list = self.defaults.unwrap_or_default(); + + if self.user_defined_ports { + for port in given { + if !list.iter().any(|p| p == port) { + list.push(port.into()); + } + } + } + + list + } +} pub trait Node { fn run(&self, _ctx: &dyn HttpContext, _input: &Input) -> State { - Done(None) + Done(vec![None]) } fn resume(&self, _ctx: &dyn HttpContext, _input: &Input) -> State { - Done(None) + Done(vec![None]) } } +pub struct NodeDefaultLink { + pub this_port: String, + pub other_node: String, + pub other_port: String, +} + pub trait NodeConfig { fn as_any(&self) -> &dyn Any; - fn default_inputs(&self) -> Option> { + fn default_inputs(&self) -> Option> { None } - fn default_outputs(&self) -> Option> { + fn default_outputs(&self) -> Option> { None } } @@ -40,10 +74,15 @@ pub trait NodeFactory: Send { &self, name: &str, inputs: &[String], + outputs: &[String], bt: &BTreeMap, ) -> Result, String>; fn new_node(&self, config: &dyn NodeConfig) -> Box; + + fn default_input_ports(&self) -> PortConfig; + + fn default_output_ports(&self) -> PortConfig; } type NodeTypeMap = BTreeMap>; @@ -53,31 +92,125 @@ fn node_types() -> &'static Mutex { NODE_TYPES.get_or_init(|| Mutex::new(BTreeMap::new())) } -pub fn register_node(name: &str, factory: Box) -> bool { - node_types() - .lock() - .unwrap() - .insert(String::from(name), factory); - true +pub fn register_node(name: &str, factory: Box) { + node_types().lock().unwrap().insert(name.into(), factory); +} + +fn with_node_type(node_type: &str, f: impl Fn(&Box) -> T) -> Option +where + T: Sized, +{ + node_types().lock().unwrap().get(node_type).map(f) +} + +pub fn is_valid_type(node_type: &str) -> bool { + with_node_type(node_type, |_| true).unwrap_or(false) +} + +pub fn default_input_ports(node_type: &str) -> Option { + with_node_type(node_type, |nf| nf.default_input_ports()) +} + +pub fn default_output_ports(node_type: &str) -> Option { + with_node_type(node_type, 
|nf| nf.default_output_ports()) } pub fn new_config( node_type: &str, name: &str, inputs: &[String], + outputs: &[String], bt: &BTreeMap, ) -> Result, String> { - if let Some(nf) = node_types().lock().unwrap().get(node_type) { - nf.new_config(name, inputs, bt) - } else { - Err(format!("no such node type: {node_type}")) + match with_node_type(node_type, |nf| nf.new_config(name, inputs, outputs, bt)) { + Some(Ok(ok)) => Ok(ok), + Some(Err(e)) => Err(e), + None => Err(format!("no such node type: {node_type}")), } } pub fn new_node(node_type: &str, config: &dyn NodeConfig) -> Result, String> { - if let Some(nf) = node_types().lock().unwrap().get(node_type) { - Ok(nf.new_node(config)) - } else { - Err(format!("no such node type: {node_type}")) + with_node_type(node_type, |nf| nf.new_node(config)) + .ok_or(format!("no such node type: {node_type}")) +} + +pub mod implicit { + use super::*; + + #[derive(Clone)] + pub struct Implicit {} + + impl Node for Implicit {} + + pub struct SourceFactory {} + pub struct SinkFactory {} + + #[derive(Debug)] + pub struct ImplicitConfig {} + + impl NodeConfig for ImplicitConfig { + fn as_any(&self) -> &dyn Any { + self + } + } + + impl NodeFactory for SourceFactory { + fn default_input_ports(&self) -> PortConfig { + PortConfig { + defaults: None, + user_defined_ports: false, + } + } + + fn default_output_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers"]), + user_defined_ports: false, + } + } + + fn new_config( + &self, + _name: &str, + _inputs: &[String], + _outputs: &[String], + _bt: &BTreeMap, + ) -> Result, String> { + Ok(Box::new(ImplicitConfig {})) + } + + fn new_node(&self, _config: &dyn NodeConfig) -> Box { + Box::new(Implicit {}) + } + } + + impl NodeFactory for SinkFactory { + fn default_input_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers", "query"]), + user_defined_ports: false, + } + } + + fn default_output_ports(&self) -> PortConfig { 
+ PortConfig { + defaults: PortConfig::names(&["body", "headers"]), + user_defined_ports: false, + } + } + + fn new_config( + &self, + _name: &str, + _inputs: &[String], + _outputs: &[String], + _bt: &BTreeMap, + ) -> Result, String> { + Ok(Box::new(ImplicitConfig {})) + } + + fn new_node(&self, _config: &dyn NodeConfig) -> Box { + Box::new(Implicit {}) + } } } diff --git a/src/nodes/call.rs b/src/nodes/call.rs index 95943a8..df6250d 100644 --- a/src/nodes/call.rs +++ b/src/nodes/call.rs @@ -7,9 +7,10 @@ use std::time::Duration; use url::Url; use crate::config::get_config_value; -use crate::data; -use crate::data::{Input, Payload, State, State::*}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::data::{Input, State, State::*}; +use crate::nodes::{Node, NodeConfig, NodeFactory, PortConfig}; +use crate::payload; +use crate::payload::Payload; #[derive(Clone, Debug)] pub struct CallConfig { @@ -43,7 +44,7 @@ impl Node for Call { Ok(u) => u, Err(err) => { log::error!("call: failed parsing URL from 'url' field: {err}"); - return Done(None); + return Done(vec![]); } }; @@ -51,18 +52,13 @@ impl Node for Call { Some(h) => h, None => { log::error!("call: failed getting host from URL"); - return Done(None); + return Done(vec![]); } }; - let mut headers_vec = data::to_pwm_headers(*headers); - headers_vec.push((":method", self.config.method.as_str())); - headers_vec.push((":path", call_url.path())); - headers_vec.push((":scheme", call_url.scheme())); - - let body_slice = match data::to_pwm_body(*body) { + let body_slice = match payload::to_pwm_body(*body) { Ok(slice) => slice, - Err(e) => return Fail(Some(Payload::Error(e))), + Err(e) => return Fail(vec![Some(Payload::Error(e))]), }; let trailers = vec![]; @@ -73,6 +69,12 @@ impl Node for Call { None => host.to_owned(), }; + let mut headers_vec = payload::to_pwm_headers(*headers); + headers_vec.push((":method", self.config.method.as_str())); + headers_vec.push((":path", call_url.path())); + 
headers_vec.push((":scheme", call_url.scheme())); + headers_vec.push((":authority", &host_port)); + let result = ctx.dispatch_http_call( &host_port, headers_vec, @@ -86,14 +88,21 @@ impl Node for Call { log::debug!("call: dispatch call id: {:?}", id); Waiting(id) } - Err(status) => Fail(Some(Payload::Error(format!("error: {:?}", status)))), + Err(status) => { + log::debug!("call: dispatch call failed: {:?}", status); + Fail(vec![Some(Payload::Error(format!("error: {:?}", status)))]) + } } } fn resume(&self, ctx: &dyn HttpContext, _inputs: &Input) -> State { log::debug!("call: resume"); - let r = if let Some(body) = ctx.get_http_call_response_body(0, usize::MAX) { + let headers = Some(payload::from_pwm_headers( + ctx.get_http_call_response_headers(), + )); + + let body = if let Some(body) = ctx.get_http_call_response_body(0, usize::MAX) { let content_type = ctx.get_http_call_response_header("Content-Type"); Payload::from_bytes(body, content_type.as_deref()) @@ -101,20 +110,34 @@ impl Node for Call { None }; - // TODO once we have multiple outputs, - // also return headers and produce a Fail() status on HTTP >= 400 + // TODO only produce an output if it is connected + // TODO produce a Fail() status on HTTP >= 400 - Done(r) + Done(vec![body, headers]) } } pub struct CallFactory {} impl NodeFactory for CallFactory { + fn default_input_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers", "query"]), + user_defined_ports: false, + } + } + fn default_output_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers"]), + user_defined_ports: false, + } + } + fn new_config( &self, _name: &str, _inputs: &[String], + _outputs: &[String], bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(CallConfig { diff --git a/src/nodes/response.rs b/src/nodes/exit.rs similarity index 58% rename from src/nodes/response.rs rename to src/nodes/exit.rs index c3bf5f7..a299dec 100644 --- a/src/nodes/response.rs +++ 
b/src/nodes/exit.rs @@ -6,20 +6,21 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use crate::config::get_config_value; -use crate::data; -use crate::data::{Input, Payload, Phase, State, State::*}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::data::{Input, Phase, State, State::*}; +use crate::nodes::{Node, NodeConfig, NodeDefaultLink, NodeFactory, PortConfig}; +use crate::payload; +use crate::payload::Payload; #[derive(Debug)] -pub struct ResponseConfig { +pub struct ExitConfig { name: String, status: Option, warn_headers_sent: AtomicBool, } -impl Clone for ResponseConfig { - fn clone(&self) -> ResponseConfig { - ResponseConfig { +impl Clone for ExitConfig { + fn clone(&self) -> ExitConfig { + ExitConfig { name: self.name.clone(), status: self.status, warn_headers_sent: AtomicBool::new(self.warn_headers_sent.load(Relaxed)), @@ -27,22 +28,33 @@ impl Clone for ResponseConfig { } } -impl NodeConfig for ResponseConfig { +impl NodeConfig for ExitConfig { fn as_any(&self) -> &dyn Any { self } - fn default_outputs(&self) -> Option> { - Some(vec!["response_body".to_string()]) + fn default_outputs(&self) -> Option> { + Some(vec![ + NodeDefaultLink { + this_port: "body".into(), + other_node: "response".into(), + other_port: "body".into(), + }, + NodeDefaultLink { + this_port: "headers".into(), + other_node: "response".into(), + other_port: "headers".into(), + }, + ]) } } #[derive(Clone)] -pub struct Response { - config: ResponseConfig, +pub struct Exit { + config: ExitConfig, } -fn warn_headers_sent(config: &ResponseConfig, set_headers: bool) { +fn warn_headers_sent(config: &ExitConfig, set_headers: bool) { let name = &config.name; let set_status = config.status.is_some(); @@ -55,7 +67,7 @@ fn warn_headers_sent(config: &ResponseConfig, set_headers: bool) { "headers" }; log::warn!( - "response: node '{name}' cannot set {what} when processing response body, \ + "exit: node '{name}' cannot set {what} when processing response 
body, \ headers already sent; set 'warn_headers_sent' to false \ to silence this warning", ); @@ -63,13 +75,13 @@ fn warn_headers_sent(config: &ResponseConfig, set_headers: bool) { config.warn_headers_sent.store(false, Relaxed); } -impl Node for Response { +impl Node for Exit { fn run(&self, ctx: &dyn HttpContext, input: &Input) -> State { let config = &self.config; let body = input.data.first().unwrap_or(&None).as_deref(); let headers = input.data.get(1).unwrap_or(&None).as_deref(); - let mut headers_vec = data::to_pwm_headers(headers); + let mut headers_vec = payload::to_pwm_headers(headers); if let Some(payload) = body { if let Some(content_type) = payload.content_type() { @@ -77,9 +89,9 @@ impl Node for Response { } } - let body_slice = match data::to_pwm_body(body) { + let body_slice = match payload::to_pwm_body(body) { Ok(slice) => slice, - Err(e) => return Fail(Some(Payload::Error(e))), + Err(e) => return Fail(vec![Some(Payload::Error(e))]), }; if input.phase == Phase::HttpResponseBody { @@ -95,20 +107,35 @@ impl Node for Response { ctx.send_http_response(status, headers_vec, body_slice.as_deref()); } - Done(None) + Done(vec![None]) } } -pub struct ResponseFactory {} +pub struct ExitFactory {} + +impl NodeFactory for ExitFactory { + fn default_input_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers"]), + user_defined_ports: false, + } + } + + fn default_output_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["body", "headers"]), + user_defined_ports: false, + } + } -impl NodeFactory for ResponseFactory { fn new_config( &self, name: &str, _inputs: &[String], + _outputs: &[String], bt: &BTreeMap, ) -> Result, String> { - Ok(Box::new(ResponseConfig { + Ok(Box::new(ExitConfig { name: name.to_string(), status: get_config_value(bt, "status"), warn_headers_sent: AtomicBool::new( @@ -118,8 +145,8 @@ impl NodeFactory for ResponseFactory { } fn new_node(&self, config: &dyn NodeConfig) -> Box { - 
match config.as_any().downcast_ref::() { - Some(cc) => Box::new(Response { config: cc.clone() }), + match config.as_any().downcast_ref::() { + Some(cc) => Box::new(Exit { config: cc.clone() }), None => panic!("incompatible NodeConfig"), } } diff --git a/src/nodes/template.rs b/src/nodes/handlebars.rs similarity index 51% rename from src/nodes/template.rs rename to src/nodes/handlebars.rs index f5f6b5b..918ed73 100644 --- a/src/nodes/template.rs +++ b/src/nodes/handlebars.rs @@ -5,114 +5,132 @@ use std::any::Any; use std::collections::BTreeMap; use crate::config::get_config_value; -use crate::data::{Input, Payload, State}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::data::{Input, State}; +use crate::nodes::{Node, NodeConfig, NodeFactory, PortConfig}; +use crate::payload::Payload; #[derive(Clone, Debug)] -pub struct TemplateConfig { +pub struct HandlebarsConfig { template: String, content_type: String, inputs: Vec, } -impl NodeConfig for TemplateConfig { +impl NodeConfig for HandlebarsConfig { fn as_any(&self) -> &dyn Any { self } } #[derive(Clone)] -pub struct Template<'a> { - config: TemplateConfig, +pub struct HandlebarsNode<'a> { + config: HandlebarsConfig, handlebars: Handlebars<'a>, } -impl Template<'_> { - fn new(config: TemplateConfig) -> Self { - let mut hb = Handlebars::new(); +impl HandlebarsNode<'_> { + fn new(config: HandlebarsConfig) -> Self { + let mut handlebars = Handlebars::new(); - match hb.register_template_string("template", &config.template) { + match handlebars.register_template_string("template", &config.template) { Ok(()) => {} Err(err) => { - log::error!("template: error registering template: {err}"); + log::error!("handlebars: error registering template: {err}"); } } - Template { - config, - handlebars: hb, - } + HandlebarsNode { config, handlebars } } } -impl Node for Template<'_> { +fn sanitize_handlebars_variable(input: &str) -> String { + input.replace('.', "_") +} + +impl Node for HandlebarsNode<'_> { fn run(&self, 
_ctx: &dyn HttpContext, input: &Input) -> State { let mut vs = Vec::new(); let mut data = BTreeMap::new(); for (input_name, input) in self.config.inputs.iter().zip(input.data.iter()) { + let var = sanitize_handlebars_variable(input_name); match input { Some(Payload::Json(value)) => { - data.insert(input_name, value); + data.insert(var, value); } Some(Payload::Raw(vec_bytes)) => { match std::str::from_utf8(vec_bytes) { Ok(s) => { let v = serde_json::to_value::(s.into()) .expect("valid UTF-8 string"); - vs.push((input_name, v)); + vs.push((var, v)); } Err(err) => { - log::error!("template: input string is not valid UTF-8: {err}"); + log::error!("handlebars: input string is not valid UTF-8: {err}"); } }; } Some(Payload::Error(error)) => { - vs.push((input_name, serde_json::json!(error))); + vs.push((var, serde_json::json!(error))); } None => {} } } - for (input_name, v) in vs.iter() { - data.insert(input_name, v); + for (var, val) in vs.iter() { + data.insert(var.clone(), val); } match self.handlebars.render("template", &data) { Ok(output) => { log::debug!("output: {output}"); match Payload::from_bytes(output.into(), Some(&self.config.content_type)) { - p @ Some(Payload::Error(_)) => State::Fail(p), - p => State::Done(p), + p @ Some(Payload::Error(_)) => State::Fail(vec![p]), + p => State::Done(vec![p]), } } - Err(err) => State::Fail(Some(Payload::Error(format!( - "error rendering template: {err}" - )))), + Err(err) => State::Fail(vec![Some(Payload::Error(format!( + "handlebars: error rendering template: {err}" + )))]), } } } -pub struct TemplateFactory {} +pub struct HandlebarsFactory {} + +impl NodeFactory for HandlebarsFactory { + fn default_input_ports(&self) -> PortConfig { + PortConfig { + defaults: None, + user_defined_ports: true, + } + } + + fn default_output_ports(&self) -> PortConfig { + PortConfig { + defaults: PortConfig::names(&["output"]), + user_defined_ports: false, + } + } -impl NodeFactory for TemplateFactory { fn new_config( &self, _name: &str, 
inputs: &[String], + _outputs: &[String], bt: &BTreeMap, ) -> Result, String> { - Ok(Box::new(TemplateConfig { + Ok(Box::new(HandlebarsConfig { inputs: inputs.to_vec(), template: get_config_value(bt, "template").unwrap_or_else(|| String::from("")), content_type: get_config_value(bt, "content_type") - .unwrap_or_else(|| String::from("application/json")), + .unwrap_or_else(|| String::from("text/plain")), })) } fn new_node(&self, config: &dyn NodeConfig) -> Box { - match config.as_any().downcast_ref::() { - Some(cc) => Box::new(Template::new(cc.clone())), + match config.as_any().downcast_ref::() { + Some(cc) => Box::new(HandlebarsNode::new(cc.clone())), None => panic!("incompatible NodeConfig"), } } diff --git a/src/nodes/jq.rs b/src/nodes/jq.rs index 1b2d6fa..21c3178 100644 --- a/src/nodes/jq.rs +++ b/src/nodes/jq.rs @@ -7,13 +7,15 @@ use std::any::Any; use std::collections::BTreeMap; use crate::config::get_config_value; -use crate::data::{Input, Payload, State}; -use crate::nodes::{Node, NodeConfig, NodeFactory}; +use crate::data::{Input, State}; +use crate::nodes::{Node, NodeConfig, NodeFactory, PortConfig}; +use crate::payload::Payload; #[derive(Clone, Debug)] pub struct JqConfig { jq: String, inputs: Vec, + _outputs: Vec, // TODO: implement multiple outputs } impl NodeConfig for JqConfig { @@ -68,12 +70,12 @@ impl Errors { impl From for State { fn from(val: Errors) -> Self { - State::Fail(Some(Payload::Error(if val.is_empty() { + State::Fail(vec![Some(Payload::Error(if val.is_empty() { // should be unreachable "unknown jq error".to_string() } else { val.0.join(", ") - }))) + }))]) } } @@ -177,22 +179,19 @@ impl Jq { impl Node for Jq { fn run(&self, _ctx: &dyn HttpContext, input: &Input) -> State { match self.exec(input.data) { - Ok(mut results) => { - State::Done(match results.len() { + Ok(results) => { + match results.len() { // empty - 0 => None, - - // single - 1 => { - let Some(item) = results.pop() else { - unreachable!(); - }; - Some(Payload::Json(item)) - 
} - - // more than one, return as an array - _ => Some(Payload::Json(results.into())), - }) + 0 => State::Done(vec![None]), + + // one or more + _ => State::Done( + results + .into_iter() + .map(|item| Some(Payload::Json(item))) + .collect(), + ), + } } Err(errs) => errs.into(), } @@ -201,16 +200,41 @@ impl Node for Jq { pub struct JqFactory {} +fn sanitize_jq_inputs(inputs: &[String]) -> Vec { + // TODO: this is a minimal implementation. + // Ideally we need to validate input names into valid jq variables + inputs + .iter() + .map(|input| input.replace('.', "_").replace('$', "")) + .collect() +} + impl NodeFactory for JqFactory { + fn default_input_ports(&self) -> PortConfig { + PortConfig { + defaults: None, + user_defined_ports: true, + } + } + + fn default_output_ports(&self) -> PortConfig { + PortConfig { + defaults: None, + user_defined_ports: true, + } + } + fn new_config( &self, _name: &str, inputs: &[String], + outputs: &[String], bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(JqConfig { jq: get_config_value(bt, "jq").unwrap_or(".".to_string()), - inputs: inputs.to_vec(), + inputs: sanitize_jq_inputs(inputs), + _outputs: outputs.to_vec(), })) } diff --git a/src/payload.rs b/src/payload.rs new file mode 100644 index 0000000..a1d90be --- /dev/null +++ b/src/payload.rs @@ -0,0 +1,165 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +#[derive(Debug)] +pub enum Payload { + Raw(Vec), + Json(serde_json::Value), + Error(String), +} + +impl Payload { + pub fn content_type(&self) -> Option<&str> { + match &self { + Payload::Json(_) => Some("application/json"), + _ => None, + } + } + + pub fn from_bytes(bytes: Vec, content_type: Option<&str>) -> Option { + match content_type { + Some(ct) => { + if ct.contains("application/json") { + match serde_json::from_slice(&bytes) { + Ok(v) => Some(Payload::Json(v)), + Err(e) => Some(Payload::Error(e.to_string())), + } + } else { + Some(Payload::Raw(bytes)) + } + } + _ => None, + } + } + + pub fn 
to_json(&self) -> Result { + match &self { + Payload::Json(value) => Ok(value.clone()), + Payload::Raw(vec) => match std::str::from_utf8(vec) { + Ok(s) => serde_json::to_value(s).map_err(|e| e.to_string()), + Err(e) => Err(e.to_string()), + }, + Payload::Error(e) => Err(e.clone()), + } + } + + pub fn to_bytes(&self) -> Result, String> { + match &self { + Payload::Json(value) => match serde_json::to_string(value) { + Ok(s) => Ok(s.into_bytes()), + Err(e) => Err(e.to_string()), + }, + Payload::Raw(s) => Ok(s.clone()), // it would be nice to be able to avoid this copy + Payload::Error(e) => Err(e.clone()), + } + } + + pub fn len(&self) -> Option { + match &self { + Payload::Json(_) => None, + Payload::Raw(s) => Some(s.len()), + Payload::Error(e) => Some(e.len()), + } + } + + pub fn to_pwm_headers(&self) -> Vec<(&str, &str)> { + match &self { + Payload::Json(value) => { + let mut vec: Vec<(&str, &str)> = vec![]; + if let serde_json::Value::Object(map) = value { + for (k, entry) in map { + match entry { + serde_json::Value::Array(vs) => { + for v in vs { + if let serde_json::Value::String(s) = v { + vec.push((k, s)); + } + } + } + + // accept string values as well + serde_json::Value::String(s) => { + vec.push((k, s)); + } + + _ => {} + } + } + } + + vec + } + _ => { + // TODO + log::debug!("NYI: converting payload into headers vector"); + vec![] + } + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +enum StringOrVec { + String(String), + Vec(Vec), +} + +pub fn from_pwm_headers(vec: Vec<(String, String)>) -> Payload { + let mut map = BTreeMap::new(); + for (k, v) in vec { + let lk = k.to_lowercase(); + if let Some(vs) = map.get_mut(&lk) { + match vs { + StringOrVec::String(s) => { + let ss = s.to_string(); + map.insert(lk, StringOrVec::Vec(vec![ss, v])); + } + StringOrVec::Vec(vs) => { + vs.push(v); + } + }; + } else { + map.insert(lk, StringOrVec::String(v)); + } + } + + let value = serde_json::to_value(map).expect("serializable map"); + 
Payload::Json(value) +} + +pub fn to_pwm_headers(payload: Option<&Payload>) -> Vec<(&str, &str)> { + payload.map_or_else(Vec::new, |p| p.to_pwm_headers()) +} + +/// To use this result in proxy-wasm calls as an Option<&[u8]>, use: +/// `data::to_pwm_body(p).as_deref()`. +pub fn to_pwm_body(payload: Option<&Payload>) -> Result>, String> { + match payload { + Some(p) => match p.to_bytes() { + Ok(b) => Ok(Some(Vec::into_boxed_slice(b))), + Err(e) => Err(e), + }, + None => Ok(None), + } +} + +#[derive(Serialize)] +struct ErrorMessage<'a> { + message: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + request_id: Option, +} + +pub fn to_json_error_body(message: &str, request_id: Option>) -> String { + serde_json::to_value(ErrorMessage { + message, + request_id: match request_id { + Some(vec) => std::str::from_utf8(&vec).map(|v| v.to_string()).ok(), + None => None, + }, + }) + .ok() + .map(|v| v.to_string()) + .expect("JSON error object") +} diff --git a/test/config/demo.yml b/test/config/demo.yml deleted file mode 100644 index 0262526..0000000 --- a/test/config/demo.yml +++ /dev/null @@ -1,32 +0,0 @@ - -_format_version: "1.1" - -services: -- name: demo - url: http://httpbin.org -# url: http://host.docker.internal:6502 - routes: - - name: my-route - paths: - - / - strip_path: false - filter_chains: - - filters: - - name: datakit - config: - nodes: - - type: call - name: first - url: http://127.0.0.1:6502/json - - type: call - name: second - url: http://127.0.0.1:8008/object - - type: template - inputs: - - first - - second - template: | - { - "foo": {{ first.field }}, - "bar": {{ second.field }} - }