diff --git a/core/common/src/main/java/alluxio/Server.java b/core/common/src/main/java/alluxio/Server.java index adbbc69c0695..d29437020323 100644 --- a/core/common/src/main/java/alluxio/Server.java +++ b/core/common/src/main/java/alluxio/Server.java @@ -13,6 +13,8 @@ import alluxio.grpc.GrpcService; import alluxio.grpc.ServiceType; +import alluxio.heartbeat.HeartbeatThreadManager; +import alluxio.wire.HeartbeatThreadInfo; import java.io.IOException; import java.util.Map; @@ -57,4 +59,12 @@ public interface Server { * Closes the server. */ void close() throws IOException; + + /** + * Gets heartbeat threads info. + * @return the heartbeat threads info + */ + default Map getHeartbeatThreads() { + return HeartbeatThreadManager.getHeartbeatThreads(); + } } diff --git a/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java b/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java index b7fc2342c7ca..185b0d95c622 100644 --- a/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java +++ b/core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java @@ -11,13 +11,17 @@ package alluxio.heartbeat; +import alluxio.Constants; import alluxio.conf.AlluxioConfiguration; import alluxio.conf.Reconfigurable; +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; import alluxio.conf.ReconfigurableRegistry; import alluxio.security.authentication.AuthenticatedClientUser; import alluxio.security.user.UserState; import alluxio.util.CommonUtils; import alluxio.util.SecurityUtils; +import alluxio.wire.HeartbeatThreadInfo; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -26,6 +30,7 @@ import java.io.IOException; import java.time.Clock; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; @@ -36,13 +41,19 @@ @NotThreadSafe public final class HeartbeatThread implements Runnable { private static final Logger LOG 
= LoggerFactory.getLogger(HeartbeatThread.class); + private static final String DATE_PATTERN = + Configuration.getString(PropertyKey.USER_DATE_FORMAT_PATTERN); private final String mThreadName; private final HeartbeatExecutor mExecutor; private final UserState mUserState; private HeartbeatTimer mTimer; private AlluxioConfiguration mConfiguration; - private Status mStatus; + private volatile Status mStatus; + private AtomicLong mCounter = new AtomicLong(0L); + private volatile long mStartTickTime; + private volatile long mStartHeartbeatTime; + private volatile long mEndHeartbeatTime; /** * @param executorName the executor name defined in {@link HeartbeatContext} @@ -132,7 +143,6 @@ public HeartbeatThread(String executorName, HeartbeatExecutor executor, @Override public void run() { - long counter = 0L; try { if (SecurityUtils.isSecurityEnabled(mConfiguration) && AuthenticatedClientUser.get(mConfiguration) == null) { @@ -149,10 +159,15 @@ public void run() { while (!Thread.interrupted()) { // TODO(peis): Fix this. The current implementation consumes one thread even when ticking. mStatus = Status.WAITING; + mStartTickTime = CommonUtils.getCurrentMs(); long limitTime = mTimer.tick(); mStatus = Status.RUNNING; - LOG.debug("{} #{} will run limited in {}s", mThreadName, counter++, limitTime / 1000); + mStartHeartbeatTime = CommonUtils.getCurrentMs(); + LOG.debug("{} #{} will run limited in {}s", mThreadName, mCounter.get(), + limitTime / Constants.SECOND_MS); mExecutor.heartbeat(limitTime); + mEndHeartbeatTime = CommonUtils.getCurrentMs(); + updateState(); } } catch (InterruptedException e) { // Allow thread to exit. 
@@ -167,11 +182,53 @@ public void run() {
     }
   }
 
+  /** Report of the last completed heartbeat; read and written under {@code this}. */
+  private String mPreviousReport = "";
+
+  /**
+   * Finishes the bookkeeping of one heartbeat round: bumps the counter, snapshots the report
+   * of the round while its timestamps are still set, then clears the in-progress timestamps.
+   */
+  private synchronized void updateState() {
+    mPreviousReport = String.format("#%d [%s - %s - %s] ticked(s) %d, run(s) %d.",
+        mCounter.incrementAndGet(),
+        CommonUtils.convertMsToDate(mStartTickTime, DATE_PATTERN),
+        CommonUtils.convertMsToDate(mStartHeartbeatTime, DATE_PATTERN),
+        CommonUtils.convertMsToDate(mEndHeartbeatTime, DATE_PATTERN),
+        (mStartHeartbeatTime - mStartTickTime) / Constants.SECOND_MS,
+        (mEndHeartbeatTime - mStartHeartbeatTime) / Constants.SECOND_MS);
+    mStartTickTime = 0L;
+    mStartHeartbeatTime = 0L;
+  }
+
+  /**
+   * @return the thread name
+   */
+  public String getThreadName() {
+    return mThreadName;
+  }
+
   /**
-   * @return the status of current heartbeat thread
+   * Builds a snapshot of this thread for monitoring: name, completed heartbeat count, status,
+   * the report of the last completed heartbeat, and the start times of the round in progress.
+   *
+   * @return the {@link HeartbeatThreadInfo} for the heartbeat thread
    */
-  public Status getStatus() {
-    return mStatus;
+  public synchronized HeartbeatThreadInfo toHeartbeatThreadInfo() {
+    HeartbeatThreadInfo heartbeatThreadInfo = new HeartbeatThreadInfo()
+        .setThreadName(mThreadName)
+        .setCount(mCounter.get())
+        .setStatus(mStatus)
+        .setPreviousReport(mPreviousReport);
+    if (mStartTickTime != 0) {
+      heartbeatThreadInfo.setStartTickTime(
+          CommonUtils.convertMsToDate(mStartTickTime, DATE_PATTERN));
+    }
+    if (mStartHeartbeatTime != 0) {
+      heartbeatThreadInfo.setStartHeartbeatTime(
+          CommonUtils.convertMsToDate(mStartHeartbeatTime, DATE_PATTERN));
+    }
+    return heartbeatThreadInfo;
   }
 
   /**
diff --git a/core/common/src/main/java/alluxio/heartbeat/HeartbeatThreadManager.java b/core/common/src/main/java/alluxio/heartbeat/HeartbeatThreadManager.java
new file mode 100644
index 000000000000..3ef6a178979a
--- /dev/null
+++ b/core/common/src/main/java/alluxio/heartbeat/HeartbeatThreadManager.java
@@ -0,0 +1,85 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License").
You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.heartbeat;
+
+import alluxio.wire.HeartbeatThreadInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * Static registry of heartbeat threads, looked up by thread name and grouped by owner key.
+ */
+public class HeartbeatThreadManager {
+  private static final Map<String, HeartbeatThread> HEARTBEAT_THREAD_MAP
+      = new HashMap<>();
+  private static final Map<Object, List<HeartbeatThread>> HEARTBEAT_THREAD_INDEX_MAP
+      = new HashMap<>();
+
+  /**
+   * Registers a heartbeat thread under an owner key and indexes it by its thread name.
+   *
+   * @param key the owner the thread is grouped under, as later passed to unregister
+   * @param heartbeatThread the heartbeat thread
+   */
+  public static synchronized void register(Object key, HeartbeatThread heartbeatThread) {
+    List<HeartbeatThread> list =
+        HEARTBEAT_THREAD_INDEX_MAP.computeIfAbsent(key, (k) -> new ArrayList<>());
+    list.add(heartbeatThread);
+    HEARTBEAT_THREAD_MAP.put(heartbeatThread.getThreadName(), heartbeatThread);
+  }
+
+  /**
+   * Removes the heartbeat threads registered under the key, sparing same-named newer entries.
+   * @param key the owner key used at registration
+   */
+  public static synchronized void unregister(Object key) {
+    List<HeartbeatThread> heartbeatThreads = HEARTBEAT_THREAD_INDEX_MAP.remove(key);
+    if (heartbeatThreads != null) {
+      for (HeartbeatThread heartbeatThread : heartbeatThreads) {
+        // Two-arg remove: keep the name if a newer thread has since been registered under it.
+        HEARTBEAT_THREAD_MAP.remove(heartbeatThread.getThreadName(), heartbeatThread);
+      }
+    }
+  }
+
+  /**
+   * Registers the heartbeat thread under the executor service, then submits it for execution.
+   * @param executorService the executor service for executing heartbeat threads
+   * @param heartbeatThread the heartbeat thread
+   * @return a Future representing pending completion of the task
+   */
+  public static Future<?> submit(ExecutorService executorService,
+      HeartbeatThread heartbeatThread) {
+    HeartbeatThreadManager.register(executorService, heartbeatThread);
+    return executorService.submit(heartbeatThread);
+  }
+
+  /**
+   * Gets a snapshot of all registered heartbeat threads, sorted by thread name.
+   * @return the heartbeat threads info, keyed by thread name
+   */
+  public static synchronized Map<String, HeartbeatThreadInfo> getHeartbeatThreads() {
+    SortedMap<String, HeartbeatThreadInfo> heartbeatThreads = new TreeMap<>();
+    for (HeartbeatThread heartbeatThread : HEARTBEAT_THREAD_MAP.values()) {
+      heartbeatThreads.put(heartbeatThread.getThreadName(),
+          heartbeatThread.toHeartbeatThreadInfo());
+    }
+    return heartbeatThreads;
+  }
+}
diff --git a/core/common/src/main/java/alluxio/wire/HeartbeatThreadInfo.java b/core/common/src/main/java/alluxio/wire/HeartbeatThreadInfo.java
new file mode 100644
index 000000000000..d137317ee749
--- /dev/null
+++ b/core/common/src/main/java/alluxio/wire/HeartbeatThreadInfo.java
@@ -0,0 +1,148 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.wire;
+
+import alluxio.annotation.PublicApi;
+import alluxio.heartbeat.HeartbeatThread;
+
+import com.google.common.base.MoreObjects;
+
+import java.io.Serializable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * The heartbeat thread information.
+ */
+@NotThreadSafe
+@PublicApi
+public class HeartbeatThreadInfo implements Serializable {
+  private static final long serialVersionUID = -5457846691141843682L;
+
+  private String mThreadName = "";
+  private long mCount;
+  private HeartbeatThread.Status mStatus;
+  private String mPreviousReport = "";
+  private String mStartHeartbeatTime = "";
+  private String mStartTickTime = "";
+
+  /**
+   * Creates a new instance of {@link HeartbeatThreadInfo}.
+   */
+  public HeartbeatThreadInfo() {}
+
+  /**
+   * @return the thread name
+   */
+  public String getThreadName() {
+    return mThreadName;
+  }
+
+  /**
+   * @param threadName the thread name to set
+   * @return this {@link HeartbeatThreadInfo}, to allow call chaining
+   */
+  public HeartbeatThreadInfo setThreadName(String threadName) {
+    mThreadName = threadName;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this).add("threadName", mThreadName)
+        .add("count", mCount)
+        .add("status", mStatus)
+        .add("startHeartbeatTime", mStartHeartbeatTime)
+        .add("startTickTime", mStartTickTime)
+        .add("previousReport", mPreviousReport)
+        .toString();
+  }
+
+  /**
+   * @param count the current heartbeat count
+   * @return this {@link HeartbeatThreadInfo}, to allow call chaining
+   */
+  public HeartbeatThreadInfo setCount(long count) {
+    mCount = count;
+    return this;
+  }
+
+  /**
+   * @param status the current heartbeat status
+   * @return this {@link HeartbeatThreadInfo}, to allow call chaining
+   */
+  public HeartbeatThreadInfo setStatus(HeartbeatThread.Status status) {
+    mStatus = status;
+    return this;
+  }
+
+  /**
+   * @param previousReport the report of the previous heartbeat
+   * @return this {@link HeartbeatThreadInfo}, to allow call chaining
+   */
+  public HeartbeatThreadInfo setPreviousReport(String previousReport) {
+    mPreviousReport = previousReport;
+    return this;
+  }
+
+  /**
+   * @return the current heartbeat count
+   */
+  public long getCount() {
+    return mCount;
+  }
+
+  /**
+   * @return the heartbeat status; {@code null} if it was never set
+   */
+  public HeartbeatThread.Status getStatus() {
+    return mStatus;
+  }
+
+  /**
+   * @return the report of the previous heartbeat; empty if it was never set
+   */
+  public String getPreviousReport() {
+    return mPreviousReport;
+  }
+
+  /**
+   * @param startHeartbeatTime the start time of the heartbeat in progress, as a formatted string
+   * @return this {@link HeartbeatThreadInfo}, to allow call chaining
+   */
+  public HeartbeatThreadInfo setStartHeartbeatTime(String startHeartbeatTime) {
+    mStartHeartbeatTime = startHeartbeatTime;
+    return this;
+  }
+
+  /**
+   * @return the start time of the heartbeat in progress; empty if it was never set
+   */
+  public String getStartHeartbeatTime() {
+    return mStartHeartbeatTime;
+  }
+
+  /**
+   * @param startTickTime the start time of the tick in progress, as a formatted string
+   * @return this {@link HeartbeatThreadInfo}, to allow call chaining
+   */
+  public HeartbeatThreadInfo setStartTickTime(String startTickTime) {
+    mStartTickTime = startTickTime;
+    return this;
+  }
+
+  /**
+   * @return the start time of the tick in progress; empty if it was never set
+   */
+  public String getStartTickTime() {
+    return mStartTickTime;
+  }
+}
diff --git a/core/common/src/main/java/alluxio/wire/WebUIHeartbeatThreads.java b/core/common/src/main/java/alluxio/wire/WebUIHeartbeatThreads.java
new file mode 100644
index 000000000000..c9636f158440
--- /dev/null
+++ b/core/common/src/main/java/alluxio/wire/WebUIHeartbeatThreads.java
@@ -0,0 +1,62 @@
+/*
+ * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
+ * (the "License"). You may not use this work except in compliance with the License, which is
+ * available at www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied, as more fully set forth in the License.
+ *
+ * See the NOTICE file distributed with this work for information regarding copyright ownership.
+ */
+
+package alluxio.wire;
+
+import com.google.common.base.MoreObjects;
+
+import java.io.Serializable;
+import java.util.Map;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Alluxio WebUI heartbeat threads information.
+ */
+@NotThreadSafe
+public final class WebUIHeartbeatThreads implements Serializable {
+  private static final long serialVersionUID = -2903043308252679410L;
+
+  private HeartbeatThreadInfo[] mHeartbeatThreadInfos;
+
+  /**
+   * Creates a new instance of {@link WebUIHeartbeatThreads}.
+   */
+  public WebUIHeartbeatThreads() {
+  }
+
+  /**
+   * Gets the heartbeat thread infos.
+   *
+   * @return the heartbeat thread infos; {@code null} until they have been set
+   */
+  public HeartbeatThreadInfo[] getHeartbeatThreadInfos() {
+    return mHeartbeatThreadInfos;
+  }
+
+  /**
+   * Sets the heartbeat thread infos from the values of the given map, in its iteration order.
+   *
+   * @param heartbeatThreadInfos the heartbeat threads info
+   * @return this {@link WebUIHeartbeatThreads}, to allow call chaining
+   */
+  public WebUIHeartbeatThreads setHeartbeatThreadInfos(
+      Map<String, HeartbeatThreadInfo> heartbeatThreadInfos) {
+    mHeartbeatThreadInfos = heartbeatThreadInfos.values().toArray(new HeartbeatThreadInfo[0]);
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("heartbeatThreadInfos", mHeartbeatThreadInfos)
+        .toString();
+  }
+}
diff --git a/core/server/common/src/main/java/alluxio/master/AbstractMaster.java b/core/server/common/src/main/java/alluxio/master/AbstractMaster.java
index bc4a88862b40..16eedae0b3e6 100644
--- a/core/server/common/src/main/java/alluxio/master/AbstractMaster.java
+++ b/core/server/common/src/main/java/alluxio/master/AbstractMaster.java
@@ -14,6 +14,7 @@
 import alluxio.Constants;
 import alluxio.Server;
 import alluxio.exception.status.UnavailableException;
+import alluxio.heartbeat.HeartbeatThreadManager;
 import alluxio.master.journal.Journal;
 import alluxio.master.journal.JournalContext;
 import alluxio.master.journal.StateChangeJournalContext;
@@ -105,6 +106,7 @@ public void stop() throws IOException {
     }
     // Shut down the executor service, interrupting any running threads.
if (mExecutorService != null) { + HeartbeatThreadManager.unregister(mExecutorService); try { mExecutorService.shutdownNow(); String awaitFailureMessage = diff --git a/core/server/common/src/main/java/alluxio/worker/AbstractWorker.java b/core/server/common/src/main/java/alluxio/worker/AbstractWorker.java index 9c0ecf592492..491a11e9c02a 100644 --- a/core/server/common/src/main/java/alluxio/worker/AbstractWorker.java +++ b/core/server/common/src/main/java/alluxio/worker/AbstractWorker.java @@ -12,6 +12,7 @@ package alluxio.worker; import alluxio.Constants; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.util.executor.ExecutorServiceFactory; import alluxio.wire.WorkerNetAddress; @@ -62,6 +63,7 @@ public void start(WorkerNetAddress address) throws IOException { public void stop() throws IOException { // Shut down the executor service, interrupting any running threads. if (mExecutorService != null) { + HeartbeatThreadManager.unregister(mExecutorService); try { mExecutorService.shutdownNow(); String awaitFailureMessage = diff --git a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java index 6100141fbd46..eb69940bb8d4 100644 --- a/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java +++ b/core/server/master/src/main/java/alluxio/master/block/DefaultBlockMaster.java @@ -47,6 +47,7 @@ import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.master.CoreMaster; import alluxio.master.CoreMasterContext; import alluxio.master.WorkerState; @@ -570,7 +571,7 @@ public void close() { public void start(Boolean isLeader) throws IOException { super.start(isLeader); if (isLeader || mWorkerRegisterToAllMasters) { - getExecutorService().submit(new HeartbeatThread( + 
HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread( HeartbeatContext.MASTER_LOST_WORKER_DETECTION, new LostWorkerDetectionHeartbeatExecutor(), () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_LOST_WORKER_DETECTION_INTERVAL)), @@ -578,7 +579,7 @@ HeartbeatContext.MASTER_LOST_WORKER_DETECTION, new LostWorkerDetectionHeartbeatE } // This periodically scans all open register streams and closes hanging ones - getExecutorService().submit(new HeartbeatThread( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread( HeartbeatContext.MASTER_WORKER_REGISTER_SESSION_CLEANER, new WorkerRegisterStreamGCExecutor(), () -> new FixedIntervalSupplier(Configuration.getMs( diff --git a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java index ae5a931b77db..3a00a21d3fa4 100644 --- a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java +++ b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java @@ -68,6 +68,7 @@ import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.job.plan.persist.PersistConfig; import alluxio.job.wire.JobInfo; import alluxio.master.CoreMaster; @@ -733,20 +734,20 @@ public void start(Boolean isPrimary) throws IOException { .getMs(PropertyKey.MASTER_PERIODIC_BLOCK_INTEGRITY_CHECK_INTERVAL); if (blockIntegrityCheckInterval > 0) { // negative or zero interval implies disabled - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_BLOCK_INTEGRITY_CHECK, new BlockIntegrityChecker(this), () -> new FixedIntervalSupplier(Configuration.getMs( PropertyKey.MASTER_PERIODIC_BLOCK_INTEGRITY_CHECK_INTERVAL)), Configuration.global(), 
mMasterContext.getUserState())); } - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_TTL_CHECK, new InodeTtlChecker(this, mInodeTree), () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_TTL_CHECKER_INTERVAL_MS)), Configuration.global(), mMasterContext.getUserState())); - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_LOST_FILES_DETECTION, new LostFileDetector(this, mBlockMaster, mInodeTree), () -> new FixedIntervalSupplier( @@ -759,8 +760,9 @@ public void start(Boolean isPrimary) throws IOException { () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_REPLICATION_CHECK_INTERVAL_MS)), Configuration.global(), mMasterContext.getUserState()); - getExecutorService().submit(mReplicationCheckHeartbeatThread); - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), + mReplicationCheckHeartbeatThread); + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_PERSISTENCE_SCHEDULER, new PersistenceScheduler(), () -> new FixedIntervalSupplier( @@ -772,20 +774,20 @@ public void start(Boolean isPrimary) throws IOException { new LinkedBlockingQueue<>(), alluxio.util.ThreadFactoryUtils.build("Persist-Checker-%d", true)); mPersistCheckerPool.allowCoreThreadTimeOut(true); - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_PERSISTENCE_CHECKER, new PersistenceChecker(), () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_PERSISTENCE_CHECKER_INTERVAL_MS)), Configuration.global(), mMasterContext.getUserState())); - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_METRICS_TIME_SERIES, new TimeSeriesRecorder(), () -> new FixedIntervalSupplier( 
Configuration.getMs(PropertyKey.MASTER_METRICS_TIME_SERIES_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); if (Configuration.getBoolean(PropertyKey.UNDERFS_CLEANUP_ENABLED)) { - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_UFS_CLEANUP, new UfsCleaner(this), () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.UNDERFS_CLEANUP_INTERVAL)), diff --git a/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java b/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java index f09279100267..7e8abe5a9ac8 100644 --- a/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java +++ b/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java @@ -66,6 +66,7 @@ import alluxio.wire.ConfigHash; import alluxio.wire.FileBlockInfo; import alluxio.wire.FileInfo; +import alluxio.wire.HeartbeatThreadInfo; import alluxio.wire.MasterInfo; import alluxio.wire.MasterWebUIBrowse; import alluxio.wire.MasterWebUIConfiguration; @@ -78,6 +79,7 @@ import alluxio.wire.MasterWebUIOverview; import alluxio.wire.MasterWebUIWorkers; import alluxio.wire.MountPointInfo; +import alluxio.wire.WebUIHeartbeatThreads; import alluxio.wire.WorkerInfo; import alluxio.wire.WorkerNetAddress; @@ -151,6 +153,7 @@ public final class AlluxioMasterRestServiceHandler { public static final String WEBUI_WORKERS = "webui_workers"; public static final String WEBUI_METRICS = "webui_metrics"; public static final String WEBUI_MOUNTTABLE = "webui_mounttable"; + public static final String WEBUI_HEARTBEAT_THREADS = "webui_heartbeat_threads"; public static final String WEBUI_MASTERS = "webui_masters"; // queries @@ -1172,6 +1175,25 @@ public Response getWebUIMetrics() { }, Configuration.global()); } + /** + * Gets Web UI heartbeat threads. 
+ * + * @return the response object + */ + @GET + @Path(WEBUI_HEARTBEAT_THREADS) + public Response getWebUIHeartbeatThreads() { + return RestUtils.call(() -> { + WebUIHeartbeatThreads response = new WebUIHeartbeatThreads(); + + Map heartbeatThreads = mMetaMaster.getHeartbeatThreads(); + + response.setHeartbeatThreadInfos(heartbeatThreads); + + return response; + }, Configuration.global()); + } + private Capacity getCapacityInternal() { return new Capacity().setTotal(mBlockMaster.getCapacityBytes()) .setUsed(mBlockMaster.getUsedBytes()); diff --git a/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java b/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java index bdd3cc06c70c..930766fd7fed 100644 --- a/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java +++ b/core/server/master/src/main/java/alluxio/master/meta/DefaultMetaMaster.java @@ -44,6 +44,7 @@ import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.master.CoreMaster; import alluxio.master.CoreMasterContext; import alluxio.master.MasterClientContext; @@ -319,19 +320,19 @@ public void start(Boolean isPrimary) throws IOException { Configuration.getConfiguration(Scope.MASTER)); // The service that detects lost standby master nodes - getExecutorService().submit(new HeartbeatThread( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread( HeartbeatContext.MASTER_LOST_MASTER_DETECTION, new LostMasterDetectionHeartbeatExecutor(), () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); - getExecutorService().submit( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_LOG_CONFIG_REPORT_SCHEDULING, new LogConfigReportHeartbeatExecutor(), () -> new 
FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_LOG_CONFIG_REPORT_HEARTBEAT_INTERVAL)), Configuration.global(), mMasterContext.getUserState())); - getExecutorService().submit(new HeartbeatThread( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread( HeartbeatContext.MASTER_LOST_PROXY_DETECTION, new LostProxyDetectionHeartbeatExecutor(), () -> new FixedIntervalSupplier( @@ -344,7 +345,7 @@ public void start(Boolean isPrimary) throws IOException { mDailyBackup.start(); } if (mJournalSpaceMonitor != null) { - getExecutorService().submit(new HeartbeatThread( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread( HeartbeatContext.MASTER_JOURNAL_SPACE_MONITOR, mJournalSpaceMonitor, () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_JOURNAL_SPACE_MONITOR_INTERVAL)), @@ -358,11 +359,12 @@ public void start(Boolean isPrimary) throws IOException { } if (Configuration.getBoolean(PropertyKey.MASTER_UPDATE_CHECK_ENABLED) && !Configuration.getBoolean(PropertyKey.TEST_MODE)) { - getExecutorService().submit(new HeartbeatThread(HeartbeatContext.MASTER_UPDATE_CHECK, - new UpdateChecker(this), - () -> new FixedIntervalSupplier( - Configuration.getMs(PropertyKey.MASTER_UPDATE_CHECK_INTERVAL)), - Configuration.global(), mMasterContext.getUserState())); + HeartbeatThreadManager.submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.MASTER_UPDATE_CHECK, + new UpdateChecker(this), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_UPDATE_CHECK_INTERVAL)), + Configuration.global(), mMasterContext.getUserState())); } } else { LOG.info("Detected existing cluster ID {}", mState.getClusterID()); @@ -374,11 +376,12 @@ public void start(Boolean isPrimary) throws IOException { RetryHandlingMetaMasterMasterClient metaMasterClient = new RetryHandlingMetaMasterMasterClient(MasterClientContext .newBuilder(ClientContext.create(Configuration.global())).build()); - 
getExecutorService().submit(new HeartbeatThread(HeartbeatContext.META_MASTER_SYNC, - new MetaMasterSync(mMasterAddress, metaMasterClient), - () -> new FixedIntervalSupplier( - Configuration.getMs(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL)), - Configuration.global(), mMasterContext.getUserState())); + HeartbeatThreadManager.submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.META_MASTER_SYNC, + new MetaMasterSync(mMasterAddress, metaMasterClient), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.MASTER_STANDBY_HEARTBEAT_INTERVAL)), + Configuration.global(), mMasterContext.getUserState())); LOG.info("Standby master with address {} starts sending heartbeat to leader master.", mMasterAddress); } diff --git a/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java b/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java index 3ccbb8c7aba1..5837a1c296d4 100644 --- a/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java +++ b/core/server/master/src/main/java/alluxio/master/metrics/DefaultMetricsMaster.java @@ -22,6 +22,7 @@ import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.master.CoreMaster; import alluxio.master.CoreMasterContext; import alluxio.master.journal.NoopJournaled; @@ -179,7 +180,7 @@ public void start(Boolean isLeader) throws IOException { mMetricsStore.initMetricKeys(); mMetricsStore.clear(); if (isLeader) { - getExecutorService().submit(new HeartbeatThread( + HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread( HeartbeatContext.MASTER_CLUSTER_METRICS_UPDATER, new ClusterMetricsUpdater(), () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_CLUSTER_METRICS_UPDATE_INTERVAL)), diff --git 
a/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java b/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java index ef5eee6f489c..766c8666a6e9 100644 --- a/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java +++ b/core/server/master/src/main/java/alluxio/master/throttle/DefaultThrottleMaster.java @@ -23,6 +23,7 @@ import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.master.AbstractMaster; import alluxio.master.MasterContext; import alluxio.master.MasterProcess; @@ -108,7 +109,7 @@ public void start(Boolean isLeader) throws IOException { return; } LOG.info("Starting {}", getName()); - mThrottleService = getExecutorService().submit( + mThrottleService = HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.MASTER_THROTTLE, mThrottleExecutor, () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.MASTER_THROTTLE_HEARTBEAT_INTERVAL)), diff --git a/core/server/proxy/src/main/java/alluxio/proxy/AlluxioProxyProcess.java b/core/server/proxy/src/main/java/alluxio/proxy/AlluxioProxyProcess.java index 217feb9befb8..69bb035a6570 100644 --- a/core/server/proxy/src/main/java/alluxio/proxy/AlluxioProxyProcess.java +++ b/core/server/proxy/src/main/java/alluxio/proxy/AlluxioProxyProcess.java @@ -19,6 +19,7 @@ import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.master.MasterClientContext; import alluxio.util.CommonUtils; import alluxio.util.ThreadFactoryUtils; @@ -104,10 +105,11 @@ public void start() throws Exception { MasterClientContext context = MasterClientContext.newBuilder(ClientContext.create()).build(); mMasterSync = new ProxyMasterSync( 
Address.fromProto(proxyAddress), context, mStartTimeMs); - mPool.submit(new HeartbeatThread(HeartbeatContext.PROXY_META_MASTER_SYNC, mMasterSync, - () -> new FixedIntervalSupplier( - Configuration.getMs(PropertyKey.PROXY_MASTER_HEARTBEAT_INTERVAL)), - Configuration.global(), context.getUserState())); + HeartbeatThreadManager.submit(mPool, + new HeartbeatThread(HeartbeatContext.PROXY_META_MASTER_SYNC, mMasterSync, + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.PROXY_MASTER_HEARTBEAT_INTERVAL)), + Configuration.global(), context.getUserState())); mLatch.await(); } @@ -122,6 +124,7 @@ public void stop() throws Exception { mMasterSync.close(); } if (mPool != null) { + HeartbeatThreadManager.unregister(mPool); mPool.shutdownNow(); mPool = null; } diff --git a/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerRestServiceHandler.java b/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerRestServiceHandler.java index 69e367b21fc6..35709f4c227a 100644 --- a/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerRestServiceHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/AlluxioWorkerRestServiceHandler.java @@ -44,6 +44,8 @@ import alluxio.web.WorkerWebServer; import alluxio.wire.AlluxioWorkerInfo; import alluxio.wire.Capacity; +import alluxio.wire.HeartbeatThreadInfo; +import alluxio.wire.WebUIHeartbeatThreads; import alluxio.wire.WorkerWebUIBlockInfo; import alluxio.wire.WorkerWebUIConfiguration; import alluxio.wire.WorkerWebUIInit; @@ -118,6 +120,7 @@ public final class AlluxioWorkerRestServiceHandler { public static final String WEBUI_BLOCKINFO = "webui_blockinfo"; public static final String WEBUI_METRICS = "webui_metrics"; public static final String WEBUI_CONFIG = "webui_config"; + public static final String WEBUI_HEARTBEAT_THREADS = "webui_heartbeat_threads"; // queries public static final String QUERY_RAW_CONFIGURATION = "raw_configuration"; @@ -620,6 +623,25 @@ public Response getWebUIConfiguration() { 
}, Configuration.global()); } + /** + * Gets Web UI heartbeat threads. + * + * @return the response object + */ + @GET + @Path(WEBUI_HEARTBEAT_THREADS) + public Response getWebUIHeartbeatThreads() { + return RestUtils.call(() -> { + WebUIHeartbeatThreads response = new WebUIHeartbeatThreads(); + + Map heartbeatThreads = mBlockWorker.getHeartbeatThreads(); + + response.setHeartbeatThreadInfos(heartbeatThreads); + + return response; + }, Configuration.global()); + } + private Capacity getCapacityInternal() { return new Capacity().setTotal(mStoreMeta.getCapacityBytes()) .setUsed(mStoreMeta.getUsedBytes()); diff --git a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java index 513a8664f9cf..9657fadb51f7 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java @@ -42,6 +42,7 @@ import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.metrics.MetricInfo; import alluxio.metrics.MetricKey; import alluxio.metrics.MetricsSystem; @@ -225,8 +226,8 @@ public void start(WorkerNetAddress address) throws IOException { // Setup PinListSyncer PinListSync pinListSync = mResourceCloser.register( new PinListSync(this, mFileSystemMasterClient)); - getExecutorService() - .submit(new HeartbeatThread(HeartbeatContext.WORKER_PIN_LIST_SYNC, pinListSync, + HeartbeatThreadManager.submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.WORKER_PIN_LIST_SYNC, pinListSync, () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS)), Configuration.global(), ServerUserState.global())); @@ -239,8 +240,8 @@ public void start(WorkerNetAddress address) throws IOException { // Setup storage 
checker if (Configuration.getBoolean(PropertyKey.WORKER_STORAGE_CHECKER_ENABLED)) { StorageChecker storageChecker = mResourceCloser.register(new StorageChecker()); - getExecutorService() - .submit(new HeartbeatThread(HeartbeatContext.WORKER_STORAGE_HEALTH, storageChecker, + HeartbeatThreadManager.submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.WORKER_STORAGE_HEALTH, storageChecker, () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS)), Configuration.global(), ServerUserState.global())); @@ -255,8 +256,8 @@ public void start(WorkerNetAddress address) throws IOException { protected void setupBlockMasterSync() throws IOException { BlockMasterSync blockMasterSync = mResourceCloser .register(new BlockMasterSync(this, mWorkerId, mAddress, mBlockMasterClientPool)); - getExecutorService() - .submit(new HeartbeatThread(HeartbeatContext.WORKER_BLOCK_SYNC, blockMasterSync, + HeartbeatThreadManager.submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.WORKER_BLOCK_SYNC, blockMasterSync, () -> new FixedIntervalSupplier( Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS)), Configuration.global(), ServerUserState.global())); diff --git a/job/server/src/main/java/alluxio/master/AlluxioJobMasterRestServiceHandler.java b/job/server/src/main/java/alluxio/master/AlluxioJobMasterRestServiceHandler.java index c7c23ce95b93..58267e7ba08a 100644 --- a/job/server/src/main/java/alluxio/master/AlluxioJobMasterRestServiceHandler.java +++ b/job/server/src/main/java/alluxio/master/AlluxioJobMasterRestServiceHandler.java @@ -15,9 +15,12 @@ import alluxio.RuntimeConstants; import alluxio.conf.Configuration; import alluxio.conf.ConfigurationValueOptions; +import alluxio.conf.PropertyKey; import alluxio.util.LogUtils; import alluxio.web.JobMasterWebServer; import alluxio.wire.AlluxioJobMasterInfo; +import alluxio.wire.HeartbeatThreadInfo; +import alluxio.wire.WebUIHeartbeatThreads; import 
io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -55,6 +58,8 @@ public final class AlluxioJobMasterRestServiceHandler { // queries public static final String QUERY_RAW_CONFIGURATION = "raw_configuration"; + public static final String WEBUI_HEARTBEAT_THREADS = "webui_heartbeat_threads"; + private final AlluxioJobMasterProcess mJobMaster; /** @@ -110,6 +115,27 @@ public Response logLevel(@QueryParam(LOG_ARGUMENT_NAME) final String logName, Configuration.global()); } + /** + * Gets Web UI heartbeat threads. + * + * @return the response object + */ + @GET + @Path(WEBUI_HEARTBEAT_THREADS) + public Response getWebUIHeartbeatThreads() { + return RestUtils.call(() -> { + WebUIHeartbeatThreads response = new WebUIHeartbeatThreads(); + + response.setDebug(Configuration.getBoolean(PropertyKey.DEBUG)); + Map heartbeatThreads = + mJobMaster.getJobMaster().getHeartbeatThreads(); + + response.setHeartbeatThreadInfos(heartbeatThreads); + + return response; + }, Configuration.global()); + } + private Map getConfigurationInternal(boolean raw) { if (raw) { return Configuration.toMap(ConfigurationValueOptions.defaults().useRawValue(raw)); diff --git a/job/server/src/main/java/alluxio/master/job/JobMaster.java b/job/server/src/main/java/alluxio/master/job/JobMaster.java index 7e0de7c731bb..ce044444049b 100644 --- a/job/server/src/main/java/alluxio/master/job/JobMaster.java +++ b/job/server/src/main/java/alluxio/master/job/JobMaster.java @@ -40,6 +40,7 @@ import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatExecutor; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.job.CmdConfig; import alluxio.job.JobConfig; import alluxio.job.JobServerContext; @@ -247,29 +248,20 @@ public void start(Boolean isLeader) throws IOException { } if (isLeader) { LOG.info("Starting job master as primary"); - getExecutorService() - .submit(new 
HeartbeatThread(HeartbeatContext.JOB_MASTER_LOST_WORKER_DETECTION, - new LostWorkerDetectionHeartbeatExecutor(), - () -> new FixedIntervalSupplier( - Configuration.getMs(PropertyKey.JOB_MASTER_LOST_WORKER_INTERVAL)), - Configuration.global(), mMasterContext.getUserState())); - getExecutorService() - .submit(new HeartbeatThread(HeartbeatContext.JOB_MASTER_LOST_MASTER_DETECTION, - new LostMasterDetectionHeartbeatExecutor(), - () -> new FixedIntervalSupplier( - Configuration.getMs(PropertyKey.JOB_MASTER_LOST_MASTER_INTERVAL)), - Configuration.global(), mMasterContext.getUserState())); - /** - * The audit logger will be running all the time, and an operation checks whether - * to enable audit logs in {@link #createAuditContext}. So audit log can be turned on/off - * at runtime by updating the property key. - */ - mAsyncAuditLogWriter = new AsyncUserAccessAuditLogWriter("JOB_MASTER_AUDIT_LOG"); - mAsyncAuditLogWriter.start(); - MetricsSystem.registerGaugeIfAbsent( - MetricKey.MASTER_AUDIT_LOG_ENTRIES_SIZE.getName(), - () -> mAsyncAuditLogWriter != null - ? 
mAsyncAuditLogWriter.getAuditLogEntriesSize() : -1); + HeartbeatThreadManager + .submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.JOB_MASTER_LOST_WORKER_DETECTION, + new LostWorkerDetectionHeartbeatExecutor(), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.JOB_MASTER_LOST_WORKER_INTERVAL)), + Configuration.global(), mMasterContext.getUserState())); + HeartbeatThreadManager + .submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.JOB_MASTER_LOST_MASTER_DETECTION, + new LostMasterDetectionHeartbeatExecutor(), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.JOB_MASTER_LOST_MASTER_INTERVAL)), + Configuration.global(), mMasterContext.getUserState())); } else { LOG.info("Starting job master as standby"); if (ConfigurationUtils.isHaMode(Configuration.global())) { @@ -277,15 +269,23 @@ public void start(Boolean isLeader) throws IOException { RetryHandlingJobMasterMasterClient jobMasterClient = new RetryHandlingJobMasterMasterClient(JobMasterClientContext .newBuilder(ClientContext.create(Configuration.global())).build()); - getExecutorService().submit(new HeartbeatThread(HeartbeatContext.JOB_MASTER_SYNC, - new JobMasterSync(mJobMasterAddress, jobMasterClient), - () -> new FixedIntervalSupplier( - Configuration.getMs(PropertyKey.JOB_MASTER_MASTER_HEARTBEAT_INTERVAL)), - Configuration.global(), mMasterContext.getUserState())); + HeartbeatThreadManager + .submit(getExecutorService(), + new HeartbeatThread(HeartbeatContext.JOB_MASTER_SYNC, + new JobMasterSync(mJobMasterAddress, jobMasterClient), + () -> new FixedIntervalSupplier( + Configuration.getMs(PropertyKey.JOB_MASTER_MASTER_HEARTBEAT_INTERVAL)), + Configuration.global(), mMasterContext.getUserState())); LOG.info("Standby job master with address {} starts sending heartbeat to the primary.", mJobMasterAddress); } } + mAsyncAuditLogWriter = new AsyncUserAccessAuditLogWriter("JOB_MASTER_AUDIT_LOG"); + mAsyncAuditLogWriter.start(); + 
MetricsSystem.registerGaugeIfAbsent( + MetricKey.MASTER_AUDIT_LOG_ENTRIES_SIZE.getName(), + () -> mAsyncAuditLogWriter != null + ? mAsyncAuditLogWriter.getAuditLogEntriesSize() : -1); } @Override diff --git a/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerProcess.java b/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerProcess.java index e311d1998d4c..17d7f21f31a5 100644 --- a/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerProcess.java +++ b/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerProcess.java @@ -20,6 +20,7 @@ import alluxio.grpc.GrpcServerAddress; import alluxio.grpc.GrpcServerBuilder; import alluxio.grpc.GrpcService; +import alluxio.master.job.JobMaster; import alluxio.underfs.JobUfsManager; import alluxio.underfs.UfsManager; import alluxio.util.CommonUtils; @@ -232,6 +233,13 @@ public WorkerNetAddress getAddress() { .setWebPort(Configuration.getInt(PropertyKey.JOB_WORKER_WEB_PORT)); } + /** + * @return the {@link JobWorker} for this process + */ + public JobWorker getJobWorker() { + return mJobWorker; + } + private void startWorkers() throws Exception { mJobWorker.start(getAddress()); } diff --git a/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerRestServiceHandler.java b/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerRestServiceHandler.java index d765fac76e6d..0c53eb7cc561 100644 --- a/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerRestServiceHandler.java +++ b/job/server/src/main/java/alluxio/worker/AlluxioJobWorkerRestServiceHandler.java @@ -18,6 +18,8 @@ import alluxio.util.LogUtils; import alluxio.web.JobWorkerWebServer; import alluxio.wire.AlluxioJobWorkerInfo; +import alluxio.wire.HeartbeatThreadInfo; +import alluxio.wire.WebUIHeartbeatThreads; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -57,14 +59,15 @@ public final class AlluxioJobWorkerRestServiceHandler { // queries public static final String QUERY_RAW_CONFIGURATION = "raw_configuration"; + public
static final String WEBUI_HEARTBEAT_THREADS = "webui_heartbeat_threads"; - private final JobWorkerProcess mJobWorker; + private final AlluxioJobWorkerProcess mJobWorker; /** * @param context context for the servlet */ public AlluxioJobWorkerRestServiceHandler(@Context ServletContext context) { - mJobWorker = (JobWorkerProcess) context + mJobWorker = (AlluxioJobWorkerProcess) context .getAttribute(JobWorkerWebServer.ALLUXIO_JOB_WORKER_SERVLET_RESOURCE_KEY); } @@ -112,6 +115,26 @@ public Response logLevel(@QueryParam(LOG_ARGUMENT_NAME) final String logName, Configuration.global()); } + /** + * Gets Web UI heartbeat threads. + * + * @return the response object + */ + @GET + @Path(WEBUI_HEARTBEAT_THREADS) + public Response getWebUIHeartbeatThreads() { + return RestUtils.call(() -> { + WebUIHeartbeatThreads response = new WebUIHeartbeatThreads(); + + Map heartbeatThreads = + mJobWorker.getJobWorker().getHeartbeatThreads(); + + response.setHeartbeatThreadInfos(heartbeatThreads); + + return response; + }, Configuration.global()); + } + private Map getConfigurationInternal(boolean raw) { Set> properties = Configuration.toMap().entrySet(); SortedMap configuration = new TreeMap<>(); diff --git a/job/server/src/main/java/alluxio/worker/JobWorker.java b/job/server/src/main/java/alluxio/worker/JobWorker.java index 29a6cc054772..54a00bd68074 100644 --- a/job/server/src/main/java/alluxio/worker/JobWorker.java +++ b/job/server/src/main/java/alluxio/worker/JobWorker.java @@ -24,6 +24,7 @@ import alluxio.heartbeat.FixedIntervalSupplier; import alluxio.heartbeat.HeartbeatContext; import alluxio.heartbeat.HeartbeatThread; +import alluxio.heartbeat.HeartbeatThreadManager; import alluxio.job.JobServerContext; import alluxio.metrics.MetricsSystem; import alluxio.security.user.ServerUserState; @@ -104,7 +105,7 @@ public void start(WorkerNetAddress address) throws IOException { new TaskExecutorManager(Configuration.getInt(PropertyKey.JOB_WORKER_THREADPOOL_SIZE), address); - 
mCommandHandlingService = getExecutorService().submit( + mCommandHandlingService = HeartbeatThreadManager.submit(getExecutorService(), new HeartbeatThread(HeartbeatContext.JOB_WORKER_COMMAND_HANDLING, new CommandHandlingExecutor(mJobServerContext, taskExecutorManager, mJobMasterClient, address),