Skip to content

Commit cdf2522

Browse files
authored
Introduce reroute method on IngestDocument (#94000)
1 parent 8a8bc4f commit cdf2522

File tree

7 files changed

+357
-53
lines changed

7 files changed

+357
-53
lines changed

docs/changelog/94000.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 94000
2+
summary: Introduce redirect method on IngestDocument
3+
area: Ingest Node
4+
type: enhancement
5+
issues:
6+
- 83653

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,7 @@ teardown:
8787
index: foo
8888
id: "1"
8989
- match: { _source.a: true }
90-
# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances
91-
# (See issue https://github.com/elastic/elasticsearch/issues/83653).
92-
# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior
93-
#- match: { _source.accumulator: [ "non-repeated-value" ] }
94-
- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] }
90+
- match: { _source.accumulator: [ "non-repeated-value" ] }
9591

9692
# only the foo index
9793
- do:
@@ -150,11 +146,7 @@ teardown:
150146
index: foo
151147
id: "1"
152148
- match: { _source.a: true }
153-
# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances
154-
# (See issue https://github.com/elastic/elasticsearch/issues/83653).
155-
# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior
156-
#- match: { _source.accumulator: [ "non-repeated-value" ] }
157-
- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] }
149+
- match: { _source.accumulator: [ "non-repeated-value" ] }
158150

159151
# only the foo index
160152
- do:

server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.env.Environment;
2929
import org.elasticsearch.env.NodeEnvironment;
3030
import org.elasticsearch.ingest.AbstractProcessor;
31+
import org.elasticsearch.ingest.ConfigurationUtils;
3132
import org.elasticsearch.ingest.IngestDocument;
3233
import org.elasticsearch.ingest.PipelineConfiguration;
3334
import org.elasticsearch.ingest.Processor;
@@ -48,6 +49,7 @@
4849
import java.util.Collection;
4950
import java.util.List;
5051
import java.util.Map;
52+
import java.util.Objects;
5153
import java.util.function.BiConsumer;
5254
import java.util.function.Supplier;
5355

@@ -97,6 +99,26 @@ public void testFinalPipelineCantChangeDestination() {
9799
);
98100
}
99101

102+
public void testFinalPipelineCantRerouteDestination() {
103+
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
104+
createIndex("index", settings);
105+
106+
final BytesReference finalPipelineBody = new BytesArray("""
107+
{"processors": [{"reroute": {}}]}""");
108+
client().admin().cluster().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet();
109+
110+
final IllegalStateException e = expectThrows(
111+
IllegalStateException.class,
112+
() -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()
113+
);
114+
assertThat(
115+
e,
116+
hasToString(
117+
endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]")
118+
)
119+
);
120+
}
121+
100122
public void testFinalPipelineOfOldDestinationIsNotInvoked() {
101123
Settings settings = Settings.builder()
102124
.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
@@ -187,6 +209,73 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
187209
assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
188210
}
189211

212+
public void testDefaultPipelineOfRerouteDestinationIsInvoked() {
213+
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
214+
createIndex("index", settings);
215+
216+
settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
217+
createIndex("target", settings);
218+
219+
BytesReference defaultPipelineBody = new BytesArray("""
220+
{"processors": [{"reroute": {}}]}""");
221+
client().admin()
222+
.cluster()
223+
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
224+
.actionGet();
225+
226+
BytesReference targetPipeline = new BytesArray("""
227+
{"processors": [{"final": {}}]}""");
228+
client().admin()
229+
.cluster()
230+
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
231+
.actionGet();
232+
233+
IndexResponse indexResponse = client().prepareIndex("index")
234+
.setId("1")
235+
.setSource(Map.of("field", "value"))
236+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
237+
.get();
238+
assertEquals(RestStatus.CREATED, indexResponse.status());
239+
SearchResponse target = client().prepareSearch("target").get();
240+
assertEquals(1, target.getHits().getTotalHits().value);
241+
assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
242+
}
243+
244+
public void testAvoidIndexingLoop() {
245+
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
246+
createIndex("index", settings);
247+
248+
settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
249+
createIndex("target", settings);
250+
251+
BytesReference defaultPipelineBody = new BytesArray("""
252+
{"processors": [{"reroute": {"dest": "target"}}]}""");
253+
client().admin()
254+
.cluster()
255+
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
256+
.actionGet();
257+
258+
BytesReference targetPipeline = new BytesArray("""
259+
{"processors": [{"reroute": {"dest": "index"}}]}""");
260+
client().admin()
261+
.cluster()
262+
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
263+
.actionGet();
264+
265+
IllegalStateException exception = expectThrows(
266+
IllegalStateException.class,
267+
() -> client().prepareIndex("index")
268+
.setId("1")
269+
.setSource(Map.of("dest", "index"))
270+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
271+
.get()
272+
);
273+
assertThat(
274+
exception.getMessage(),
275+
equalTo("index cycle detected while processing pipeline [target_default_pipeline] for document [1]: [index, target, index]")
276+
);
277+
}
278+
190279
public void testFinalPipeline() {
191280
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
192281
createIndex("index", settings);
@@ -393,6 +482,26 @@ public String getType() {
393482
return "changing_dest";
394483
}
395484

485+
},
486+
"reroute",
487+
(processorFactories, tag, description, config) -> {
488+
final String dest = Objects.requireNonNullElse(
489+
ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"),
490+
"target"
491+
);
492+
return new AbstractProcessor(tag, description) {
493+
@Override
494+
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
495+
ingestDocument.reroute(dest);
496+
return ingestDocument;
497+
}
498+
499+
@Override
500+
public String getType() {
501+
return "reroute";
502+
}
503+
504+
};
396505
}
397506
);
398507
}

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
141141

142142
void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
143143
assert currentProcessor <= processorsWithMetrics.size();
144-
if (currentProcessor == processorsWithMetrics.size()) {
144+
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
145145
handler.accept(ingestDocument, null);
146146
return;
147147
}
@@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
150150
Processor processor;
151151
IngestMetric metric;
152152
// iteratively execute any sync processors
153-
while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) {
153+
while (currentProcessor < processorsWithMetrics.size()
154+
&& processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
155+
&& ingestDocument.isReroute() == false) {
154156
processorWithMetric = processorsWithMetrics.get(currentProcessor);
155157
processor = processorWithMetric.v1();
156158
metric = processorWithMetric.v2();
@@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
176178
}
177179

178180
assert currentProcessor <= processorsWithMetrics.size();
179-
if (currentProcessor == processorsWithMetrics.size()) {
181+
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
180182
handler.accept(ingestDocument, null);
181183
return;
182184
}

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public final class IngestDocument {
6262

6363
// Contains all pipelines that have been executed for this document
6464
private final Set<String> executedPipelines = new LinkedHashSet<>();
65-
6665
private boolean doNoSelfReferencesCheck = false;
66+
private boolean reroute = false;
6767

6868
public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
6969
this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
@@ -80,6 +80,7 @@ public IngestDocument(IngestDocument other) {
8080
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
8181
deepCopyMap(other.ingestMetadata)
8282
);
83+
this.reroute = other.reroute;
8384
}
8485

8586
/**
@@ -903,6 +904,29 @@ public String toString() {
903904
return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}';
904905
}
905906

907+
public void reroute(String destIndex) {
908+
getMetadata().setIndex(destIndex);
909+
reroute = true;
910+
}
911+
912+
/**
913+
* The document is redirected to another target.
914+
* This implies that we'll skip the current pipeline and invoke the default pipeline of the new target
915+
*
916+
* @return whether the document is redirected to another target
917+
*/
918+
boolean isReroute() {
919+
return reroute;
920+
}
921+
922+
/**
923+
* Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless
924+
* {@link #reroute(String)} is called.
925+
*/
926+
void resetReroute() {
927+
reroute = false;
928+
}
929+
906930
public enum Metadata {
907931
INDEX(IndexFieldMapper.NAME),
908932
TYPE("_type"),

0 commit comments

Comments
 (0)