Skip to content

Commit c68da18

Browse files
hpoettkerfmbenhassine
authored andcommitted
Improve step execution polling and retrieval
Resolves #3790
1 parent 708f1c8 commit c68da18

File tree

5 files changed

+71
-33
lines changed

5 files changed

+71
-33
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/jsr/launch/JsrJobOperator.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2019 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.Set;
2727
import java.util.concurrent.ConcurrentHashMap;
2828
import java.util.concurrent.Semaphore;
29+
import java.util.stream.Collectors;
2930
import javax.batch.operations.BatchRuntimeException;
3031
import javax.batch.operations.JobExecutionAlreadyCompleteException;
3132
import javax.batch.operations.JobExecutionIsRunningException;
@@ -47,6 +48,7 @@
4748
import org.apache.commons.logging.LogFactory;
4849

4950
import org.springframework.batch.core.BatchStatus;
51+
import org.springframework.batch.core.Entity;
5052
import org.springframework.batch.core.ExitStatus;
5153
import org.springframework.batch.core.Job;
5254
import org.springframework.batch.core.JobParameters;
@@ -412,9 +414,11 @@ public List<StepExecution> getStepExecutions(long executionId)
412414
List<StepExecution> batchExecutions = new ArrayList<>();
413415

414416
if(executions != null) {
415-
for (org.springframework.batch.core.StepExecution stepExecution : executions) {
416-
if(!stepExecution.getStepName().contains(":partition")) {
417-
batchExecutions.add(new JsrStepExecution(jobExplorer.getStepExecution(executionId, stepExecution.getId())));
417+
Set<Long> stepExecutionIds = executions.stream().map(Entity::getId).collect(Collectors.toSet());
418+
org.springframework.batch.core.JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
419+
for (org.springframework.batch.core.StepExecution stepExecution : jobExecution.getStepExecutions()) {
420+
if(!stepExecution.getStepName().contains(":partition") && stepExecutionIds.contains(stepExecution.getId())) {
421+
batchExecutions.add(new JsrStepExecution(stepExecution));
418422
}
419423
}
420424
}

spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/RemoteStepExecutionAggregator.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2006-2013 the original author or authors.
2+
* Copyright 2006-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.batch.core.partition.support;
1818

19-
import java.util.ArrayList;
2019
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.Set;
22+
import java.util.stream.Collectors;
2123

24+
import org.springframework.batch.core.JobExecution;
2225
import org.springframework.batch.core.StepExecution;
2326
import org.springframework.batch.core.explore.JobExplorer;
2427
import org.springframework.beans.factory.InitializingBean;
@@ -90,14 +93,16 @@ public void aggregate(StepExecution result, Collection<StepExecution> executions
9093
if (executions == null) {
9194
return;
9295
}
93-
Collection<StepExecution> updates = new ArrayList<>();
94-
for (StepExecution stepExecution : executions) {
96+
Set<Long> stepExecutionIds = executions.stream().map(stepExecution -> {
9597
Long id = stepExecution.getId();
9698
Assert.state(id != null, "StepExecution has null id. It must be saved first: " + stepExecution);
97-
StepExecution update = jobExplorer.getStepExecution(stepExecution.getJobExecutionId(), id);
98-
Assert.state(update != null, "Could not reload StepExecution from JobRepository: " + stepExecution);
99-
updates.add(update);
100-
}
99+
return id;
100+
}).collect(Collectors.toSet());
101+
JobExecution jobExecution = jobExplorer.getJobExecution(result.getJobExecutionId());
102+
Assert.state(jobExecution != null,
103+
"Could not load JobExecution from JobRepository for id " + result.getJobExecutionId());
104+
List<StepExecution> updates = jobExecution.getStepExecutions().stream()
105+
.filter(stepExecution -> stepExecutionIds.contains(stepExecution.getId())).collect(Collectors.toList());
101106
delegate.aggregate(result, updates);
102107
}
103108

spring-batch-core/src/test/java/org/springframework/batch/core/jsr/launch/JsrJobOperatorTests.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2018 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -403,8 +403,6 @@ public void testGetStepExecutionsRoseyScenario() {
403403
jobExecution.addStepExecutions(stepExecutions);
404404

405405
when(jobExplorer.getJobExecution(5L)).thenReturn(jobExecution);
406-
when(jobExplorer.getStepExecution(5L, 1L)).thenReturn(new StepExecution("step1", jobExecution, 1L));
407-
when(jobExplorer.getStepExecution(5L, 2L)).thenReturn(new StepExecution("step2", jobExecution, 2L));
408406

409407
List<javax.batch.runtime.StepExecution> results = jsrJobOperator.getStepExecutions(5L);
410408

@@ -429,8 +427,6 @@ public void testGetStepExecutionsPartitionedStepScenario() {
429427
jobExecution.addStepExecutions(stepExecutions);
430428

431429
when(jobExplorer.getJobExecution(5L)).thenReturn(jobExecution);
432-
when(jobExplorer.getStepExecution(5L, 1L)).thenReturn(new StepExecution("step1", jobExecution, 1L));
433-
when(jobExplorer.getStepExecution(5L, 2L)).thenReturn(new StepExecution("step2", jobExecution, 2L));
434430

435431
List<javax.batch.runtime.StepExecution> results = jsrJobOperator.getStepExecutions(5L);
436432

spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java

+23-14
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,35 @@
1+
/*
2+
* Copyright 2009-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package org.springframework.batch.integration.partition;
217

318
import java.util.ArrayList;
419
import java.util.Collection;
5-
import java.util.Iterator;
620
import java.util.List;
721
import java.util.Set;
822
import java.util.concurrent.Callable;
923
import java.util.concurrent.Future;
1024
import java.util.concurrent.TimeUnit;
25+
import java.util.stream.Collectors;
1126

1227
import javax.sql.DataSource;
1328

1429
import org.apache.commons.logging.Log;
1530
import org.apache.commons.logging.LogFactory;
1631

32+
import org.springframework.batch.core.JobExecution;
1733
import org.springframework.batch.core.Step;
1834
import org.springframework.batch.core.StepExecution;
1935
import org.springframework.batch.core.explore.JobExplorer;
@@ -242,19 +258,12 @@ private Collection<StepExecution> pollReplies(final StepExecution masterStepExec
242258
Callable<Collection<StepExecution>> callback = new Callable<Collection<StepExecution>>() {
243259
@Override
244260
public Collection<StepExecution> call() throws Exception {
245-
246-
for(Iterator<StepExecution> stepExecutionIterator = split.iterator(); stepExecutionIterator.hasNext(); ) {
247-
StepExecution curStepExecution = stepExecutionIterator.next();
248-
249-
if(!result.contains(curStepExecution)) {
250-
StepExecution partitionStepExecution =
251-
jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId());
252-
253-
if(!partitionStepExecution.getStatus().isRunning()) {
254-
result.add(partitionStepExecution);
255-
}
256-
}
257-
}
261+
Set<Long> currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());
262+
JobExecution jobExecution = jobExplorer.getJobExecution(masterStepExecution.getJobExecutionId());
263+
jobExecution.getStepExecutions().stream()
264+
.filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId()))
265+
.filter(stepExecution -> !result.contains(stepExecution))
266+
.filter(stepExecution -> !stepExecution.getStatus().isRunning()).forEach(result::add);
258267

259268
if(logger.isDebugEnabled()) {
260269
logger.debug(String.format("Currently waiting on %s partitions to finish", split.size()));

spring-batch-integration/src/test/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandlerTests.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,22 @@
1+
/*
2+
* Copyright 2020-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package org.springframework.batch.integration.partition;
218

19+
import java.util.Arrays;
320
import java.util.Collection;
421
import java.util.Collections;
522
import java.util.HashSet;
@@ -154,7 +171,12 @@ public void testHandleWithJobRepositoryPolling() throws Exception {
154171
stepExecutions.add(partition2);
155172
stepExecutions.add(partition3);
156173
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
157-
when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3, partition3, partition3, partition3, partition4);
174+
JobExecution runningJobExecution = new JobExecution(5L, new JobParameters());
175+
runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3));
176+
JobExecution completedJobExecution = new JobExecution(5L, new JobParameters());
177+
completedJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition4));
178+
when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution, runningJobExecution, runningJobExecution,
179+
completedJobExecution);
158180

159181
//set
160182
messageChannelPartitionHandler.setMessagingOperations(operations);
@@ -198,7 +220,9 @@ public void testHandleWithJobRepositoryPollingTimeout() throws Exception {
198220
stepExecutions.add(partition2);
199221
stepExecutions.add(partition3);
200222
when(stepExecutionSplitter.split(any(StepExecution.class), eq(1))).thenReturn(stepExecutions);
201-
when(jobExplorer.getStepExecution(eq(5L), any(Long.class))).thenReturn(partition2, partition1, partition3);
223+
JobExecution runningJobExecution = new JobExecution(5L, new JobParameters());
224+
runningJobExecution.addStepExecutions(Arrays.asList(partition2, partition1, partition3));
225+
when(jobExplorer.getJobExecution(5L)).thenReturn(runningJobExecution);
202226

203227
//set
204228
messageChannelPartitionHandler.setMessagingOperations(operations);

0 commit comments

Comments
 (0)