Skip to content

Commit 2d80949

Browse files
committed
Add data_stream_router processor
1 parent 6d6c7d5 commit 2d80949

File tree

7 files changed

+497
-1
lines changed

7 files changed

+497
-1
lines changed

docs/reference/ingest/processors.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ include::processors/circle.asciidoc[]
3838
include::processors/community-id.asciidoc[]
3939
include::processors/convert.asciidoc[]
4040
include::processors/csv.asciidoc[]
41+
include::processors/data-stream-router.asciidoc[]
4142
include::processors/date.asciidoc[]
4243
include::processors/date-index-name.asciidoc[]
4344
include::processors/dissect.asciidoc[]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
[[data-stream-router-processor]]
2+
=== Data stream router processor
3+
++++
4+
<titleabbrev>Data stream router</titleabbrev>
5+
++++
6+
7+
The `data_stream_router` processor allows you to route a document from one data stream to another data stream.
8+
It can use either static values or values from the document to determine the target data stream.
9+
10+
The name of a data stream is comprised of three parts and looks like this: `<type>-<dataset>-<namespace>`.
11+
See the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme] documentation for more details.
12+
13+
NOTE: The `data_stream_router` processor can only be used on data streams that follow the data streams naming scheme.
14+
Trying to use this processor on a data stream with a non-compliant name will raise an exception.
15+
16+
After a `data_stream_router` processor has been executed, all the other processors of the current pipeline are skipped.
17+
This means that at most one `data_stream_router` processor is ever executed within a pipeline,
18+
which allows you to define mutually exclusive routing conditions,
19+
similar to an if, else-if, else-if, … condition.
20+
21+
[[data-stream-router-options]]
22+
.Data stream router options
23+
[options="header"]
24+
|======
25+
| Name | Required | Default | Description
26+
| `dataset` | no | - | A static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`. If not set, gets the value of the field `data_stream.dataset` from the document. When using values from the document, the processor replaces invalid characters with `_`. If the option is not set and the document also doesn't contain a corresponding field, it uses the `<dataset>` part of the index name as a fallback.
27+
| `namespace` | no | - | A static value for the namespace part of the data stream name. See the criteria for <<indices-create-api-path-params, index names>> for allowed characters. Must be no longer than 100 characters. If not set, gets the value of the field `data_stream.namespace` from the document. When using values from the document, the processor replaces invalid characters with `_`. If the option is not set and the document also doesn't contain a corresponding field, it uses the `<namespace>` part of the index name as a fallback.
28+
include::common-options.asciidoc[]
29+
|======
30+
31+
NOTE: It's not possible to change the `type` of the data stream by setting the `data_stream.type` in the document.
32+
33+
[source,js]
34+
--------------------------------------------------
35+
{
36+
"data_stream_router": {
37+
"tag": "nginx",
38+
"if" : "ctx?.log?.file?.path?.contains('nginx')",
39+
"dataset": "nginx"
40+
}
41+
}
42+
--------------------------------------------------
43+
// NOTCONSOLE
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest.common;
10+
11+
import org.elasticsearch.ingest.AbstractProcessor;
12+
import org.elasticsearch.ingest.ConfigurationUtils;
13+
import org.elasticsearch.ingest.IngestDocument;
14+
import org.elasticsearch.ingest.Processor;
15+
16+
import java.util.Locale;
17+
import java.util.Map;
18+
import java.util.Objects;
19+
20+
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
21+
22+
public final class DataStreamRouterProcessor extends AbstractProcessor {
23+
public static final String TYPE = "data_stream_router";
24+
25+
private static final String DATA_STREAM_PREFIX = "data_stream.";
26+
private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
27+
private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
28+
private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
29+
private static final String EVENT_DATASET = "event.dataset";
30+
31+
private static final char[] DISALLOWED_IN_DATASET = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':', '-' };
32+
private static final char[] DISALLOWED_IN_NAMESPACE = new char[] { '\\', '/', '*', '?', '\"', '<', '>', '|', ' ', ',', '#', ':' };
33+
private static final int MAX_LENGTH = 100;
34+
private static final char REPLACEMENT_CHAR = '_';
35+
private final String dataset;
36+
private final String namespace;
37+
38+
DataStreamRouterProcessor(String tag, String description, String dataset, String namespace) {
39+
super(tag, description);
40+
this.dataset = dataset;
41+
this.namespace = namespace;
42+
}
43+
44+
private static String sanitizeDataStreamField(String s, char[] disallowedInDataset) {
45+
if (s == null) {
46+
return null;
47+
}
48+
s = s.toLowerCase(Locale.ROOT);
49+
s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
50+
for (char c : disallowedInDataset) {
51+
s = s.replace(c, REPLACEMENT_CHAR);
52+
}
53+
return s;
54+
}
55+
56+
@Override
57+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
58+
final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
59+
final String type;
60+
final String datasetFallback;
61+
final String namespaceFallback;
62+
int indexOfFirstDash = indexName.indexOf('-');
63+
String illegalDataStreamNameMessage = "invalid data stream name: ["
64+
+ indexName
65+
+ "]; must follow naming scheme <type>-<dataset>-<namespace>";
66+
if (indexOfFirstDash < 0) {
67+
throw new IllegalArgumentException(illegalDataStreamNameMessage);
68+
}
69+
type = indexName.substring(0, indexOfFirstDash);
70+
int indexOfSecondDash = indexName.indexOf('-', indexOfFirstDash + 1);
71+
if (indexOfSecondDash < 0) {
72+
throw new IllegalArgumentException(illegalDataStreamNameMessage);
73+
}
74+
datasetFallback = indexName.substring(indexOfFirstDash + 1, indexOfSecondDash);
75+
namespaceFallback = indexName.substring(indexOfSecondDash + 1);
76+
77+
String dataset = getDataset(ingestDocument, datasetFallback);
78+
String namespace = getNamespace(ingestDocument, namespaceFallback);
79+
ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
80+
if (ingestDocument.hasField(EVENT_DATASET)) {
81+
ingestDocument.setFieldValue(EVENT_DATASET, dataset);
82+
}
83+
ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset);
84+
ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace);
85+
ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), type + "-" + dataset + "-" + namespace);
86+
ingestDocument.skipCurrentPipeline();
87+
return ingestDocument;
88+
}
89+
90+
private String getDataset(IngestDocument ingestDocument, String datasetFallback) {
91+
String dataset = this.dataset;
92+
if (dataset == null) {
93+
dataset = sanitizeDataStreamField(ingestDocument.getFieldValue(DATA_STREAM_DATASET, String.class, true), DISALLOWED_IN_DATASET);
94+
}
95+
if (dataset == null) {
96+
dataset = datasetFallback;
97+
}
98+
return dataset;
99+
}
100+
101+
private String getNamespace(IngestDocument ingestDocument, String namespaceFallback) {
102+
String namespace = this.namespace;
103+
if (namespace == null) {
104+
namespace = sanitizeDataStreamField(
105+
ingestDocument.getFieldValue(DATA_STREAM_NAMESPACE, String.class, true),
106+
DISALLOWED_IN_NAMESPACE
107+
);
108+
}
109+
if (namespace == null) {
110+
namespace = namespaceFallback;
111+
}
112+
return namespace;
113+
}
114+
115+
@Override
116+
public String getType() {
117+
return TYPE;
118+
}
119+
120+
public String getDataStreamDataset() {
121+
return dataset;
122+
}
123+
124+
public String getDataStreamNamespace() {
125+
return namespace;
126+
}
127+
128+
public static final class Factory implements Processor.Factory {
129+
130+
@Override
131+
public DataStreamRouterProcessor create(
132+
Map<String, Processor.Factory> processorFactories,
133+
String tag,
134+
String description,
135+
Map<String, Object> config
136+
) throws Exception {
137+
String dataset = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "dataset");
138+
if (Objects.equals(sanitizeDataStreamField(dataset, DISALLOWED_IN_DATASET), dataset) == false) {
139+
throw newConfigurationException(TYPE, tag, "dataset", "contains illegal characters");
140+
}
141+
String namespace = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "namespace");
142+
if (Objects.equals(sanitizeDataStreamField(namespace, DISALLOWED_IN_NAMESPACE), namespace) == false) {
143+
throw newConfigurationException(TYPE, tag, "namespace", "contains illegal characters");
144+
}
145+
return new DataStreamRouterProcessor(tag, description, dataset, namespace);
146+
}
147+
}
148+
}

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
8686
entry(NetworkDirectionProcessor.TYPE, new NetworkDirectionProcessor.Factory(parameters.scriptService)),
8787
entry(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory()),
8888
entry(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory()),
89-
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory())
89+
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
90+
entry(DataStreamRouterProcessor.TYPE, new DataStreamRouterProcessor.Factory())
9091
);
9192
}
9293

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest.common;
10+
11+
import org.elasticsearch.ElasticsearchParseException;
12+
import org.elasticsearch.test.ESTestCase;
13+
import org.hamcrest.Matchers;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
import static org.hamcrest.CoreMatchers.nullValue;
19+
20+
public class DataStreamRouterProcessorFactoryTests extends ESTestCase {
21+
22+
public void testSuccess() throws Exception {
23+
DataStreamRouterProcessor processor = create(null, null);
24+
assertThat(processor.getDataStreamDataset(), nullValue());
25+
assertThat(processor.getDataStreamNamespace(), nullValue());
26+
}
27+
28+
public void testInvalidDataset() throws Exception {
29+
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("my-service", null));
30+
assertThat(e.getMessage(), Matchers.equalTo("[dataset] contains illegal characters"));
31+
}
32+
33+
public void testInvalidNamespace() throws Exception {
34+
ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> create("generic", "foo:bar"));
35+
assertThat(e.getMessage(), Matchers.equalTo("[namespace] contains illegal characters"));
36+
}
37+
38+
private static DataStreamRouterProcessor create(String dataset, String namespace) throws Exception {
39+
Map<String, Object> config = new HashMap<>();
40+
if (dataset != null) {
41+
config.put("dataset", dataset);
42+
}
43+
if (namespace != null) {
44+
config.put("namespace", namespace);
45+
}
46+
return new DataStreamRouterProcessor.Factory().create(null, null, null, config);
47+
}
48+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest.common;
10+
11+
import org.elasticsearch.ingest.CompoundProcessor;
12+
import org.elasticsearch.ingest.IngestDocument;
13+
import org.elasticsearch.ingest.Processor;
14+
import org.elasticsearch.ingest.RandomDocumentPicks;
15+
import org.elasticsearch.ingest.WrappingProcessor;
16+
import org.elasticsearch.test.ESTestCase;
17+
18+
import static org.hamcrest.Matchers.equalTo;
19+
20+
public class DataStreamRouterProcessorTests extends ESTestCase {
21+
22+
public void testDefaults() throws Exception {
23+
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
24+
25+
DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
26+
processor.execute(ingestDocument);
27+
assertDataSetFields(ingestDocument, "logs", "generic", "default");
28+
}
29+
30+
public void testSkipFirstProcessor() throws Exception {
31+
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
32+
33+
DataStreamRouterProcessor skippedProcessor = new DataStreamRouterProcessor(null, null, "skip", null);
34+
DataStreamRouterProcessor executedProcessor = new DataStreamRouterProcessor(null, null, "executed", null);
35+
CompoundProcessor processor = new CompoundProcessor(new SkipProcessor(skippedProcessor), executedProcessor);
36+
processor.execute(ingestDocument);
37+
assertDataSetFields(ingestDocument, "logs", "executed", "default");
38+
}
39+
40+
public void testSkipLastProcessor() throws Exception {
41+
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
42+
43+
DataStreamRouterProcessor executedProcessor = new DataStreamRouterProcessor(null, null, "executed", null);
44+
DataStreamRouterProcessor skippedProcessor = new DataStreamRouterProcessor(null, null, "skip", null);
45+
CompoundProcessor processor = new CompoundProcessor(executedProcessor, skippedProcessor);
46+
processor.execute(ingestDocument);
47+
assertDataSetFields(ingestDocument, "logs", "executed", "default");
48+
}
49+
50+
public void testDataStreamFieldsFromDocument() throws Exception {
51+
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
52+
ingestDocument.setFieldValue("data_stream.dataset", "foo");
53+
ingestDocument.setFieldValue("data_stream.namespace", "bar");
54+
55+
DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
56+
processor.execute(ingestDocument);
57+
assertDataSetFields(ingestDocument, "logs", "foo", "bar");
58+
}
59+
60+
public void testInvalidDataStreamFieldsFromDocument() throws Exception {
61+
IngestDocument ingestDocument = createIngestDocument("logs-generic-default");
62+
ingestDocument.setFieldValue("data_stream.dataset", "foo-bar");
63+
ingestDocument.setFieldValue("data_stream.namespace", "baz#qux");
64+
65+
DataStreamRouterProcessor processor = new DataStreamRouterProcessor(null, null, null, null);
66+
processor.execute(ingestDocument);
67+
assertDataSetFields(ingestDocument, "logs", "foo_bar", "baz_qux");
68+
}
69+
70+
private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) {
71+
assertThat(ingestDocument.getFieldValue("data_stream.type", String.class), equalTo(type));
72+
assertThat(ingestDocument.getFieldValue("data_stream.dataset", String.class), equalTo(dataset));
73+
assertThat(ingestDocument.getFieldValue("data_stream.namespace", String.class), equalTo(namespace));
74+
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo(type + "-" + dataset + "-" + namespace));
75+
}
76+
77+
private static IngestDocument createIngestDocument(String dataStream) {
78+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
79+
ingestDocument.setFieldValue("_index", dataStream);
80+
return ingestDocument;
81+
}
82+
83+
private static class SkipProcessor implements WrappingProcessor {
84+
private final Processor processor;
85+
86+
SkipProcessor(Processor processor) {
87+
this.processor = processor;
88+
}
89+
90+
@Override
91+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
92+
return ingestDocument;
93+
}
94+
95+
@Override
96+
public Processor getInnerProcessor() {
97+
return processor;
98+
}
99+
100+
@Override
101+
public String getType() {
102+
return "skip";
103+
}
104+
105+
@Override
106+
public String getTag() {
107+
return null;
108+
}
109+
110+
@Override
111+
public String getDescription() {
112+
return null;
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)