diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 10d276dfefee8..83d982e1f4176 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -198,7 +198,7 @@ private static Map processorFactories(List
      * Also, this method marks the request as `isPipelinesResolved = true`: Due to the request could be rerouted from a coordinating node
      * to an ingest node, we have to be able to avoid double resolving the pipelines and also able to distinguish that either the pipeline
      * comes as part of the request or resolved from this method. All this is made to later be able to reject the request in case the
@@ -476,10 +476,9 @@ Map<String, PipelineHolder> pipelines() {
      * 'on_failure', so we report metrics for the set processor, not an on_failure processor.
      *
      * @param compoundProcessor The compound processor to start walking the non-failure processors
-     * @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
-     * @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor
+     * @param processorMetrics The list to populate with {@link Processor} {@link IngestMetric} tuples.
      */
-    private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
+    private static void collectProcessorMetrics(
         CompoundProcessor compoundProcessor,
         List<Tuple<Processor, IngestMetric>> processorMetrics
     ) {
@@ -505,12 +504,11 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
             } while (unwrapped);
 
             if (processor instanceof CompoundProcessor cp) {
-                getProcessorMetrics(cp, processorMetrics);
+                collectProcessorMetrics(cp, processorMetrics);
             } else {
                 processorMetrics.add(new Tuple<>(processor, metric));
             }
         }
-        return processorMetrics;
     }
 
     /**
@@ -840,7 +838,7 @@ public IngestStats stats() {
             CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
             statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
             List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
-            getProcessorMetrics(rootProcessor, processorMetrics);
+            collectProcessorMetrics(rootProcessor, processorMetrics);
             processorMetrics.forEach(t -> {
                 Processor processor = t.v1();
                 IngestMetric processorMetric = t.v2();
@@ -1008,8 +1006,8 @@ synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
                 newPipeline.getMetrics().add(oldPipeline.getMetrics());
                 List<Tuple<Processor, IngestMetric>> oldPerProcessMetrics = new ArrayList<>();
                 List<Tuple<Processor, IngestMetric>> newPerProcessMetrics = new ArrayList<>();
-                getProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics);
-                getProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics);
+                collectProcessorMetrics(oldPipeline.getCompoundProcessor(), oldPerProcessMetrics);
+                collectProcessorMetrics(newPipeline.getCompoundProcessor(), newPerProcessMetrics);
                 // Best attempt to populate new processor metrics using a parallel array of the old metrics. This is not ideal since
                 // the per processor metrics may get reset when the arrays don't match. However, to get to an ideal model, unique and
                 // consistent id's per processor and/or semantic equals for each processor will be needed.
@@ -1144,14 +1142,11 @@ public String getType() {
         return new Pipeline(id, description, null, null, new CompoundProcessor(failureProcessor));
     }
 
-    static class PipelineHolder {
-
-        final PipelineConfiguration configuration;
-        final Pipeline pipeline;
+    record PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
 
-        PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
-            this.configuration = Objects.requireNonNull(configuration);
-            this.pipeline = Objects.requireNonNull(pipeline);
+        public PipelineHolder {
+            Objects.requireNonNull(configuration);
+            Objects.requireNonNull(pipeline);
         }
     }
 
@@ -1236,7 +1231,7 @@ private static Optional resolvePipelinesFromIndexTemplates(IndexReque
 
     /**
      * Checks whether an IndexRequest has at least one pipeline defined.
-     *
+     * <p>
      * This method assumes that the pipelines are beforehand resolved.
      */
     public static boolean hasPipeline(IndexRequest indexRequest) {
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index 8301f412dc569..6334abaa35e36 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -209,10 +209,12 @@ public void testUpdatePipelines() {
             .build();
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         assertThat(ingestService.pipelines().size(), is(1));
-        assertThat(ingestService.pipelines().get("_id").pipeline.getId(), equalTo("_id"));
-        assertThat(ingestService.pipelines().get("_id").pipeline.getDescription(), nullValue());
-        assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().size(), equalTo(1));
-        assertThat(ingestService.pipelines().get("_id").pipeline.getProcessors().get(0).getType(), equalTo("set"));
+
+        Pipeline p = ingestService.getPipeline("_id");
+        assertThat(p.getId(), equalTo("_id"));
+        assertThat(p.getDescription(), nullValue());
+        assertThat(p.getProcessors().size(), equalTo(1));
+        assertThat(p.getProcessors().get(0).getType(), equalTo("set"));
     }
 
     public void testInnerUpdatePipelines() {
@@ -224,39 +226,55 @@ public void testInnerUpdatePipelines() {
 
         ingestService.innerUpdatePipelines(ingestMetadata);
         assertThat(ingestService.pipelines().size(), is(1));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
+        {
+            Pipeline p1 = ingestService.getPipeline("_id1");
+            assertThat(p1.getId(), equalTo("_id1"));
+            assertThat(p1.getProcessors().size(), equalTo(0));
+        }
 
         PipelineConfiguration pipeline2 = new PipelineConfiguration("_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON);
         ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id2", pipeline2));
 
         ingestService.innerUpdatePipelines(ingestMetadata);
         assertThat(ingestService.pipelines().size(), is(2));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
-        assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
-        assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
+        {
+            Pipeline p1 = ingestService.getPipeline("_id1");
+            assertThat(p1.getId(), equalTo("_id1"));
+            assertThat(p1.getProcessors().size(), equalTo(0));
+            Pipeline p2 = ingestService.getPipeline("_id2");
+            assertThat(p2.getId(), equalTo("_id2"));
+            assertThat(p2.getProcessors().size(), equalTo(0));
+        }
 
         PipelineConfiguration pipeline3 = new PipelineConfiguration("_id3", new BytesArray("{\"processors\": []}"), XContentType.JSON);
         ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id2", pipeline2, "_id3", pipeline3));
 
         ingestService.innerUpdatePipelines(ingestMetadata);
         assertThat(ingestService.pipelines().size(), is(3));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
-        assertThat(ingestService.pipelines().get("_id2").pipeline.getId(), equalTo("_id2"));
-        assertThat(ingestService.pipelines().get("_id2").pipeline.getProcessors().size(), equalTo(0));
-        assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
-        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
+        {
+            Pipeline p1 = ingestService.getPipeline("_id1");
+            assertThat(p1.getId(), equalTo("_id1"));
+            assertThat(p1.getProcessors().size(), equalTo(0));
+            Pipeline p2 = ingestService.getPipeline("_id2");
+            assertThat(p2.getId(), equalTo("_id2"));
+            assertThat(p2.getProcessors().size(), equalTo(0));
+            Pipeline p3 = ingestService.getPipeline("_id3");
+            assertThat(p3.getId(), equalTo("_id3"));
+            assertThat(p3.getProcessors().size(), equalTo(0));
+        }
 
         ingestMetadata = new IngestMetadata(Map.of("_id1", pipeline1, "_id3", pipeline3));
 
         ingestService.innerUpdatePipelines(ingestMetadata);
         assertThat(ingestService.pipelines().size(), is(2));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
-        assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
-        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(0));
+        {
+            Pipeline p1 = ingestService.getPipeline("_id1");
+            assertThat(p1.getId(), equalTo("_id1"));
+            assertThat(p1.getProcessors().size(), equalTo(0));
+            Pipeline p3 = ingestService.getPipeline("_id3");
+            assertThat(p3.getId(), equalTo("_id3"));
+            assertThat(p3.getProcessors().size(), equalTo(0));
+        }
 
         pipeline3 = new PipelineConfiguration("_id3", new BytesArray("""
             {"processors": [{"set" : {"field": "_field", "value": "_value"}}]}"""), XContentType.JSON);
@@ -264,11 +282,15 @@ public void testInnerUpdatePipelines() {
 
         ingestService.innerUpdatePipelines(ingestMetadata);
         assertThat(ingestService.pipelines().size(), is(2));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getId(), equalTo("_id1"));
-        assertThat(ingestService.pipelines().get("_id1").pipeline.getProcessors().size(), equalTo(0));
-        assertThat(ingestService.pipelines().get("_id3").pipeline.getId(), equalTo("_id3"));
-        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().size(), equalTo(1));
-        assertThat(ingestService.pipelines().get("_id3").pipeline.getProcessors().get(0).getType(), equalTo("set"));
+        {
+            Pipeline p1 = ingestService.getPipeline("_id1");
+            assertThat(p1.getId(), equalTo("_id1"));
+            assertThat(p1.getProcessors().size(), equalTo(0));
+            Pipeline p3 = ingestService.getPipeline("_id3");
+            assertThat(p3.getId(), equalTo("_id3"));
+            assertThat(p3.getProcessors().size(), equalTo(1));
+            assertThat(p3.getProcessors().get(0).getType(), equalTo("set"));
+        }
 
         // Perform an update with no changes:
         Map pipelines = ingestService.pipelines();
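Outside the patch itself, a minimal standalone sketch of the two refactors shown above: PipelineHolder as a record whose compact constructor keeps the null checks, and callers obtaining a Pipeline through an accessor instead of reaching into the holder via pipelines().get(id).pipeline. The simplified PipelineConfiguration/Pipeline stand-ins and the getPipeline helper below are assumptions for illustration, not the real Elasticsearch classes.

import java.util.Map;
import java.util.Objects;

class PipelineHolderSketch {

    // Simplified stand-ins for the real PipelineConfiguration and Pipeline classes (assumption).
    record PipelineConfiguration(String id, String source) {}

    record Pipeline(String id, String description) {}

    // Mirrors the record conversion in the diff: the compact constructor replaces the
    // hand-written fields, constructor, and Objects.requireNonNull assignments.
    record PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
        PipelineHolder {
            Objects.requireNonNull(configuration);
            Objects.requireNonNull(pipeline);
        }
    }

    // Illustrative lookup in the spirit of ingestService.getPipeline(id): callers receive a
    // Pipeline directly instead of dereferencing holder.pipeline themselves.
    static Pipeline getPipeline(Map<String, PipelineHolder> pipelines, String id) {
        PipelineHolder holder = pipelines.get(id);
        return holder == null ? null : holder.pipeline();
    }

    public static void main(String[] args) {
        PipelineHolder holder = new PipelineHolder(
            new PipelineConfiguration("_id", "{\"processors\": []}"),
            new Pipeline("_id", null)
        );
        System.out.println(getPipeline(Map.of("_id", holder), "_id").id()); // prints "_id"
    }
}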