Skip to content

Commit 539d6ad

Browse files
committed
Invoke default pipeline of new index
1 parent 7c87b07 commit 539d6ad

File tree

4 files changed

+138
-39
lines changed

4 files changed

+138
-39
lines changed

docs/changelog/85931.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 85931
2+
summary: Invoke default pipeline of new index
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.env.Environment;
3030
import org.elasticsearch.env.NodeEnvironment;
3131
import org.elasticsearch.ingest.AbstractProcessor;
32+
import org.elasticsearch.ingest.ConfigurationUtils;
3233
import org.elasticsearch.ingest.IngestDocument;
3334
import org.elasticsearch.ingest.PipelineConfiguration;
3435
import org.elasticsearch.ingest.Processor;
@@ -49,6 +50,7 @@
4950
import java.util.Collection;
5051
import java.util.List;
5152
import java.util.Map;
53+
import java.util.Objects;
5254
import java.util.function.BiConsumer;
5355
import java.util.function.Supplier;
5456

@@ -156,7 +158,7 @@ public void testFinalPipelineOfNewDestinationIsInvoked() {
156158
assertEquals(true, target.getHits().getAt(0).getSourceAsMap().get("final"));
157159
}
158160

159-
public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
161+
public void testDefaultPipelineOfNewDestinationIsInvoked() {
160162
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
161163
createIndex("index", settings);
162164

@@ -185,7 +187,39 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
185187
assertEquals(RestStatus.CREATED, indexResponse.status());
186188
SearchResponse target = client().prepareSearch("target").get();
187189
assertEquals(1, target.getHits().getTotalHits().value);
188-
assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
190+
assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
191+
}
192+
193+
public void testAvoidIndexingLoop() {
194+
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
195+
createIndex("index", settings);
196+
197+
settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
198+
createIndex("target", settings);
199+
200+
BytesReference defaultPipelineBody = new BytesArray("""
201+
{"processors": [{"changing_dest": {"dest": "target"}}]}""");
202+
client().admin()
203+
.cluster()
204+
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
205+
.actionGet();
206+
207+
BytesReference targetPipeline = new BytesArray("""
208+
{"processors": [{"changing_dest": {"dest": "index"}}]}""");
209+
client().admin()
210+
.cluster()
211+
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
212+
.actionGet();
213+
214+
IllegalStateException exception = expectThrows(
215+
IllegalStateException.class,
216+
() -> client().prepareIndex("index")
217+
.setId("1")
218+
.setSource(Map.of("dest", "index"))
219+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
220+
.get()
221+
);
222+
assertThat(exception.getMessage(), containsString("index cycle detected while processing pipelines: [index, target, index]"));
189223
}
190224

191225
public void testFinalPipeline() {
@@ -382,18 +416,24 @@ public String getType() {
382416
}
383417
},
384418
"changing_dest",
385-
(processorFactories, tag, description, config) -> new AbstractProcessor(tag, description) {
386-
@Override
387-
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
388-
ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), "target");
389-
return ingestDocument;
390-
}
419+
(processorFactories, tag, description, config) -> {
420+
final String dest = Objects.requireNonNullElse(
421+
ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"),
422+
"target"
423+
);
424+
return new AbstractProcessor(tag, description) {
425+
@Override
426+
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
427+
ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), dest);
428+
return ingestDocument;
429+
}
391430

392-
@Override
393-
public String getType() {
394-
return "changing_dest";
395-
}
431+
@Override
432+
public String getType() {
433+
return "changing_dest";
434+
}
396435

436+
};
397437
}
398438
);
399439
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 79 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.util.HashMap;
6969
import java.util.HashSet;
7070
import java.util.Iterator;
71+
import java.util.LinkedHashSet;
7172
import java.util.LinkedList;
7273
import java.util.List;
7374
import java.util.Locale;
@@ -714,21 +715,8 @@ protected void doRun() {
714715
continue;
715716
}
716717

717-
final String pipelineId = indexRequest.getPipeline();
718-
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
719-
final String finalPipelineId = indexRequest.getFinalPipeline();
720-
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
721-
boolean hasFinalPipeline = true;
722-
final List<String> pipelines;
723-
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
724-
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
725-
pipelines = List.of(pipelineId, finalPipelineId);
726-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
727-
pipelines = List.of(pipelineId);
728-
hasFinalPipeline = false;
729-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
730-
pipelines = List.of(finalPipelineId);
731-
} else {
718+
Pipelines pipelines = getPipelines(indexRequest);
719+
if (pipelines.isEmpty()) {
732720
i++;
733721
continue;
734722
}
@@ -763,21 +751,80 @@ public void onFailure(Exception e) {
763751
});
764752

765753
IngestDocument ingestDocument = newIngestDocument(indexRequest);
766-
executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener);
767-
754+
LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>();
755+
indexRecursionDetection.add(indexRequest.index());
756+
executePipelines(
757+
pipelines.iterator(),
758+
pipelines.hasFinalPipeline(),
759+
indexRequest,
760+
ingestDocument,
761+
documentListener,
762+
indexRecursionDetection
763+
);
768764
i++;
769765
}
770766
}
771767
}
772768
});
773769
}
774770

771+
private Pipelines getPipelines(IndexRequest indexRequest) {
772+
indexRequest.isPipelineResolved(false);
773+
resolvePipelines(null, indexRequest, state.metadata());
774+
final String pipelineId = indexRequest.getPipeline();
775+
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
776+
final String finalPipelineId = indexRequest.getFinalPipeline();
777+
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
778+
return new Pipelines(pipelineId, finalPipelineId);
779+
}
780+
781+
private static class Pipelines implements Iterable<String> {
782+
private String defaultPipeline;
783+
private String finalPipeline;
784+
785+
private Pipelines(String defaultPipeline, String finalPipeline) {
786+
if (NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
787+
this.defaultPipeline = defaultPipeline;
788+
}
789+
if (NOOP_PIPELINE_NAME.equals(finalPipeline) == false) {
790+
this.finalPipeline = finalPipeline;
791+
}
792+
}
793+
794+
public boolean hasFinalPipeline() {
795+
return finalPipeline != null;
796+
}
797+
798+
public boolean isEmpty() {
799+
return defaultPipeline == null && finalPipeline == null;
800+
}
801+
802+
public void withoutDefaultPipeline() {
803+
defaultPipeline = null;
804+
}
805+
806+
@Override
807+
public Iterator<String> iterator() {
808+
if (defaultPipeline != null && finalPipeline != null) {
809+
return List.of(defaultPipeline, finalPipeline).iterator();
810+
}
811+
if (finalPipeline != null) {
812+
return List.of(finalPipeline).iterator();
813+
}
814+
if (defaultPipeline != null) {
815+
return List.of(defaultPipeline).iterator();
816+
}
817+
return Collections.emptyIterator();
818+
}
819+
}
820+
775821
private void executePipelines(
776822
final Iterator<String> pipelineIds,
777823
final boolean hasFinalPipeline,
778824
final IndexRequest indexRequest,
779825
final IngestDocument ingestDocument,
780-
final ActionListener<Boolean> listener
826+
final ActionListener<Boolean> listener,
827+
final Set<String> indexRecursionDetection
781828
) {
782829
assert pipelineIds.hasNext();
783830
final String pipelineId = pipelineIds.next();
@@ -840,6 +887,14 @@ private void executePipelines(
840887
final String newIndex = indexRequest.indices()[0];
841888

842889
if (Objects.equals(originalIndex, newIndex) == false) {
890+
if (indexRecursionDetection.add(newIndex) == false) {
891+
List<String> indexRoute = new ArrayList<>(indexRecursionDetection);
892+
indexRoute.add(newIndex);
893+
listener.onFailure(
894+
new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute))
895+
);
896+
return; // document failed!
897+
}
843898
if (hasFinalPipeline && pipelineIds.hasNext() == false) {
844899
listener.onFailure(
845900
new IllegalStateException(
@@ -854,19 +909,16 @@ private void executePipelines(
854909
);
855910
return; // document failed!
856911
} else {
857-
indexRequest.isPipelineResolved(false);
858-
resolvePipelines(null, indexRequest, state.metadata());
859-
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
860-
newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
861-
newHasFinalPipeline = true;
862-
} else {
863-
newPipelineIds = Collections.emptyIterator();
864-
}
912+
// reset request pipeline that is set to _none that would override the default pipeline
913+
indexRequest.setPipeline(null);
914+
Pipelines pipelines = getPipelines(indexRequest);
915+
newHasFinalPipeline = pipelines.hasFinalPipeline();
916+
newPipelineIds = pipelines.iterator();
865917
}
866918
}
867919

868920
if (newPipelineIds.hasNext()) {
869-
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener);
921+
executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection);
870922
} else {
871923
// update the index request's source and (potentially) cache the timestamp for TSDB
872924
updateIndexRequestSource(indexRequest, ingestDocument);

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ public void testExecuteIndexPipelineDoesNotExist() {
168168
List.of(DUMMY_PLUGIN),
169169
client
170170
);
171+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
172+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
171173
final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
172174
.source(Map.of())
173175
.setPipeline("_id")

0 commit comments

Comments
 (0)