These are self-contained Java functions, each implementing java.util.function.Function, that can be used as components providing an output value for a given input in a variety of streaming application platforms, such as Spring Cloud Data Flow and Project RIFF.
The functions can also be run as part of standalone Spring Boot applications.
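For orientation, a minimal sketch of a standalone Spring Boot application exposing such a function as a bean might look like the following (the uppercase function and class name here are hypothetical placeholders, not one of the functions in this repository):

import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class FunctionApplication {

    public static void main(String[] args) {
        SpringApplication.run(FunctionApplication.class, args);
    }

    // Hypothetical function bean; the functions in this repository are
    // auto-configured and picked up by the platform the same way.
    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }
}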
splitter-function
This is a "one-to-many" Java function with a signature as Function<Message<?>, List<Message<?>>>
. So, every inbound message is splitted into several output messages.
This function is fully based on the Spring integration AbstractMessageSplitter
and supports a delimiters
, expression
, applySequence
properties.
Also if one of the fileMarkers
or charset
properties is present, the FileSplitter
implementation is used and an incoming payload is treated as file for splitting.
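To make the one-to-many shape concrete, here is a hand-rolled sketch of a function with the same signature (illustrative only; the shipped splitter-function delegates to AbstractMessageSplitter and is driven by the properties above):

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class SplitterSketch {

    // Split a comma-separated String payload into one message per token
    static Function<Message<?>, List<Message<?>>> splitOnComma() {
        return message -> Stream.of(((String) message.getPayload()).split(","))
                .<Message<?>>map(token -> MessageBuilder.withPayload(token).build())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        splitOnComma().apply(MessageBuilder.withPayload("a,b,c").build())
                .forEach(m -> System.out.println(m.getPayload())); // a, b, c
    }
}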
spel-function
Transforms the incoming message payload based on a SpEL expression and produces an outbound message.
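As a rough sketch of the idea, a SpEL expression can be evaluated with the message itself as the root object, so 'payload' resolves to Message.getPayload() (the shipped function is configured through properties and wraps the result in an outbound message):

import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class SpelTransformSketch {

    public static void main(String[] args) {
        Expression expression = new SpelExpressionParser()
                .parseExpression("payload.toUpperCase()");
        Message<String> in = MessageBuilder.withPayload("hello").build();
        // Evaluate the expression against the message as root object
        System.out.println(expression.getValue(in, String.class)); // HELLO
    }
}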
filter-function
Filters an incoming message based on a boolean SpEL expression, producing either that message or nothing according to the expression result.
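A minimal sketch of the filtering behavior, assuming a boolean SpEL expression evaluated against the message (illustrative only; the shipped function is configured through properties):

import java.util.Optional;

import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class FilterSketch {

    public static void main(String[] args) {
        Expression predicate = new SpelExpressionParser()
                .parseExpression("payload.length() > 3");
        Message<String> in = MessageBuilder.withPayload("hello").build();
        // Pass the message through when the expression is true, otherwise drop it
        Optional<Message<String>> out = Boolean.TRUE.equals(predicate.getValue(in, Boolean.class))
                ? Optional.of(in) : Optional.empty();
        System.out.println(out.map(Message::getPayload).orElse("dropped"));
    }
}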
header-enricher-function
Enriches the incoming message headers with headers whose values are evaluated from expressions.
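A minimal sketch of the enrichment idea, computing a header value from a SpEL expression evaluated against the message (the header name upperPayload is hypothetical; the shipped function drives the header-to-expression mapping from configuration):

import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

public class HeaderEnricherSketch {

    public static void main(String[] args) {
        Expression expression = new SpelExpressionParser()
                .parseExpression("payload.toUpperCase()");
        Message<String> in = MessageBuilder.withPayload("order-42").build();
        // Copy the message and add a header computed from the expression
        Message<String> enriched = MessageBuilder.fromMessage(in)
                .setHeader("upperPayload", expression.getValue(in, String.class))
                .build();
        System.out.println(enriched.getHeaders().get("upperPayload")); // ORDER-42
    }
}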
tasklauncher-function
This is a Java function with the signature Function<LaunchRequest, Optional<Long>> that submits a task launch request to a Spring Cloud Data Flow server.
The LaunchRequest is defined as:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonProperty;

public class LaunchRequest {

    @JsonProperty("args")
    private List<String> commandlineArguments = new ArrayList<>();
    @JsonProperty("deploymentProps")
    private Map<String, String> deploymentProperties = new HashMap<>();
    @JsonProperty("name")
    private String taskName;
    ...
}
It is posted as JSON, for example:
{"name":"timestamp"}
or
{"name":"mytask","args":["a0=v0","a1=v1"],"deploymentProps":{"prop0":"val0"}}
The only required field is the task name, which must refer to a task already defined in Data Flow.
If the Data Flow server is not deployed to the default URL (http://localhost:9393), you must configure DataFlowClientProperties, including authentication if necessary.
The function returns an Optional with the taskId as its value. Before submitting the request, it first checks that the configured task platform is not at its current task execution limit (20 by default).
If the task platform is at its limit, the function returns Optional.empty() and logs a warning message.
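A sketch of consuming the result, where launcher stands in for the injected tasklauncher-function (the wiring shown here is hypothetical):

import java.util.Optional;
import java.util.function.Function;

public class LaunchResultSketch {

    // Launch the task and report whether the request was actually submitted
    static void launch(Function<LaunchRequest, Optional<Long>> launcher, LaunchRequest request) {
        launcher.apply(request).ifPresentOrElse(
                id -> System.out.println("Launched task execution " + id),
                () -> System.out.println("Task platform at its limit; request not submitted"));
    }
}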