Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support collect and display the heartbeat thread status #17473

Open
wants to merge 7 commits into
base: master-2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/common/src/main/java/alluxio/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import alluxio.grpc.GrpcService;
import alluxio.grpc.ServiceType;
import alluxio.heartbeat.HeartbeatThreadManager;
import alluxio.wire.HeartbeatThreadInfo;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -57,4 +59,12 @@ public interface Server<T> {
* Closes the server.
*/
void close() throws IOException;

/**
* Gets heartbeat threads info.
* @return the heartbeat threads info
*/
default Map<String, HeartbeatThreadInfo> getHeartbeatThreads() {
return HeartbeatThreadManager.getHeartbeatThreads();
}
}
61 changes: 55 additions & 6 deletions core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@

package alluxio.heartbeat;

import alluxio.Constants;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.conf.ReconfigurableRegistry;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.user.UserState;
import alluxio.util.CommonUtils;
import alluxio.util.SecurityUtils;
import alluxio.wire.HeartbeatThreadInfo;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -25,6 +29,7 @@

import java.io.IOException;
import java.time.Clock;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;

Expand All @@ -35,13 +40,19 @@
@NotThreadSafe
public final class HeartbeatThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatThread.class);
private static final String DATE_PATTERN =
Configuration.getString(PropertyKey.USER_DATE_FORMAT_PATTERN);

private final String mThreadName;
private final HeartbeatExecutor mExecutor;
private final UserState mUserState;
private HeartbeatTimer mTimer;
private AlluxioConfiguration mConfiguration;
private Status mStatus;
private volatile Status mStatus;
private volatile String mPreviousReport;
private AtomicLong mCounter = new AtomicLong(0L);
private volatile long mStartTickTime;
private volatile long mStartHeartbeatTime;

/**
* @param executorName the executor name defined in {@link HeartbeatContext}
Expand Down Expand Up @@ -129,7 +140,6 @@ public HeartbeatThread(String executorName, HeartbeatExecutor executor,

@Override
public void run() {
long counter = 0L;
try {
if (SecurityUtils.isSecurityEnabled(mConfiguration)
&& AuthenticatedClientUser.get(mConfiguration) == null) {
Expand All @@ -146,10 +156,22 @@ public void run() {
while (!Thread.interrupted()) {
// TODO(peis): Fix this. The current implementation consumes one thread even when ticking.
mStatus = Status.WAITING;
mStartTickTime = System.currentTimeMillis();
jiacheliu3 marked this conversation as resolved.
Show resolved Hide resolved
long limitTime = mTimer.tick();
mStatus = Status.RUNNING;
LOG.debug("{} #{} will run limited in {}s", mThreadName, counter++, limitTime / 1000);
mStartHeartbeatTime = System.currentTimeMillis();
jiacheliu3 marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug("{} #{} will run limited in {}s", mThreadName, mCounter.get(),
limitTime / Constants.SECOND_MS);
mExecutor.heartbeat(limitTime);
long endHeartbeatTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

mPreviousReport = String.format("#%d [%s - %s - %s] ticked(s) %d, run(s) %d.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't create report on run(), you may update the states but generate reports on demand

mCounter.get(),
CommonUtils.convertMsToDate(mStartTickTime, DATE_PATTERN),
CommonUtils.convertMsToDate(mStartHeartbeatTime, DATE_PATTERN),
CommonUtils.convertMsToDate(endHeartbeatTime, DATE_PATTERN),
(mStartHeartbeatTime - mStartTickTime) / Constants.SECOND_MS,
(endHeartbeatTime - mStartHeartbeatTime) / Constants.SECOND_MS);
updateState();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every tick you are resetting to zero? looks like a bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it looks need a structure to store the previous and current state

}
} catch (InterruptedException e) {
// Allow thread to exit.
Expand All @@ -162,11 +184,38 @@ public void run() {
}
}

private synchronized void updateState() {
mCounter.incrementAndGet();
mStartTickTime = 0L;
mStartHeartbeatTime = 0L;
Comment on lines +187 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why reset to 0 every time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 stand for that it is not start to heartbeat this period.

}

/**
* @return the status of current heartbeat thread
* @return the thread name
*/
public Status getStatus() {
return mStatus;
public String getThreadName() {
return mThreadName;
}

/**
* @return the {@link HeartbeatThreadInfo} for the heartbeat thread
*/
public synchronized HeartbeatThreadInfo toHeartbeatThreadInfo() {

HeartbeatThreadInfo heartbeatThreadInfo = new HeartbeatThreadInfo()
.setThreadName(mThreadName)
.setCount(mCounter.get())
.setStatus(mStatus)
.setPreviousReport(mPreviousReport);
if (mStartTickTime != 0) {
heartbeatThreadInfo.setStartTickTime(
CommonUtils.convertMsToDate(mStartTickTime, DATE_PATTERN));
}
if (mStartHeartbeatTime != 0) {
heartbeatThreadInfo.setStartHeartbeatTime(
CommonUtils.convertMsToDate(mStartHeartbeatTime, DATE_PATTERN));
}
return heartbeatThreadInfo;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.heartbeat;

import alluxio.wire.HeartbeatThreadInfo;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
* Manager for heartbeat thread.
*/
public class HeartbeatThreadManager {
private static final Map<String, HeartbeatThread> HEARTBEAT_THREAD_MAP
= new ConcurrentHashMap<>();
private static final Map<Object, List<HeartbeatThread>> HEARTBEAT_THREAD_INDEX_MAP
= new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you use synchronized methods so no need for ConcurrentHashMap here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/**
* Add a heartbeat thread.
*
* @param key the name of heartbeat thread
* @param heartbeatThread the heartbeat thread
*/
public static synchronized void register(Object key, HeartbeatThread heartbeatThread) {
List<HeartbeatThread> list = HEARTBEAT_THREAD_INDEX_MAP.get(key);
if (list == null) {
list = new LinkedList<>();
HEARTBEAT_THREAD_INDEX_MAP.put(key, list);
}
HEARTBEAT_THREAD_MAP.put(heartbeatThread.getThreadName(), heartbeatThread);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map.computeIfAbsent(key, (k) -> new ArrayList<>());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/**
* Remove the heartbeat threads related to the given key.
* @param key the key of heartbeat thread
*/
public static synchronized void unregister(Object key) {
List<HeartbeatThread> heartbeatThreads = HEARTBEAT_THREAD_INDEX_MAP.remove(key);
if (heartbeatThreads != null) {
for (HeartbeatThread heartbeatThread : heartbeatThreads) {
HEARTBEAT_THREAD_MAP.remove(heartbeatThread.getThreadName());
}
}
}

/**
* Submits a heartbeat thread for execution and returns a Future representing that task.
* Meanwhile, register the heartbeat thread to HeartbeatThreadManager to monitor its status.
* @param executorService the executor service for executing heartbeat threads
* @param heartbeatThread the heartbeat thread
* @return a Future representing pending completion of the task
*/
public static Future<?> submit(ExecutorService executorService,
HeartbeatThread heartbeatThread) {
HeartbeatThreadManager.register(executorService, heartbeatThread);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think it's a good idea to use a thread pool as key...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, what can be the better key? Thread name?

return executorService.submit(heartbeatThread);
}

/**
* Gets heartbeat threads info.
* @return the heartbeat threads info
*/
public static synchronized Map<String, HeartbeatThreadInfo> getHeartbeatThreads() {
SortedMap<String, HeartbeatThreadInfo> heartbeatThreads = new TreeMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double check if the thread is still alive, and make sure this ref is removed if the thread is already dead

for (HeartbeatThread heartbeatThread : HEARTBEAT_THREAD_MAP.values()) {
heartbeatThreads.put(heartbeatThread.getThreadName(),
heartbeatThread.toHeartbeatThreadInfo());
}
return heartbeatThreads;
}
}
Loading