Skip to content

Commit

Permalink
Fix job-master leak memory when submitting distributed jobs
Browse files Browse the repository at this point in the history
fix:fix job-master leak memory when submitting a large number of distributed jobs(DIST_LOAD/DIST_CP/Persist jobs)

### What changes are proposed in this pull request?

Start a periodic thread to clear expired jobs information that cannot be trace by the client in CmdJobTracker.The default retention time is 1day,which is the same configuration as LoadV2.

### Why are the changes needed?

When many jobs are submitted,the job master finally will have an oom problem, we can find that the cmdJobTracker retains the residual job information and not cleaned regularly, resulting in memory leaks.

### Does this PR introduce any user facing changes?

Please list the user-facing changes introduced by your change, including
1.add Configuration:
          alluxio.job.master.job.trace.retention.time=xx,the default value is 1d.

Related issue: #18635
			pr-link: #18639
			change-id: cid-d4e5853a1818a22c8a0411a27bfe1141c6f24ebd
  • Loading branch information
liiuzq-xiaobai authored Jul 8, 2024
1 parent 1359cc5 commit a4ec456
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 13 deletions.
8 changes: 8 additions & 0 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -7501,6 +7501,12 @@ public String toString() {
.setDefaultValue("60sec")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_JOB_TRACE_RETENTION_TIME =
durationBuilder(Name.JOB_MASTER_JOB_TRACE_RETENTION_TIME)
.setDescription("The length of time the client can trace the submitted job.")
.setDefaultValue("1d")
.setScope(Scope.MASTER)
.build();
public static final PropertyKey JOB_MASTER_JOB_CAPACITY =
longBuilder(Name.JOB_MASTER_JOB_CAPACITY)
.setDescription("The total possible number of available job statuses in the job master. "
Expand Down Expand Up @@ -9369,6 +9375,8 @@ public static final class Name {
"alluxio.job.master.finished.job.purge.count";
public static final String JOB_MASTER_FINISHED_JOB_RETENTION_TIME =
"alluxio.job.master.finished.job.retention.time";
public static final String JOB_MASTER_JOB_TRACE_RETENTION_TIME =
"alluxio.job.master.job.trace.retention.time";
public static final String JOB_MASTER_JOB_CAPACITY = "alluxio.job.master.job.capacity";
public static final String JOB_MASTER_MASTER_HEARTBEAT_INTERVAL =
"alluxio.job.master.master.heartbeat.interval";
Expand Down
7 changes: 7 additions & 0 deletions job/common/src/main/java/alluxio/job/wire/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public boolean isFinished() {
return this.equals(CANCELED) || this.equals(FAILED) || this.equals(COMPLETED);
}

/**
* @return whether this status represents a Completed state
*/
public boolean isCompleted() {
return this.equals(COMPLETED);
}

/**
* @return proto representation of the status
*/
Expand Down
2 changes: 1 addition & 1 deletion job/server/src/main/java/alluxio/master/job/JobMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public JobMaster(MasterContext masterContext, FileSystem filesystem,
mWorkerHealth = new ConcurrentHashMap<>();

mCmdJobTracker = new CmdJobTracker(
fsContext, this);
fsContext, this, mPlanTracker);

MetricsSystem.registerGaugeIfAbsent(
MetricKey.MASTER_JOB_COUNT.getName(),
Expand Down
20 changes: 17 additions & 3 deletions job/server/src/main/java/alluxio/master/job/plan/PlanTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,7 +86,7 @@ public class PlanTracker {
private final SortedSet<PlanInfo> mFailed;

/** A FIFO queue used to track jobs which have status {@link Status#isFinished()} as true. */
private final LinkedBlockingQueue<PlanInfo> mFinished;
private final LinkedList<PlanInfo> mFinished;

private final WorkflowTracker mWorkflowTracker;

Expand Down Expand Up @@ -114,7 +114,7 @@ public PlanTracker(long capacity, long retentionMs,
}
return Long.signum(right.getId() - left.getId());
}));
mFinished = new LinkedBlockingQueue<>();
mFinished = new LinkedList<>();
mWorkflowTracker = workflowTracker;
}

Expand Down Expand Up @@ -300,6 +300,20 @@ public Set<Long> findJobs(String name, List<Status> statusList) {
.map(Map.Entry::getKey).collect(Collectors.toSet());
}

/**
* Remove expired jobs in PlanTracker.
* @param jobIds the list of removed jobId
*/
public void removeJobs(List<Long> jobIds) {
mWorkflowTracker.cleanup(jobIds);
for (Long jobId : jobIds) {
PlanInfo removedPlanInfo = mCoordinators.get(jobId).getPlanInfo();
mCoordinators.remove(jobId);
mFailed.remove(removedPlanInfo);
mFinished.remove(removedPlanInfo);
}
}

private void checkActiveSetReplicaJobs(JobConfig jobConfig) throws JobDoesNotExistException {
if (jobConfig instanceof SetReplicaConfig) {
Set<Pair<String, Long>> activeJobs = mCoordinators.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@

import alluxio.AlluxioURI;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.JobDoesNotExistException;
import alluxio.grpc.OperationType;
import alluxio.job.CmdConfig;
import alluxio.job.cmd.load.LoadCliConfig;
import alluxio.job.cmd.migrate.MigrateCliConfig;
Expand All @@ -24,26 +27,31 @@
import alluxio.job.wire.Status;
import alluxio.master.job.JobMaster;
import alluxio.master.job.common.CmdInfo;
import alluxio.master.job.plan.PlanTracker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;

/**
* CmdJobTracker to schedule a Cmd job to run.
*/
@ThreadSafe
public class CmdJobTracker {
public class CmdJobTracker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CmdJobTracker.class);
private final Map<Long, CmdInfo> mInfoMap = new ConcurrentHashMap<>(0, 0.95f,
Math.max(8, 2 * Runtime.getRuntime().availableProcessors()));
Expand All @@ -52,18 +60,29 @@ public class CmdJobTracker {
private final PersistRunner mPersistRunner;
protected FileSystemContext mFsContext;
public static final String DELIMITER = ",";
private final ScheduledExecutorService mScheduleCleanExecutor;
private final Long mTraceRetentionTime;

private final PlanTracker mPlanTracker;

/**
* Create a new instance of {@link CmdJobTracker}.
* @param fsContext filesystem context
* @param jobMaster the job master
* @param planTracker the planTracker
*/
public CmdJobTracker(FileSystemContext fsContext,
JobMaster jobMaster) {
JobMaster jobMaster, PlanTracker planTracker) {
mFsContext = fsContext;
mDistLoadCliRunner = new DistLoadCliRunner(mFsContext, jobMaster);
mMigrateCliRunner = new MigrateCliRunner(mFsContext, jobMaster);
mPersistRunner = new PersistRunner(mFsContext, jobMaster);
mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor();
mScheduleCleanExecutor.scheduleAtFixedRate(this::
cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS);
mTraceRetentionTime = Configuration.getMs(
PropertyKey.JOB_MASTER_JOB_TRACE_RETENTION_TIME);
mPlanTracker = planTracker;
}

/**
Expand All @@ -72,15 +91,25 @@ public CmdJobTracker(FileSystemContext fsContext,
* @param distLoadCliRunner DistributedLoad runner
* @param migrateCliRunner DistributedCopy runner
* @param persistRunner Persist runner
* @param retentionTime job retention time
* @param planTracker the planTracker
*/
public CmdJobTracker(FileSystemContext fsContext,
DistLoadCliRunner distLoadCliRunner,
MigrateCliRunner migrateCliRunner,
PersistRunner persistRunner) {
PersistRunner persistRunner,
Long retentionTime,
PlanTracker planTracker
) {
mFsContext = fsContext;
mDistLoadCliRunner = distLoadCliRunner;
mMigrateCliRunner = migrateCliRunner;
mPersistRunner = persistRunner;
mScheduleCleanExecutor = Executors.newSingleThreadScheduledExecutor();
mScheduleCleanExecutor.scheduleAtFixedRate(this::
cleanExpiredJobInfos, 60, 600, TimeUnit.SECONDS);
mTraceRetentionTime = retentionTime;
mPlanTracker = planTracker;
}

/**
Expand Down Expand Up @@ -134,7 +163,7 @@ private void runDistributedCommand(CmdConfig cmdConfig, long jobControlId)

/**
* Get status information for a CMD.
* @param jobControlId
* @param jobControlId jobControlId to trace a CMD
* @return the Command level status
*/
public Status getCmdStatus(long jobControlId) throws JobDoesNotExistException {
Expand Down Expand Up @@ -270,4 +299,37 @@ public CmdStatusBlock getCmdStatusBlock(long jobControlId)
.collect(Collectors.toList());
return new CmdStatusBlock(cmdInfo.getJobControlId(), blockList, cmdInfo.getOperationType());
}

private void cleanExpiredJobInfos() {
long currentTime = System.currentTimeMillis();
for (Map.Entry<Long, CmdInfo> x : mInfoMap.entrySet()) {
CmdInfo cmdInfo = x.getValue();
List<Long> cleanedJobsId = new ArrayList<>();
if (OperationType.DIST_LOAD.equals(cmdInfo.getOperationType())
&& currentTime - cmdInfo.getJobSubmissionTime() > mTraceRetentionTime) {
try {
Status jobStatus = getCmdStatus(cmdInfo.getJobControlId());
if (jobStatus.isFinished()) {
for (CmdRunAttempt runAttempt : cmdInfo.getCmdRunAttempt()) {
cleanedJobsId.add(runAttempt.getJobId());
}
mPlanTracker.removeJobs(cleanedJobsId);
mInfoMap.remove(cmdInfo.getJobControlId());
LOG.info("JobControlId:{} has been cleaned in CmdJobTracker,"
+ " client will not trace the job anymore.The filePaths in CmdInfo are:{}",
cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath()));
}
} catch (JobDoesNotExistException e) {
LOG.warn("JobControlId:{} can not find in CmdJobTracker when clean expired Job"
+ "with unexpected exception.The filePaths in CmdInfo are:{}",
cmdInfo.getJobControlId(), String.join(", ", cmdInfo.getFilePath()));
}
}
}
}

@Override
public void close() throws Exception {
mScheduleCleanExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

Expand Down Expand Up @@ -145,6 +146,18 @@ public void testGetCoordinator() throws Exception {
((Queue) AlluxioMockUtil.getInternalState(mTracker, "mFinished")).size());
}

@Test
public void removeExpiredJobsInfo() throws Exception {
List<Long> jobIdList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
jobIdList.add(addJob(100));
}
mTracker.removeJobs(jobIdList);
for (Long jobId : jobIdList) {
assertNull("job id should not exist", mTracker.getCoordinator(jobId));
}
}

@Test
public void testDuplicateSetReplicaJobs() throws Exception {
long jobId = mJobIdGenerator.getNewJobId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import alluxio.job.wire.JobSource;
import alluxio.job.wire.SimpleJobStatusBlock;
import alluxio.job.wire.Status;
import alluxio.master.job.JobMaster;
import alluxio.master.job.common.CmdInfo;
import alluxio.master.job.plan.PlanTracker;
import alluxio.master.job.workflow.WorkflowTracker;

import com.beust.jcommander.internal.Lists;
import org.junit.Assert;
Expand All @@ -46,6 +49,7 @@
public final class CmdJobTrackerTest {
private static final int REPEATED_ATTEMPT_COUNT = 5;
private static final int ONE_ATTEMPT = 1;
private static final long CAPACITY = 100;

private CmdJobTracker mCmdJobTracker;
private FileSystem mFs;
Expand All @@ -55,25 +59,32 @@ public final class CmdJobTrackerTest {
private MigrateCliRunner mMigrateCliRunner;
private DistLoadCliRunner mDistLoadRunner;
private PersistRunner mPersistRunner;

private PlanTracker mPlanTracker;
private LoadCliConfig mLoad;
private MigrateCliConfig mMigrate;
private List<Status> mSearchingCriteria = Lists.newArrayList();
private WorkflowTracker mWorkflowTracker;
private JobMaster mMockJobMaster;

private Long mRetentionTime;

@Rule
public ExpectedException mException = ExpectedException.none();

@Before
public void before() throws Exception {
mFs = mock(FileSystem.class);
mRetentionTime = 1000L;
FileSystemContext fsCtx = mock(FileSystemContext.class);

mMigrateCliRunner = mock(MigrateCliRunner.class);
mDistLoadRunner = mock(DistLoadCliRunner.class);
mPersistRunner = mock(PersistRunner.class);

mPlanTracker = mock(PlanTracker.class);
mMockJobMaster = mock(JobMaster.class);
mWorkflowTracker = new WorkflowTracker(mMockJobMaster);
mCmdJobTracker = new CmdJobTracker(fsCtx,
mDistLoadRunner, mMigrateCliRunner, mPersistRunner);
mDistLoadRunner, mMigrateCliRunner, mPersistRunner, mRetentionTime, mPlanTracker);

mLoad = new LoadCliConfig("/path/to/load", 3, 1, Collections.EMPTY_SET,
Collections.EMPTY_SET, Collections.EMPTY_SET, Collections.EMPTY_SET, true);
Expand All @@ -97,6 +108,33 @@ public void runDistLoadBatchCompleteTest() throws Exception {
Assert.assertEquals(s, Status.COMPLETED);
}

@Test
public void runCleanExpiredJobsTest() throws Exception {
generateLoadCommandForStatus(Status.CANCELED);
generateLoadCommandForStatus(Status.RUNNING);
generateLoadCommandForStatus(Status.FAILED);
generateLoadCommandForStatus(Status.COMPLETED);
generateLoadCommandForStatus(Status.CREATED);
Thread.sleep(70000L);
// the expired job has been cleaned in mInfoMap
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.CANCELED);
Set<Long> cancelCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(0, cancelCmdIds.size());
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.COMPLETED);
Set<Long> completedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(0, completedCmdIds.size());
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.FAILED);
Set<Long> failedCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(0, failedCmdIds.size());
mSearchingCriteria.clear();
mSearchingCriteria.add(Status.RUNNING);
Set<Long> runningCmdIds = mCmdJobTracker.findCmdIds(mSearchingCriteria);
Assert.assertEquals(2, runningCmdIds.size());
}

@Test
public void runDistLoadBatchFailTest() throws Exception {
CmdInfo cmdInfo = new CmdInfo(mLoadJobId, OperationType.DIST_LOAD,
Expand Down Expand Up @@ -316,7 +354,7 @@ public void testGetCmdStatusBlock() throws Exception {

// Below are all help functions.
private void prepareDistLoadTest(
CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception {
CmdInfo cmdInfo, LoadCliConfig loadCliConfig, long loadId) throws Exception {
AlluxioURI filePath = new AlluxioURI(loadCliConfig.getFilePath());
int replication = loadCliConfig.getReplication();
Set<String> workerSet = loadCliConfig.getWorkerSet();
Expand All @@ -325,7 +363,7 @@ private void prepareDistLoadTest(
Set<String> excludedLocalityIds = loadCliConfig.getExcludedLocalityIds();
boolean directCache = loadCliConfig.getDirectCache();
int batch = loadCliConfig.getBatchSize();

// Mock the behavior of runDistLoad
when(mDistLoadRunner.runDistLoad(batch, filePath, replication, workerSet,
excludedWorkerSet, localityIds, excludedLocalityIds, directCache, loadId))
.thenReturn(cmdInfo);
Expand Down

0 comments on commit a4ec456

Please sign in to comment.