Skip to content

Commit 22f98ef

Browse files
spring-projectsGH-3790: Poll the count of running step executions to fix OOM
1 parent fc1f3fc commit 22f98ef

File tree

11 files changed

+177
-19
lines changed

11 files changed

+177
-19
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/BatchStatus.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.batch.core;
1818

19+
import java.util.Set;
20+
1921
/**
2022
* Enumeration representing the status of an execution.
2123
*
@@ -71,6 +73,8 @@ public enum BatchStatus {
7173
*/
7274
UNKNOWN;
7375

76+
public static final Set<BatchStatus> RUNNING_STATUSES = Set.of(STARTING, STARTED, STOPPING);
77+
7478
/**
7579
* Convenience method to return the higher value status of the statuses passed to the
7680
* method.
@@ -87,7 +91,7 @@ public static BatchStatus max(BatchStatus status1, BatchStatus status2) {
8791
* @return true if the status is STARTING, STARTED, STOPPING
8892
*/
8993
public boolean isRunning() {
90-
return this == STARTING || this == STARTED || this == STOPPING;
94+
return RUNNING_STATUSES.contains(this);
9195
}
9296

9397
/**

spring-batch-core/src/main/java/org/springframework/batch/core/explore/JobExplorer.java

+18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.List;
1919
import java.util.Set;
2020

21+
import org.springframework.batch.core.BatchStatus;
2122
import org.springframework.batch.core.JobExecution;
2223
import org.springframework.batch.core.JobInstance;
2324
import org.springframework.batch.core.JobParameters;
@@ -87,6 +88,14 @@ default JobInstance getLastJobInstance(String jobName) {
8788
@Nullable
8889
StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable Long stepExecutionId);
8990

91+
/**
92+
* Find {@link StepExecution}s by IDs and parent {@link JobExecution} ID
93+
* @param jobExecutionId given job execution id
94+
* @param stepExecutionIds given step execution ids
95+
* @return collection of {@link StepExecution}
96+
*/
97+
Set<StepExecution> getStepExecutions(Long jobExecutionId, Set<Long> stepExecutionIds);
98+
9099
/**
91100
* @param instanceId {@link Long} The ID for the {@link JobInstance} to obtain.
92101
* @return the {@code JobInstance} that has this ID, or {@code null} if not found.
@@ -170,4 +179,13 @@ default JobExecution getLastJobExecution(JobInstance jobInstance) {
170179
*/
171180
long getJobInstanceCount(@Nullable String jobName) throws NoSuchJobException;
172181

182+
/**
183+
* Retrieve number of step executions that match the step execution ids and the batch
184+
* statuses
185+
* @param stepExecutionIds given step execution ids
186+
* @param matchingBatchStatuses given batch statuses to match against
187+
* @return number of {@link StepExecution} matching the criteria
188+
*/
189+
long getStepExecutionCount(Set<Long> stepExecutionIds, Set<BatchStatus> matchingBatchStatuses);
190+
173191
}

spring-batch-core/src/main/java/org/springframework/batch/core/explore/support/SimpleJobExplorer.java

+22
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.batch.core.explore.support;
1818

19+
import org.springframework.batch.core.BatchStatus;
1920
import org.springframework.batch.core.JobExecution;
2021
import org.springframework.batch.core.JobInstance;
2122
import org.springframework.batch.core.JobParameters;
@@ -147,6 +148,19 @@ public StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable L
147148
return stepExecution;
148149
}
149150

151+
@Nullable
152+
@Override
153+
public Set<StepExecution> getStepExecutions(Long jobExecutionId, Set<Long> stepExecutionIds) {
154+
JobExecution jobExecution = jobExecutionDao.getJobExecution(jobExecutionId);
155+
if (jobExecution == null) {
156+
return null;
157+
}
158+
getJobExecutionDependencies(jobExecution);
159+
Set<StepExecution> stepExecutions = stepExecutionDao.getStepExecutions(jobExecution, stepExecutionIds);
160+
stepExecutions.forEach(this::getStepExecutionDependencies);
161+
return stepExecutions;
162+
}
163+
150164
@Nullable
151165
@Override
152166
public JobInstance getJobInstance(@Nullable Long instanceId) {
@@ -180,6 +194,14 @@ public long getJobInstanceCount(@Nullable String jobName) throws NoSuchJobExcept
180194
return jobInstanceDao.getJobInstanceCount(jobName);
181195
}
182196

197+
@Override
198+
public long getStepExecutionCount(Set<Long> stepExecutionIds, Set<BatchStatus> matchingBatchStatuses) {
199+
if (stepExecutionIds.isEmpty() || matchingBatchStatuses.isEmpty()) {
200+
return 0;
201+
}
202+
return stepExecutionDao.countStepExecutions(stepExecutionIds, matchingBatchStatuses);
203+
}
204+
183205
/**
184206
* @return instance of {@link JobInstanceDao}.
185207
* @since 5.1

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java

+23
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package org.springframework.batch.core.repository.dao;
1818

1919
import java.sql.Types;
20+
import java.util.Collection;
21+
import java.util.Map;
22+
import java.util.stream.Collectors;
2023

2124
import org.springframework.beans.factory.InitializingBean;
2225
import org.springframework.jdbc.core.JdbcOperations;
@@ -51,6 +54,14 @@ protected String getQuery(String base) {
5154
return StringUtils.replace(base, "%PREFIX%", tablePrefix);
5255
}
5356

57+
protected String getQuery(String base, Map<String, Collection<?>> collectionParams) {
58+
String query = getQuery(base);
59+
for (Map.Entry<String, Collection<?>> collectionParam : collectionParams.entrySet()) {
60+
query = createParameterizedQuery(query, collectionParam.getKey(), collectionParam.getValue());
61+
}
62+
return query;
63+
}
64+
5465
protected String getTablePrefix() {
5566
return tablePrefix;
5667
}
@@ -80,6 +91,18 @@ public void setClobTypeToUse(int clobTypeToUse) {
8091
this.clobTypeToUse = clobTypeToUse;
8192
}
8293

94+
/**
95+
* Replaces a given placeholder with a number of parameters (i.e. "?").
96+
* @param sqlTemplate given sql template
97+
* @param placeholder placeholder that is being used for parameters
98+
* @param parameters collection of parameters with variable size
99+
* @return sql query replaced with a number of parameters
100+
*/
101+
private static String createParameterizedQuery(String sqlTemplate, String placeholder, Collection<?> parameters) {
102+
String params = parameters.stream().map(p -> "?").collect(Collectors.joining(", "));
103+
return sqlTemplate.replace(placeholder, params);
104+
}
105+
83106
@Override
84107
public void afterPropertiesSet() throws Exception {
85108
Assert.state(jdbcTemplate != null, "JdbcOperations is required");

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java

+33
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import java.util.Comparator;
2828
import java.util.Iterator;
2929
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Set;
3032
import java.util.concurrent.locks.Lock;
3133
import java.util.concurrent.locks.ReentrantLock;
34+
import java.util.stream.Stream;
3235

3336
import org.apache.commons.logging.Log;
3437
import org.apache.commons.logging.LogFactory;
@@ -93,6 +96,16 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement
9396

9497
private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " AND STEP_EXECUTION_ID = ?";
9598

99+
private static final String GET_STEP_EXECUTIONS_BY_IDS = GET_RAW_STEP_EXECUTIONS
100+
+ " and STEP_EXECUTION_ID IN (%STEP_EXECUTION_IDS%)";
101+
102+
private static final String COUNT_STEP_EXECUTIONS_BY_IDS_AND_STATUSES = """
103+
SELECT COUNT(*)
104+
FROM %PREFIX%STEP_EXECUTION SE
105+
WHERE SE.STEP_EXECUTION_ID IN (%STEP_EXECUTION_IDS%)
106+
AND SE.STATUS IN (%STEP_STATUSES%)
107+
""";
108+
96109
private static final String GET_LAST_STEP_EXECUTION = """
97110
SELECT SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION, SE.CREATE_TIME, JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION
98111
FROM %PREFIX%JOB_EXECUTION JE
@@ -337,6 +350,16 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
337350
}
338351
}
339352

353+
@Override
354+
@Nullable
355+
public Set<StepExecution> getStepExecutions(JobExecution jobExecution, Set<Long> stepExecutionIds) {
356+
List<StepExecution> executions = getJdbcTemplate().query(
357+
getQuery(GET_STEP_EXECUTIONS_BY_IDS, Map.of("%STEP_EXECUTION_IDS%", stepExecutionIds)),
358+
new StepExecutionRowMapper(jobExecution),
359+
Stream.concat(Stream.of(jobExecution.getId()), stepExecutionIds.stream()).toArray(Object[]::new));
360+
return Set.copyOf(executions);
361+
}
362+
340363
@Override
341364
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
342365
List<StepExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_STEP_EXECUTION), (rs, rowNum) -> {
@@ -360,6 +383,16 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
360383
}
361384
}
362385

386+
@Override
387+
public long countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses) {
388+
return getJdbcTemplate().queryForObject(
389+
getQuery(COUNT_STEP_EXECUTIONS_BY_IDS_AND_STATUSES,
390+
Map.of("%STEP_EXECUTION_IDS%", stepExecutionIds, "%STEP_STATUSES%", matchingBatchStatuses)),
391+
Long.class,
392+
Stream.concat(stepExecutionIds.stream(), matchingBatchStatuses.stream().map(BatchStatus::name))
393+
.toArray(Object[]::new));
394+
}
395+
363396
@Override
364397
public void addStepExecutions(JobExecution jobExecution) {
365398
getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution),

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MongoStepExecutionDao.java

+22
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import java.util.Comparator;
2121
import java.util.List;
2222
import java.util.Optional;
23+
import java.util.Set;
24+
import java.util.stream.Collectors;
2325

26+
import org.springframework.batch.core.BatchStatus;
2427
import org.springframework.batch.core.JobExecution;
2528
import org.springframework.batch.core.JobInstance;
2629
import org.springframework.batch.core.StepExecution;
@@ -95,6 +98,17 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
9598
return stepExecution != null ? this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution) : null;
9699
}
97100

101+
@Override
102+
public Set<StepExecution> getStepExecutions(JobExecution jobExecution, Set<Long> stepExecutionIds) {
103+
Query query = query(where("stepExecutionId").in(stepExecutionIds));
104+
List<org.springframework.batch.core.repository.persistence.StepExecution> stepExecutions = this.mongoOperations
105+
.find(query, org.springframework.batch.core.repository.persistence.StepExecution.class,
106+
STEP_EXECUTIONS_COLLECTION_NAME);
107+
return stepExecutions.stream()
108+
.map(stepExecution -> this.stepExecutionConverter.toStepExecution(stepExecution, jobExecution))
109+
.collect(Collectors.toSet());
110+
}
111+
98112
@Override
99113
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
100114
// TODO optimize the query
@@ -160,4 +174,12 @@ public long countStepExecutions(JobInstance jobInstance, String stepName) {
160174
return count;
161175
}
162176

177+
@Override
178+
public long countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses) {
179+
Query query = query(where("jobExecutionId").is(stepExecutionIds).and("status").in(matchingBatchStatuses));
180+
return this.mongoOperations.count(query,
181+
org.springframework.batch.core.repository.persistence.StepExecution.class,
182+
STEP_EXECUTIONS_COLLECTION_NAME);
183+
}
184+
163185
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java

+20
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.springframework.batch.core.repository.dao;
1818

1919
import java.util.Collection;
20+
import java.util.Set;
2021

22+
import org.springframework.batch.core.BatchStatus;
2123
import org.springframework.batch.core.JobExecution;
2224
import org.springframework.batch.core.JobInstance;
2325
import org.springframework.batch.core.StepExecution;
@@ -62,6 +64,15 @@ public interface StepExecutionDao {
6264
@Nullable
6365
StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId);
6466

67+
/**
68+
* Get a collection of {@link StepExecution} matching job execution and step execution
69+
* ids.
70+
* @param jobExecution the parent job execution
71+
* @param stepExecutionIds the step execution ids
72+
* @return collection of {@link StepExecution}
73+
*/
74+
Set<StepExecution> getStepExecutions(JobExecution jobExecution, Set<Long> stepExecutionIds);
75+
6576
/**
6677
* Retrieve the last {@link StepExecution} for a given {@link JobInstance} ordered by
6778
* creation time and then id.
@@ -91,6 +102,15 @@ default long countStepExecutions(JobInstance jobInstance, String stepName) {
91102
throw new UnsupportedOperationException();
92103
}
93104

105+
/**
106+
* Count {@link StepExecution} that match the ids and statuses of them - avoid loading
107+
* them into memory
108+
* @param stepExecutionIds given step execution ids
109+
* @param matchingBatchStatuses
110+
* @return the count of matching steps
111+
*/
112+
long countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses);
113+
94114
/**
95115
* Delete the given step execution.
96116
* @param stepExecution the step execution to delete

spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/CommandLineJobRunnerTests.java

+10
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ public StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable L
551551
throw new UnsupportedOperationException();
552552
}
553553

554+
@Override
555+
public Set<StepExecution> getStepExecutions(Long jobExecutionId, Set<Long> stepExecutionIds) {
556+
return Set.of();
557+
}
558+
554559
@Override
555560
public List<String> getJobNames() {
556561
throw new UnsupportedOperationException();
@@ -579,6 +584,11 @@ public long getJobInstanceCount(@Nullable String jobName) throws NoSuchJobExcept
579584
}
580585
}
581586

587+
@Override
588+
public long getStepExecutionCount(Set<Long> stepExecutionIds, Set<BatchStatus> matchingBatchStatuses) {
589+
return 0;
590+
}
591+
582592
}
583593

584594
public static class StubJobParametersConverter implements JobParametersConverter {

spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/SimpleJobRepositoryTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ void testSaveStepExecutionSetsLastUpdated() {
202202
assertNotNull(stepExecution.getLastUpdated());
203203

204204
LocalDateTime lastUpdated = stepExecution.getLastUpdated();
205-
assertTrue(lastUpdated.isAfter(before));
205+
assertFalse(lastUpdated.isBefore(before));
206206
}
207207

208208
@Test
@@ -236,7 +236,7 @@ void testUpdateStepExecutionSetsLastUpdated() {
236236
assertNotNull(stepExecution.getLastUpdated());
237237

238238
LocalDateTime lastUpdated = stepExecution.getLastUpdated();
239-
assertTrue(lastUpdated.isAfter(before));
239+
assertFalse(lastUpdated.isBefore(before));
240240
}
241241

242242
@Test

spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java

+11-16
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.commons.logging.Log;
2929
import org.apache.commons.logging.LogFactory;
3030

31-
import org.springframework.batch.core.JobExecution;
31+
import org.springframework.batch.core.BatchStatus;
3232
import org.springframework.batch.core.Step;
3333
import org.springframework.batch.core.StepExecution;
3434
import org.springframework.batch.core.explore.JobExplorer;
@@ -251,25 +251,20 @@ protected Set<StepExecution> doHandle(StepExecution managerStepExecution,
251251

252252
private Set<StepExecution> pollReplies(final StepExecution managerStepExecution, final Set<StepExecution> split)
253253
throws Exception {
254-
Set<Long> partitionStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());
255254

256255
Callable<Set<StepExecution>> callback = () -> {
257-
JobExecution jobExecution = jobExplorer.getJobExecution(managerStepExecution.getJobExecutionId());
258-
Set<StepExecution> finishedStepExecutions = jobExecution.getStepExecutions()
259-
.stream()
260-
.filter(stepExecution -> partitionStepExecutionIds.contains(stepExecution.getId()))
261-
.filter(stepExecution -> !stepExecution.getStatus().isRunning())
262-
.collect(Collectors.toSet());
263-
264-
if (logger.isDebugEnabled()) {
265-
logger.debug(String.format("Currently waiting on %s partitions to finish", split.size()));
266-
}
267-
268-
if (finishedStepExecutions.size() == split.size()) {
269-
return finishedStepExecutions;
256+
Set<Long> currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());
257+
long runningStepExecutions = jobExplorer.getStepExecutionCount(currentStepExecutionIds,
258+
BatchStatus.RUNNING_STATUSES);
259+
if (runningStepExecutions > 0 && !split.isEmpty()) {
260+
if (logger.isDebugEnabled()) {
261+
logger.debug(String.format("Currently waiting on %s out of %s partitions to finish",
262+
runningStepExecutions, split.size()));
263+
}
264+
return null;
270265
}
271266
else {
272-
return null;
267+
return jobExplorer.getStepExecutions(managerStepExecution.getJobExecutionId(), currentStepExecutionIds);
273268
}
274269
};
275270

0 commit comments

Comments
 (0)