From a66a11e8365e70128e5b7b0bc0197282f4729c95 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 08:57:55 +0100 Subject: [PATCH 01/22] Introduce redirect method on IngestDocument - Overrides _index - Skips current pipeline - Invokes default pipeline of new index --- .../elasticsearch/index/FinalPipelineIT.java | 86 +++++++++++++++ .../ingest/CompoundProcessor.java | 8 +- .../elasticsearch/ingest/IngestDocument.java | 18 +++ .../elasticsearch/ingest/IngestService.java | 103 ++++++++++++++---- .../ingest/CompoundProcessorTests.java | 79 ++++++++++++++ 5 files changed, 267 insertions(+), 27 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 98ce3933f9d3c..504d78eaf47c8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.ingest.Processor; @@ -49,6 +50,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -188,6 +190,70 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() { assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); } + public void testDefaultPipelineOfRedirectDestinationIsInvoked() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"redirect": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"final": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + + IndexResponse indexResponse = client().prepareIndex("index") + .setId("1") + .setSource(Map.of("field", "value")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + SearchResponse target = client().prepareSearch("target").get(); + assertEquals(1, target.getHits().getTotalHits().value); + assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); + } + + public void testAvoidIndexingLoop() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"redirect": {"dest": "target"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"redirect": {"dest": "index"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + + IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> client().prepareIndex("index") + .setId("1") + .setSource(Map.of("dest", "index")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get() + ); + assertThat(exception.getMessage(), containsString("index cycle detected while processing pipelines: [index, target, index]")); + } + public void testFinalPipeline() { final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); createIndex("index", settings); @@ -394,6 +460,26 @@ public String getType() { return "changing_dest"; } + }, + "redirect", + (processorFactories, tag, description, config) -> { + final String dest = Objects.requireNonNullElse( + ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"), + "target" + ); + return new AbstractProcessor(tag, description) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.redirect(dest); + return ingestDocument; + } + + @Override + public String getType() { + return "redirect"; + } + + }; } ); } diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 2910ab11e8c94..459e6c084e798 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); return; } @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC Processor processor; IngestMetric metric; // iteratively execute any sync processors - while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) { + while (currentProcessor < processorsWithMetrics.size() + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false + && ingestDocument.isSkipCurrentPipeline() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index f471926087ae5..cdbcc4d5e412b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -62,8 +62,10 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); + private boolean skipCurrentPipeline = false; private boolean doNoSelfReferencesCheck = false; + private boolean invokeDefaultPipelineOfDestination = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -80,6 +82,7 @@ public IngestDocument(IngestDocument other) { new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); + this.invokeDefaultPipelineOfDestination = other.invokeDefaultPipelineOfDestination; } /** @@ -838,6 +841,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer { + skipCurrentPipeline = false; executedPipelines.remove(pipeline.getId()); if (previousPipeline != null) { ingestMetadata.put("pipeline", previousPipeline); @@ -903,6 +907,20 @@ public String toString() { return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}'; } + public void redirect(String destIndex) { + getMetadata().setIndex(destIndex); + invokeDefaultPipelineOfDestination = true; + skipCurrentPipeline = true; + } + + public boolean isInvokeDefaultPipelineOfDestination() { + return invokeDefaultPipelineOfDestination; + } + + public boolean isSkipCurrentPipeline() { + return skipCurrentPipeline; + } + public enum Metadata { INDEX(IndexFieldMapper.NAME), TYPE("_type"), diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 0e53b6a39f0fd..9873fd997fd39 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -68,6 +68,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -714,21 +715,8 @@ protected void doRun() { continue; } - final String pipelineId = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - final String finalPipelineId = indexRequest.getFinalPipeline(); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - boolean hasFinalPipeline = true; - final List pipelines; - if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false - && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(pipelineId, finalPipelineId); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - pipelines = List.of(pipelineId); - hasFinalPipeline = false; - } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(finalPipelineId); - } else { + Pipelines pipelines = getPipelines(indexRequest); + if (pipelines.isEmpty()) { i++; continue; } @@ -763,8 +751,16 @@ public void onFailure(Exception e) { }); IngestDocument ingestDocument = newIngestDocument(indexRequest); - executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener); - + LinkedHashSet indexRecursionDetection = new LinkedHashSet<>(); + indexRecursionDetection.add(indexRequest.index()); + executePipelines( + pipelines.iterator(), + pipelines.hasFinalPipeline(), + indexRequest, + ingestDocument, + documentListener, + indexRecursionDetection + ); i++; } } @@ -772,12 +768,61 @@ public void onFailure(Exception e) { }); } + private Pipelines getPipelines(IndexRequest indexRequest) { + final String pipelineId = indexRequest.getPipeline(); + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + final String finalPipelineId = indexRequest.getFinalPipeline(); + indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + return new Pipelines(pipelineId, finalPipelineId); + } + + private static class Pipelines implements Iterable { + private String defaultPipeline; + private String finalPipeline; + + private Pipelines(String defaultPipeline, String finalPipeline) { + if (NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + this.defaultPipeline = defaultPipeline; + } + if (NOOP_PIPELINE_NAME.equals(finalPipeline) == false) { + this.finalPipeline = finalPipeline; + } + } + + public boolean hasFinalPipeline() { + return finalPipeline != null; + } + + public boolean isEmpty() { + return defaultPipeline == null && finalPipeline == null; + } + + public void withoutDefaultPipeline() { + defaultPipeline = null; + } + + @Override + public Iterator iterator() { + if (defaultPipeline != null && finalPipeline != null) { + return List.of(defaultPipeline, finalPipeline).iterator(); + } + if (finalPipeline != null) { + return List.of(finalPipeline).iterator(); + } + if (defaultPipeline != null) { + return List.of(defaultPipeline).iterator(); + } + return Collections.emptyIterator(); + } + } + private void executePipelines( final Iterator pipelineIds, final boolean hasFinalPipeline, final IndexRequest indexRequest, final IngestDocument ingestDocument, - final ActionListener listener + final ActionListener listener, + final Set indexRecursionDetection ) { assert pipelineIds.hasNext(); final String pipelineId = pipelineIds.next(); @@ -840,6 +885,14 @@ private void executePipelines( final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { + if (indexRecursionDetection.add(newIndex) == false) { + List indexRoute = new ArrayList<>(indexRecursionDetection); + indexRoute.add(newIndex); + listener.onFailure( + new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute)) + ); + return; // document failed! + } if (hasFinalPipeline && pipelineIds.hasNext() == false) { listener.onFailure( new IllegalStateException( @@ -854,19 +907,21 @@ private void executePipelines( ); return; // document failed! } else { + // reset request pipeline that is set to _none which would take precedence over the default pipeline + indexRequest.setPipeline(null); indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); - newHasFinalPipeline = true; - } else { - newPipelineIds = Collections.emptyIterator(); + Pipelines pipelines = getPipelines(indexRequest); + if (ingestDocument.isInvokeDefaultPipelineOfDestination() == false) { + pipelines.withoutDefaultPipeline(); } + newHasFinalPipeline = pipelines.hasFinalPipeline(); + newPipelineIds = pipelines.iterator(); } } if (newPipelineIds.hasNext()) { - executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener); + executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 4bc581594d8a4..09b8c206fc135 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -532,6 +532,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() { } } + public void testSkipPipeline() { + TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo")); + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipAsyncProcessor() { + TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo")) { + @Override + public boolean isAsync() { + return true; + } + }; + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipProcessorIgnoreFailure() { + TestProcessor processor1 = new TestProcessor(doc -> { + doc.redirect("foo"); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor processor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testDontSkipFailureProcessor() { + TestProcessor processor = new TestProcessor(doc -> { + doc.redirect("foo"); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor failureProcessor1 = new TestProcessor(doc -> {}); + TestProcessor failureProcessor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor), + List.of(failureProcessor1, failureProcessor2), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(failureProcessor1.getInvokedCounter(), equalTo(1)); + assertThat(failureProcessor2.getInvokedCounter(), equalTo(1)); + } + private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) { return new TestProcessor( tag, From 0a41d944ee67a7c0d7fdd5963f22106c35877b44 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 22 Feb 2023 09:23:39 +0100 Subject: [PATCH 02/22] Add changelog --- docs/changelog/94000.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/94000.yaml diff --git a/docs/changelog/94000.yaml b/docs/changelog/94000.yaml new file mode 100644 index 0000000000000..a67dce242960b --- /dev/null +++ b/docs/changelog/94000.yaml @@ -0,0 +1,5 @@ +pr: 94000 +summary: Introduce redirect method on IngestDocument +area: Ingest Node +type: enhancement +issues: [] From 47581d565afefa3af56cac2f5d0d179f43601b4a Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 22 Feb 2023 16:00:36 +0100 Subject: [PATCH 03/22] Skipp full pipeline even if invoked via pipeline processor --- .../org/elasticsearch/ingest/IngestDocument.java | 13 ++++++++----- .../org/elasticsearch/ingest/IngestService.java | 1 + 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index cdbcc4d5e412b..241dde13f13a4 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -62,10 +62,9 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); - private boolean skipCurrentPipeline = false; - private boolean doNoSelfReferencesCheck = false; private boolean invokeDefaultPipelineOfDestination = false; + private boolean skipCurrentPipeline = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -83,6 +82,7 @@ public IngestDocument(IngestDocument other) { deepCopyMap(other.ingestMetadata) ); this.invokeDefaultPipelineOfDestination = other.invokeDefaultPipelineOfDestination; + this.skipCurrentPipeline = other.skipCurrentPipeline; } /** @@ -841,7 +841,6 @@ public void executePipeline(Pipeline pipeline, BiConsumer { - skipCurrentPipeline = false; executedPipelines.remove(pipeline.getId()); if (previousPipeline != null) { ingestMetadata.put("pipeline", previousPipeline); @@ -913,14 +912,18 @@ public void redirect(String destIndex) { skipCurrentPipeline = true; } - public boolean isInvokeDefaultPipelineOfDestination() { + boolean isInvokeDefaultPipelineOfDestination() { return invokeDefaultPipelineOfDestination; } - public boolean isSkipCurrentPipeline() { + boolean isSkipCurrentPipeline() { return skipCurrentPipeline; } + void resetPipelineSkipping() { + skipCurrentPipeline = false; + } + public enum Metadata { INDEX(IndexFieldMapper.NAME), TYPE("_type"), diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9873fd997fd39..07e8cff9fa030 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -835,6 +835,7 @@ private void executePipelines( final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; + ingestDocument.resetPipelineSkipping(); if (e != null) { logger.debug( From fc34a285d43c705a25780ff14ea9105e10ca75f0 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 23 Feb 2023 09:49:25 +0100 Subject: [PATCH 04/22] Encapsulate more state in PipelineIterator --- .../elasticsearch/ingest/IngestService.java | 103 ++++++++++-------- 1 file changed, 57 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 07e8cff9fa030..10ad9853c38d8 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -715,8 +715,8 @@ protected void doRun() { continue; } - Pipelines pipelines = getPipelines(indexRequest); - if (pipelines.isEmpty()) { + PipelineIterator pipelines = getPipelines(indexRequest); + if (pipelines.hasNext() == false) { i++; continue; } @@ -754,8 +754,7 @@ public void onFailure(Exception e) { LinkedHashSet indexRecursionDetection = new LinkedHashSet<>(); indexRecursionDetection.add(indexRequest.index()); executePipelines( - pipelines.iterator(), - pipelines.hasFinalPipeline(), + pipelines, indexRequest, ingestDocument, documentListener, @@ -768,41 +767,31 @@ public void onFailure(Exception e) { }); } - private Pipelines getPipelines(IndexRequest indexRequest) { + private PipelineIterator getPipelines(IndexRequest indexRequest) { final String pipelineId = indexRequest.getPipeline(); indexRequest.setPipeline(NOOP_PIPELINE_NAME); final String finalPipelineId = indexRequest.getFinalPipeline(); indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - return new Pipelines(pipelineId, finalPipelineId); + return new PipelineIterator(pipelineId, finalPipelineId); } - private static class Pipelines implements Iterable { - private String defaultPipeline; - private String finalPipeline; + private class PipelineIterator implements Iterator { + private final String defaultPipeline; + private final String finalPipeline; + private final Iterator pipelineIds; + private String current; - private Pipelines(String defaultPipeline, String finalPipeline) { - if (NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { - this.defaultPipeline = defaultPipeline; - } - if (NOOP_PIPELINE_NAME.equals(finalPipeline) == false) { - this.finalPipeline = finalPipeline; - } - } - - public boolean hasFinalPipeline() { - return finalPipeline != null; - } - - public boolean isEmpty() { - return defaultPipeline == null && finalPipeline == null; + private PipelineIterator(String defaultPipeline, String finalPipeline) { + this.defaultPipeline = NOOP_PIPELINE_NAME.equals(defaultPipeline) ? null : defaultPipeline; + this.finalPipeline = NOOP_PIPELINE_NAME.equals(finalPipeline) ? null : finalPipeline; + this.pipelineIds = iterator(); } - public void withoutDefaultPipeline() { - defaultPipeline = null; + public PipelineIterator withoutDefaultPipeline() { + return new PipelineIterator(null, finalPipeline); } - @Override - public Iterator iterator() { + private Iterator iterator() { if (defaultPipeline != null && finalPipeline != null) { return List.of(defaultPipeline, finalPipeline).iterator(); } @@ -814,24 +803,49 @@ public Iterator iterator() { } return Collections.emptyIterator(); } + + @Override + public boolean hasNext() { + return pipelineIds.hasNext(); + } + + @Override + public String next() { + current = pipelineIds.next(); + return current; + } + + public Pipeline currentPipeline() { + if (current == null) { + throw new IllegalStateException("Invoked before next"); + } + final PipelineHolder holder = pipelines.get(current); + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + current + "] does not exist"); + } + return holder.pipeline; + } + + public boolean isFinalPipeline() { + return hasFinalPipeline() && pipelineIds.hasNext() == false; + } + + private boolean hasFinalPipeline() { + return finalPipeline != null; + } } private void executePipelines( - final Iterator pipelineIds, - final boolean hasFinalPipeline, + final PipelineIterator pipelines, final IndexRequest indexRequest, final IngestDocument ingestDocument, final ActionListener listener, final Set indexRecursionDetection ) { - assert pipelineIds.hasNext(); - final String pipelineId = pipelineIds.next(); + assert pipelines.hasNext(); + final String pipelineId = pipelines.next(); try { - final PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } - final Pipeline pipeline = holder.pipeline; + final Pipeline pipeline = pipelines.currentPipeline(); final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; @@ -881,8 +895,7 @@ private void executePipelines( return; // document failed! } - Iterator newPipelineIds = pipelineIds; - boolean newHasFinalPipeline = hasFinalPipeline; + PipelineIterator newPipelines = pipelines; final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { @@ -894,7 +907,7 @@ private void executePipelines( ); return; // document failed! } - if (hasFinalPipeline && pipelineIds.hasNext() == false) { + if (pipelines.isFinalPipeline()) { listener.onFailure( new IllegalStateException( format( @@ -912,17 +925,15 @@ private void executePipelines( indexRequest.setPipeline(null); indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); - Pipelines pipelines = getPipelines(indexRequest); + newPipelines = getPipelines(indexRequest); if (ingestDocument.isInvokeDefaultPipelineOfDestination() == false) { - pipelines.withoutDefaultPipeline(); + newPipelines = newPipelines.withoutDefaultPipeline(); } - newHasFinalPipeline = pipelines.hasFinalPipeline(); - newPipelineIds = pipelines.iterator(); } } - if (newPipelineIds.hasNext()) { - executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection); + if (newPipelines.hasNext()) { + executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); From b1c7b266bf9bfd52b896c900402726606b0f6900 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 23 Feb 2023 10:00:42 +0100 Subject: [PATCH 05/22] Only one boolean flag in IngestDocument --- .../ingest/CompoundProcessor.java | 7 ++--- .../elasticsearch/ingest/IngestDocument.java | 30 ++++++++++--------- .../elasticsearch/ingest/IngestService.java | 4 +-- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 459e6c084e798..940c257b3a1fc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isRedirect()) { handler.accept(ingestDocument, null); return; } @@ -151,8 +151,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC IngestMetric metric; // iteratively execute any sync processors while (currentProcessor < processorsWithMetrics.size() - && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false - && ingestDocument.isSkipCurrentPipeline() == false) { + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false && ingestDocument.isRedirect() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -178,7 +177,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isRedirect()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 241dde13f13a4..783356e0eedd9 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -63,8 +63,7 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); private boolean doNoSelfReferencesCheck = false; - private boolean invokeDefaultPipelineOfDestination = false; - private boolean skipCurrentPipeline = false; + private boolean redirect = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -81,8 +80,7 @@ public IngestDocument(IngestDocument other) { new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); - this.invokeDefaultPipelineOfDestination = other.invokeDefaultPipelineOfDestination; - this.skipCurrentPipeline = other.skipCurrentPipeline; + this.redirect = other.redirect; } /** @@ -908,20 +906,24 @@ public String toString() { public void redirect(String destIndex) { getMetadata().setIndex(destIndex); - invokeDefaultPipelineOfDestination = true; - skipCurrentPipeline = true; + redirect = true; } - boolean isInvokeDefaultPipelineOfDestination() { - return invokeDefaultPipelineOfDestination; - } - - boolean isSkipCurrentPipeline() { - return skipCurrentPipeline; + /** + * The document is redirected to another target. + * This implies that we'll skip the current pipeline and invoke the default pipeline of the new target + * + * @return whether the document is redirected to another target + */ + boolean isRedirect() { + return redirect; } - void resetPipelineSkipping() { - skipCurrentPipeline = false; + /** + * Invoked after the pipeline for the initial target has been skipped to avoid that the pipeline of the new target is skipped as well. + */ + void resetRedirect() { + redirect = false; } public enum Metadata { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 10ad9853c38d8..906c85b589c7b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -849,7 +849,7 @@ private void executePipelines( final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; - ingestDocument.resetPipelineSkipping(); + ingestDocument.resetRedirect(); if (e != null) { logger.debug( @@ -926,7 +926,7 @@ private void executePipelines( indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); newPipelines = getPipelines(indexRequest); - if (ingestDocument.isInvokeDefaultPipelineOfDestination() == false) { + if (ingestDocument.isRedirect() == false) { newPipelines = newPipelines.withoutDefaultPipeline(); } } From d766c6205fc8150e53fd2cc673a33d0290f3bde2 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 23 Feb 2023 10:10:11 +0100 Subject: [PATCH 06/22] Reset redirect at the end of the handler --- .../src/main/java/org/elasticsearch/ingest/IngestService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 906c85b589c7b..f6a980ae801cb 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -849,7 +849,6 @@ private void executePipelines( final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; - ingestDocument.resetRedirect(); if (e != null) { logger.debug( @@ -933,6 +932,7 @@ private void executePipelines( } if (newPipelines.hasNext()) { + ingestDocument.resetRedirect(); executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB From 5dd4d271919adc0d5840de27641e826019098f40 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 23 Feb 2023 10:25:47 +0100 Subject: [PATCH 07/22] Apply spotless suggestions --- .../java/org/elasticsearch/ingest/CompoundProcessor.java | 3 ++- .../main/java/org/elasticsearch/ingest/IngestService.java | 8 +------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 940c257b3a1fc..bcc35035f58b9 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -151,7 +151,8 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC IngestMetric metric; // iteratively execute any sync processors while (currentProcessor < processorsWithMetrics.size() - && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false && ingestDocument.isRedirect() == false) { + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false + && ingestDocument.isRedirect() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index f6a980ae801cb..36e9251ee37bf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -753,13 +753,7 @@ public void onFailure(Exception e) { IngestDocument ingestDocument = newIngestDocument(indexRequest); LinkedHashSet indexRecursionDetection = new LinkedHashSet<>(); indexRecursionDetection.add(indexRequest.index()); - executePipelines( - pipelines, - indexRequest, - ingestDocument, - documentListener, - indexRecursionDetection - ); + executePipelines(pipelines, indexRequest, ingestDocument, documentListener, indexRecursionDetection); i++; } } From 3b6472721e8e4653f14ecd41b0c5bd79a34b3e09 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 23 Feb 2023 11:14:30 +0100 Subject: [PATCH 08/22] Rename method and add javadoc --- .../java/org/elasticsearch/ingest/IngestService.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 36e9251ee37bf..289498ce0b7a1 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -820,8 +820,12 @@ public Pipeline currentPipeline() { return holder.pipeline; } - public boolean isFinalPipeline() { - return hasFinalPipeline() && pipelineIds.hasNext() == false; + /** + * Whether the {@linkplain #currentPipeline() current pipeline} of this iterator represents the + * {@linkplain IndexRequest#getFinalPipeline() final pipeline of the index request}. + */ + public boolean isCurrentPipelineFinalPipeline() { + return hasFinalPipeline() && hasNext() == false; } private boolean hasFinalPipeline() { @@ -900,7 +904,7 @@ private void executePipelines( ); return; // document failed! } - if (pipelines.isFinalPipeline()) { + if (pipelines.isCurrentPipelineFinalPipeline()) { listener.onFailure( new IllegalStateException( format( From 1bfcf5540aa362f867fb694594317b21c1a3b6a8 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 1 Mar 2023 07:50:59 +0100 Subject: [PATCH 09/22] Reroute to remain --- .../org/elasticsearch/index/FinalPipelineIT.java | 14 +++++++------- .../elasticsearch/ingest/CompoundProcessor.java | 6 +++--- .../org/elasticsearch/ingest/IngestDocument.java | 16 ++++++++-------- .../org/elasticsearch/ingest/IngestService.java | 4 ++-- .../ingest/CompoundProcessorTests.java | 8 ++++---- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 504d78eaf47c8..2cd1d93ad7963 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -190,7 +190,7 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() { assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); } - public void testDefaultPipelineOfRedirectDestinationIsInvoked() { + public void testDefaultPipelineOfRerouteDestinationIsInvoked() { Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); createIndex("index", settings); @@ -198,7 +198,7 @@ public void testDefaultPipelineOfRedirectDestinationIsInvoked() { createIndex("target", settings); BytesReference defaultPipelineBody = new BytesArray(""" - {"processors": [{"redirect": {}}]}"""); + {"processors": [{"reroute": {}}]}"""); client().admin() .cluster() .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) @@ -230,14 +230,14 @@ public void testAvoidIndexingLoop() { createIndex("target", settings); BytesReference defaultPipelineBody = new BytesArray(""" - {"processors": [{"redirect": {"dest": "target"}}]}"""); + {"processors": [{"reroute": {"dest": "target"}}]}"""); client().admin() .cluster() .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) .actionGet(); BytesReference targetPipeline = new BytesArray(""" - {"processors": [{"redirect": {"dest": "index"}}]}"""); + {"processors": [{"reroute": {"dest": "index"}}]}"""); client().admin() .cluster() .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) @@ -461,7 +461,7 @@ public String getType() { } }, - "redirect", + "reroute", (processorFactories, tag, description, config) -> { final String dest = Objects.requireNonNullElse( ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"), @@ -470,13 +470,13 @@ public String getType() { return new AbstractProcessor(tag, description) { @Override public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { - ingestDocument.redirect(dest); + ingestDocument.reroute(dest); return ingestDocument; } @Override public String getType() { - return "redirect"; + return "reroute"; } }; diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index bcc35035f58b9..3aa9a68bd3d3a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isRedirect()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { handler.accept(ingestDocument, null); return; } @@ -152,7 +152,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC // iteratively execute any sync processors while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false - && ingestDocument.isRedirect() == false) { + && ingestDocument.isReroute() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -178,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isRedirect()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 783356e0eedd9..c8a53fd688266 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -63,7 +63,7 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); private boolean doNoSelfReferencesCheck = false; - private boolean redirect = false; + private boolean reroute = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -80,7 +80,7 @@ public IngestDocument(IngestDocument other) { new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); - this.redirect = other.redirect; + this.reroute = other.reroute; } /** @@ -904,9 +904,9 @@ public String toString() { return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}'; } - public void redirect(String destIndex) { + public void reroute(String destIndex) { getMetadata().setIndex(destIndex); - redirect = true; + reroute = true; } /** @@ -915,15 +915,15 @@ public void redirect(String destIndex) { * * @return whether the document is redirected to another target */ - boolean isRedirect() { - return redirect; + boolean isReroute() { + return reroute; } /** * Invoked after the pipeline for the initial target has been skipped to avoid that the pipeline of the new target is skipped as well. */ - void resetRedirect() { - redirect = false; + void resetReroute() { + reroute = false; } public enum Metadata { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 289498ce0b7a1..d7c002c796a6c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -923,14 +923,14 @@ private void executePipelines( indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); newPipelines = getPipelines(indexRequest); - if (ingestDocument.isRedirect() == false) { + if (ingestDocument.isReroute() == false) { newPipelines = newPipelines.withoutDefaultPipeline(); } } } if (newPipelines.hasNext()) { - ingestDocument.resetRedirect(); + ingestDocument.resetReroute(); executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 09b8c206fc135..327649a9819ba 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -533,7 +533,7 @@ public void testMultipleProcessorsDoNotIgnoreFailures() { } public void testSkipPipeline() { - TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo")); + TestProcessor processor1 = new TestProcessor(doc -> doc.reroute("foo")); TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); LongSupplier relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L); @@ -551,7 +551,7 @@ public void testSkipPipeline() { } public void testSkipAsyncProcessor() { - TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo")) { + TestProcessor processor1 = new TestProcessor(doc -> doc.reroute("foo")) { @Override public boolean isAsync() { return true; @@ -575,7 +575,7 @@ public boolean isAsync() { public void testSkipProcessorIgnoreFailure() { TestProcessor processor1 = new TestProcessor(doc -> { - doc.redirect("foo"); + doc.reroute("foo"); throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); }); TestProcessor processor2 = new TestProcessor(doc -> {}); @@ -591,7 +591,7 @@ public void testSkipProcessorIgnoreFailure() { public void testDontSkipFailureProcessor() { TestProcessor processor = new TestProcessor(doc -> { - doc.redirect("foo"); + doc.reroute("foo"); throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); }); TestProcessor failureProcessor1 = new TestProcessor(doc -> {}); From 98d7c94531a8691f365242932372b57f4a15ba19 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 1 Mar 2023 07:56:33 +0100 Subject: [PATCH 10/22] Add test that final pipeline can't reroute --- .../elasticsearch/index/FinalPipelineIT.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 2cd1d93ad7963..7a2c96af152f5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -100,6 +100,26 @@ public void testFinalPipelineCantChangeDestination() { ); } + public void testFinalPipelineCantRerouteDestination() { + final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); + createIndex("index", settings); + + final BytesReference finalPipelineBody = new BytesArray(""" + {"processors": [{"reroute": {}}]}"""); + client().admin().cluster().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet(); + + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get() + ); + assertThat( + e, + hasToString( + endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]") + ) + ); + } + public void testFinalPipelineOfOldDestinationIsNotInvoked() { Settings settings = Settings.builder() .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") From 8ef09c03fdee14e21b470eb5b05abe20136f9f1a Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 7 Mar 2023 08:52:48 -0500 Subject: [PATCH 11/22] Update test Since this PR fixes the linked bug --- .../test/ingest/230_change_target_index.yml | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml index d419e57054e85..cf8e48c54427a 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml @@ -81,11 +81,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). -# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: @@ -144,11 +140,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). -# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: From f1a3b3e762eed14cfb7d1d94322578fb07a8280c Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 7 Mar 2023 14:54:58 +0100 Subject: [PATCH 12/22] Update docs/changelog/94000.yaml --- docs/changelog/94000.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/changelog/94000.yaml b/docs/changelog/94000.yaml index a67dce242960b..debbf2fd205c7 100644 --- a/docs/changelog/94000.yaml +++ b/docs/changelog/94000.yaml @@ -2,4 +2,5 @@ pr: 94000 summary: Introduce redirect method on IngestDocument area: Ingest Node type: enhancement -issues: [] +issues: + - 83653 From 117d89c817c4ce5043bc4a6c365514086c89bedb Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 7 Mar 2023 16:19:49 -0500 Subject: [PATCH 13/22] Move this resetReroute call earlier --- .../main/java/org/elasticsearch/ingest/IngestDocument.java | 3 ++- .../main/java/org/elasticsearch/ingest/IngestService.java | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index c8a53fd688266..07f7856323fb7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -920,7 +920,8 @@ boolean isReroute() { } /** - * Invoked after the pipeline for the initial target has been skipped to avoid that the pipeline of the new target is skipped as well. + * Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless + * {@link #reroute(String)} is called. */ void resetReroute() { reroute = false; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9325b4d203d18..c8dc0703c0256 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -842,6 +842,10 @@ private void executePipelines( ) { assert pipelines.hasNext(); final String pipelineId = pipelines.next(); + + // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet + ingestDocument.resetReroute(); + try { final Pipeline pipeline = pipelines.currentPipeline(); final String originalIndex = indexRequest.indices()[0]; @@ -930,7 +934,6 @@ private void executePipelines( } if (newPipelines.hasNext()) { - ingestDocument.resetReroute(); executePipelines(newPipelines, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB From 5bc367fce506490cb4c8f45f4d8a511e473cbdb0 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 7 Mar 2023 16:22:39 -0500 Subject: [PATCH 14/22] Pull this block out of the else --- .../elasticsearch/ingest/IngestService.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index c8dc0703c0256..55847e56b7daf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -921,15 +921,15 @@ private void executePipelines( ) ); return; // document failed! - } else { - // reset request pipeline that is set to _none which would take precedence over the default pipeline - indexRequest.setPipeline(null); - indexRequest.isPipelineResolved(false); - resolvePipelines(null, indexRequest, state.metadata()); - newPipelines = getPipelines(indexRequest); - if (ingestDocument.isReroute() == false) { - newPipelines = newPipelines.withoutDefaultPipeline(); - } + } + + // reset request pipeline that is set to _none which would take precedence over the default pipeline + indexRequest.setPipeline(null); + indexRequest.isPipelineResolved(false); + resolvePipelines(null, indexRequest, state.metadata()); + newPipelines = getPipelines(indexRequest); + if (ingestDocument.isReroute() == false) { + newPipelines = newPipelines.withoutDefaultPipeline(); } } From 61e56171a5e13c5a3da19b0d24f8080355de8528 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 7 Mar 2023 16:27:15 -0500 Subject: [PATCH 15/22] Reorder these blocks If a final pipeline changes the indices such that a cycle is created, it's more important to error that a final pipeline changed the indices than that a cycle was created. --- .../org/elasticsearch/ingest/IngestService.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 55847e56b7daf..34e46172642c4 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -900,14 +900,6 @@ private void executePipelines( final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { - if (indexRecursionDetection.add(newIndex) == false) { - List indexRoute = new ArrayList<>(indexRecursionDetection); - indexRoute.add(newIndex); - listener.onFailure( - new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute)) - ); - return; // document failed! - } if (pipelines.isCurrentPipelineFinalPipeline()) { listener.onFailure( new IllegalStateException( @@ -923,6 +915,15 @@ private void executePipelines( return; // document failed! } + if (indexRecursionDetection.add(newIndex) == false) { + List indexRoute = new ArrayList<>(indexRecursionDetection); + indexRoute.add(newIndex); + listener.onFailure( + new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute)) + ); + return; // document failed! + } + // reset request pipeline that is set to _none which would take precedence over the default pipeline indexRequest.setPipeline(null); indexRequest.isPipelineResolved(false); From 35983d5d799443becf59d62a6798d84ee53c0a97 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 7 Mar 2023 16:36:13 -0500 Subject: [PATCH 16/22] Add/tweak comments --- .../main/java/org/elasticsearch/ingest/IngestService.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 34e46172642c4..c75c2f7709b2a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -900,6 +900,7 @@ private void executePipelines( final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { + // final pipelines cannot change the target index (either directly or by way of a reroute) if (pipelines.isCurrentPipelineFinalPipeline()) { listener.onFailure( new IllegalStateException( @@ -915,6 +916,7 @@ private void executePipelines( return; // document failed! } + // check for cycles in the visited indices if (indexRecursionDetection.add(newIndex) == false) { List indexRoute = new ArrayList<>(indexRecursionDetection); indexRoute.add(newIndex); @@ -924,11 +926,14 @@ private void executePipelines( return; // document failed! } - // reset request pipeline that is set to _none which would take precedence over the default pipeline + // clear the current pipeline, then re-resolve the pipelines for this request indexRequest.setPipeline(null); indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); newPipelines = getPipelines(indexRequest); + + // for backwards compatibility, when a pipeline changes the target index for a document without using the reroute + // mechanism, do not invoke the default pipeline of the new target index if (ingestDocument.isReroute() == false) { newPipelines = newPipelines.withoutDefaultPipeline(); } From 5dfebb69e25edf33546eeef3b11b66cf27ae26b3 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 7 Mar 2023 16:55:54 -0500 Subject: [PATCH 17/22] Add more context to error message --- .../java/org/elasticsearch/index/FinalPipelineIT.java | 5 ++++- .../java/org/elasticsearch/ingest/IngestService.java | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 37f4fc46350ad..003d8bcf9fafe 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -270,7 +270,10 @@ public void testAvoidIndexingLoop() { .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get() ); - assertThat(exception.getMessage(), containsString("index cycle detected while processing pipelines: [index, target, index]")); + assertThat( + exception.getMessage(), + equalTo("index cycle detected while processing pipeline [target_default_pipeline] for document [1]: [index, target, index]") + ); } public void testFinalPipeline() { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index c75c2f7709b2a..378cfc62ec854 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -921,7 +921,14 @@ private void executePipelines( List indexRoute = new ArrayList<>(indexRecursionDetection); indexRoute.add(newIndex); listener.onFailure( - new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute)) + new IllegalStateException( + format( + "index cycle detected while processing pipeline [%s] for document [%s]: %s", + pipelineId, + indexRequest.id(), + indexRoute + ) + ) ); return; // document failed! } From 2dc42e17c0c97b876e54eabdf951447e72528c12 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Wed, 8 Mar 2023 19:02:01 +0100 Subject: [PATCH 18/22] Adjust test to assert that final pipeline is not executed twice --- .../test/ingest/230_change_target_index.yml | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml index 6a6e8f071024b..d9154174379bd 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/230_change_target_index.yml @@ -87,11 +87,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). -# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: @@ -150,11 +146,7 @@ teardown: index: foo id: "1" - match: { _source.a: true } -# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances -# (See issue https://github.com/elastic/elasticsearch/issues/83653). -# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior -#- match: { _source.accumulator: [ "non-repeated-value" ] } -- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] } +- match: { _source.accumulator: [ "non-repeated-value" ] } # only the foo index - do: From 8dbef758963e5dbd5958db4ff01affb93cbda9a1 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 21 Mar 2023 09:45:47 -0400 Subject: [PATCH 19/22] Make PipelineIterator an iterator over a triple --- .../elasticsearch/ingest/IngestService.java | 92 +++++++++++-------- 1 file changed, 54 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 4cfe8965c29c6..3d77793606bd0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -701,67 +702,76 @@ private PipelineIterator getPipelines(IndexRequest indexRequest) { return new PipelineIterator(pipelineId, finalPipelineId); } - private class PipelineIterator implements Iterator { + /** + * A triple for tracking the non-null id of a pipeline, the pipeline itself, and whether the pipeline is a final pipeline. + * + * @param id the non-null id of the pipeline + * @param pipeline a possibly-null reference to the pipeline for the given pipeline id + * @param isFinal true if the pipeline is a final pipeline + */ + private record PipelineSlot(String id, @Nullable Pipeline pipeline, boolean isFinal) { + public PipelineSlot { + Objects.requireNonNull(id); + } + } + + private class PipelineIterator implements Iterator { + private final String defaultPipeline; private final String finalPipeline; - private final Iterator pipelineIds; - private String current; + private final Iterator pipelineSlotIterator; private PipelineIterator(String defaultPipeline, String finalPipeline) { this.defaultPipeline = NOOP_PIPELINE_NAME.equals(defaultPipeline) ? null : defaultPipeline; this.finalPipeline = NOOP_PIPELINE_NAME.equals(finalPipeline) ? null : finalPipeline; - this.pipelineIds = iterator(); + this.pipelineSlotIterator = iterator(); } public PipelineIterator withoutDefaultPipeline() { return new PipelineIterator(null, finalPipeline); } - private Iterator iterator() { - if (defaultPipeline != null && finalPipeline != null) { - return List.of(defaultPipeline, finalPipeline).iterator(); + private Iterator iterator() { + PipelineSlot defaultPipelineSlot = null, finalPipelineSlot = null; + if (defaultPipeline != null) { + defaultPipelineSlot = new PipelineSlot(defaultPipeline, pipeline(defaultPipeline), false); } if (finalPipeline != null) { - return List.of(finalPipeline).iterator(); - } - if (defaultPipeline != null) { - return List.of(defaultPipeline).iterator(); + finalPipelineSlot = new PipelineSlot(finalPipeline, pipeline(finalPipeline), false); } - return Collections.emptyIterator(); - } - - @Override - public boolean hasNext() { - return pipelineIds.hasNext(); - } - @Override - public String next() { - current = pipelineIds.next(); - return current; + if (defaultPipeline != null && finalPipeline != null) { + return List.of(defaultPipelineSlot, finalPipelineSlot).iterator(); + } else if (finalPipeline != null) { + return List.of(finalPipelineSlot).iterator(); + } else if (defaultPipeline != null) { + return List.of(defaultPipelineSlot).iterator(); + } else { + return Collections.emptyIterator(); + } } - public Pipeline currentPipeline() { - if (current == null) { - throw new IllegalStateException("Invoked before next"); + private Pipeline pipeline(String id) { + if (id == null) { + return null; } - final PipelineHolder holder = pipelines.get(current); + + final PipelineHolder holder = pipelines.get(id); if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + current + "] does not exist"); + return null; } + return holder.pipeline; } - /** - * Whether the {@linkplain #currentPipeline() current pipeline} of this iterator represents the - * {@linkplain IndexRequest#getFinalPipeline() final pipeline of the index request}. - */ - public boolean isCurrentPipelineFinalPipeline() { - return hasFinalPipeline() && hasNext() == false; + @Override + public boolean hasNext() { + return pipelineSlotIterator.hasNext(); } - private boolean hasFinalPipeline() { - return finalPipeline != null; + @Override + public PipelineSlot next() { + return pipelineSlotIterator.next(); } } @@ -773,13 +783,19 @@ private void executePipelines( final Set indexRecursionDetection ) { assert pipelines.hasNext(); - final String pipelineId = pipelines.next(); + PipelineSlot slot = pipelines.next(); + final String pipelineId = slot.id(); + final Pipeline pipeline = slot.pipeline(); + final boolean isFinalPipeline = slot.isFinal(); // reset the reroute flag, at the start of a new pipeline execution this document hasn't been rerouted yet ingestDocument.resetReroute(); try { - final Pipeline pipeline = pipelines.currentPipeline(); + if (pipeline == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + final String originalIndex = indexRequest.indices()[0]; executePipeline(ingestDocument, pipeline, (keep, e) -> { assert keep != null; @@ -833,7 +849,7 @@ private void executePipelines( if (Objects.equals(originalIndex, newIndex) == false) { // final pipelines cannot change the target index (either directly or by way of a reroute) - if (pipelines.isCurrentPipelineFinalPipeline()) { + if (isFinalPipeline) { listener.onFailure( new IllegalStateException( format( From 53f0b24a7452e2d5389629ac1a6f07617c0e00c4 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 21 Mar 2023 09:51:07 -0400 Subject: [PATCH 20/22] Rename this method and add a docstring --- .../java/org/elasticsearch/ingest/IngestService.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3d77793606bd0..ac8a977598394 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -648,7 +648,7 @@ protected void doRun() { continue; } - PipelineIterator pipelines = getPipelines(indexRequest); + PipelineIterator pipelines = getAndResetPipelines(indexRequest); if (pipelines.hasNext() == false) { i++; continue; @@ -694,7 +694,11 @@ public void onFailure(Exception e) { }); } - private PipelineIterator getPipelines(IndexRequest indexRequest) { + /** + * Returns the pipelines of the request, and updates the request so that it no longer references + * any pipelines (both the default and final pipeline are set to the noop pipeline). + */ + private PipelineIterator getAndResetPipelines(IndexRequest indexRequest) { final String pipelineId = indexRequest.getPipeline(); indexRequest.setPipeline(NOOP_PIPELINE_NAME); final String finalPipelineId = indexRequest.getFinalPipeline(); @@ -885,7 +889,7 @@ private void executePipelines( indexRequest.setPipeline(null); indexRequest.isPipelineResolved(false); resolvePipelinesAndUpdateIndexRequest(null, indexRequest, state.metadata()); - newPipelines = getPipelines(indexRequest); + newPipelines = getAndResetPipelines(indexRequest); // for backwards compatibility, when a pipeline changes the target index for a document without using the reroute // mechanism, do not invoke the default pipeline of the new target index From 2db2a1990b7d6ba20a13934e5747faa7e38d8240 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 21 Mar 2023 10:42:04 -0400 Subject: [PATCH 21/22] The final pipeline slot should be isFinal, of course --- .../src/main/java/org/elasticsearch/ingest/IngestService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ac8a977598394..d025c1826433b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -741,7 +741,7 @@ private Iterator iterator() { defaultPipelineSlot = new PipelineSlot(defaultPipeline, pipeline(defaultPipeline), false); } if (finalPipeline != null) { - finalPipelineSlot = new PipelineSlot(finalPipeline, pipeline(finalPipeline), false); + finalPipelineSlot = new PipelineSlot(finalPipeline, pipeline(finalPipeline), true); } if (defaultPipeline != null && finalPipeline != null) { From e86579e5eea7f3d7e059688a6a7c1673e7a9ccb7 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 21 Mar 2023 11:39:08 -0400 Subject: [PATCH 22/22] Actually, getPipeline is all we need here But make the null handling explicit rather than implicit --- .../elasticsearch/ingest/IngestService.java | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index d025c1826433b..534cafaeaa27d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -443,6 +443,10 @@ public static boolean isNoOpPipelineUpdate(ClusterState state, PutPipelineReques * Returns the pipeline by the specified id */ public Pipeline getPipeline(String id) { + if (id == null) { + return null; + } + PipelineHolder holder = pipelines.get(id); if (holder != null) { return holder.pipeline; @@ -738,10 +742,10 @@ public PipelineIterator withoutDefaultPipeline() { private Iterator iterator() { PipelineSlot defaultPipelineSlot = null, finalPipelineSlot = null; if (defaultPipeline != null) { - defaultPipelineSlot = new PipelineSlot(defaultPipeline, pipeline(defaultPipeline), false); + defaultPipelineSlot = new PipelineSlot(defaultPipeline, getPipeline(defaultPipeline), false); } if (finalPipeline != null) { - finalPipelineSlot = new PipelineSlot(finalPipeline, pipeline(finalPipeline), true); + finalPipelineSlot = new PipelineSlot(finalPipeline, getPipeline(finalPipeline), true); } if (defaultPipeline != null && finalPipeline != null) { @@ -755,19 +759,6 @@ private Iterator iterator() { } } - private Pipeline pipeline(String id) { - if (id == null) { - return null; - } - - final PipelineHolder holder = pipelines.get(id); - if (holder == null) { - return null; - } - - return holder.pipeline; - } - @Override public boolean hasNext() { return pipelineSlotIterator.hasNext();