Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support collect and display the heartbeat thread status #17473

Open
wants to merge 7 commits into
base: master-2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/common/src/main/java/alluxio/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import alluxio.grpc.GrpcService;
import alluxio.grpc.ServiceType;
import alluxio.heartbeat.HeartbeatThreadManager;
import alluxio.wire.HeartbeatThreadInfo;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -57,4 +59,12 @@ public interface Server<T> {
* Closes the server.
*/
void close() throws IOException;

/**
* Gets heartbeat threads info.
* @return the heartbeat threads info
*/
default Map<String, HeartbeatThreadInfo> getHeartbeatThreads() {
return HeartbeatThreadManager.getHeartbeatThreads();
}
}
60 changes: 54 additions & 6 deletions core/common/src/main/java/alluxio/heartbeat/HeartbeatThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@

package alluxio.heartbeat;

import alluxio.Constants;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Reconfigurable;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.conf.ReconfigurableRegistry;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.security.user.UserState;
import alluxio.util.CommonUtils;
import alluxio.util.SecurityUtils;
import alluxio.wire.HeartbeatThreadInfo;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -26,6 +30,7 @@

import java.io.IOException;
import java.time.Clock;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;

Expand All @@ -36,13 +41,19 @@
@NotThreadSafe
public final class HeartbeatThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatThread.class);
private static final String DATE_PATTERN =
Configuration.getString(PropertyKey.USER_DATE_FORMAT_PATTERN);

private final String mThreadName;
private final HeartbeatExecutor mExecutor;
private final UserState mUserState;
private HeartbeatTimer mTimer;
private AlluxioConfiguration mConfiguration;
private Status mStatus;
private volatile Status mStatus;
private AtomicLong mCounter = new AtomicLong(0L);
private volatile long mStartTickTime;
private volatile long mStartHeartbeatTime;
private volatile long mEndHeartbeatTime;

/**
* @param executorName the executor name defined in {@link HeartbeatContext}
Expand Down Expand Up @@ -132,7 +143,6 @@ public HeartbeatThread(String executorName, HeartbeatExecutor executor,

@Override
public void run() {
long counter = 0L;
try {
if (SecurityUtils.isSecurityEnabled(mConfiguration)
&& AuthenticatedClientUser.get(mConfiguration) == null) {
Expand All @@ -149,10 +159,15 @@ public void run() {
while (!Thread.interrupted()) {
// TODO(peis): Fix this. The current implementation consumes one thread even when ticking.
mStatus = Status.WAITING;
mStartTickTime = CommonUtils.getCurrentMs();
long limitTime = mTimer.tick();
mStatus = Status.RUNNING;
LOG.debug("{} #{} will run limited in {}s", mThreadName, counter++, limitTime / 1000);
mStartHeartbeatTime = CommonUtils.getCurrentMs();
LOG.debug("{} #{} will run limited in {}s", mThreadName, mCounter.get(),
limitTime / Constants.SECOND_MS);
mExecutor.heartbeat(limitTime);
mEndHeartbeatTime = CommonUtils.getCurrentMs();
updateState();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

every tick you are resetting to zero? looks like a bug?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it looks need a structure to store the previous and current state

}
} catch (InterruptedException e) {
// Allow thread to exit.
Expand All @@ -167,11 +182,44 @@ public void run() {
}
}

private synchronized void updateState() {
mCounter.incrementAndGet();
mStartTickTime = 0L;
mStartHeartbeatTime = 0L;
Comment on lines +187 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why reset to 0 every time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 stand for that it is not start to heartbeat this period.

}

/**
* @return the thread name
*/
public String getThreadName() {
return mThreadName;
}

/**
* @return the status of current heartbeat thread
* @return the {@link HeartbeatThreadInfo} for the heartbeat thread
*/
public Status getStatus() {
return mStatus;
public synchronized HeartbeatThreadInfo toHeartbeatThreadInfo() {
String previousReport = String.format("#%d [%s - %s - %s] ticked(s) %d, run(s) %d.",
mCounter.get(),
CommonUtils.convertMsToDate(mStartTickTime, DATE_PATTERN),
CommonUtils.convertMsToDate(mStartHeartbeatTime, DATE_PATTERN),
CommonUtils.convertMsToDate(mEndHeartbeatTime, DATE_PATTERN),
(mStartHeartbeatTime - mStartTickTime) / Constants.SECOND_MS,
(mEndHeartbeatTime - mStartHeartbeatTime) / Constants.SECOND_MS);
HeartbeatThreadInfo heartbeatThreadInfo = new HeartbeatThreadInfo()
.setThreadName(mThreadName)
.setCount(mCounter.get())
.setStatus(mStatus)
.setPreviousReport(previousReport);
if (mStartTickTime != 0) {
heartbeatThreadInfo.setStartTickTime(
CommonUtils.convertMsToDate(mStartTickTime, DATE_PATTERN));
}
if (mStartHeartbeatTime != 0) {
heartbeatThreadInfo.setStartHeartbeatTime(
CommonUtils.convertMsToDate(mStartHeartbeatTime, DATE_PATTERN));
}
return heartbeatThreadInfo;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.heartbeat;

import alluxio.wire.HeartbeatThreadInfo;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
* Manager for heartbeat thread.
*/
public class HeartbeatThreadManager {
private static final Map<String, HeartbeatThread> HEARTBEAT_THREAD_MAP
= new HashMap<>();
private static final Map<Object, List<HeartbeatThread>> HEARTBEAT_THREAD_INDEX_MAP
= new HashMap<>();

/**
* Add a heartbeat thread.
*
* @param key the name of heartbeat thread
* @param heartbeatThread the heartbeat thread
*/
public static synchronized void register(Object key, HeartbeatThread heartbeatThread) {
List<HeartbeatThread> list =
HEARTBEAT_THREAD_INDEX_MAP.computeIfAbsent(key, (k) -> new LinkedList<>());
list.add(heartbeatThread);
HEARTBEAT_THREAD_MAP.put(heartbeatThread.getThreadName(), heartbeatThread);
}

/**
* Remove the heartbeat threads related to the given key.
* @param key the key of heartbeat thread
*/
public static synchronized void unregister(Object key) {
List<HeartbeatThread> heartbeatThreads = HEARTBEAT_THREAD_INDEX_MAP.remove(key);
if (heartbeatThreads != null) {
for (HeartbeatThread heartbeatThread : heartbeatThreads) {
HEARTBEAT_THREAD_MAP.remove(heartbeatThread.getThreadName());
}
}
}

/**
* Submits a heartbeat thread for execution and returns a Future representing that task.
* Meanwhile, register the heartbeat thread to HeartbeatThreadManager to monitor its status.
* @param executorService the executor service for executing heartbeat threads
* @param heartbeatThread the heartbeat thread
* @return a Future representing pending completion of the task
*/
public static Future<?> submit(ExecutorService executorService,
HeartbeatThread heartbeatThread) {
HeartbeatThreadManager.register(executorService, heartbeatThread);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think it's a good idea to use a thread pool as key...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, what can be the better key? Thread name?

return executorService.submit(heartbeatThread);
}

/**
* Gets heartbeat threads info.
* @return the heartbeat threads info
*/
public static synchronized Map<String, HeartbeatThreadInfo> getHeartbeatThreads() {
SortedMap<String, HeartbeatThreadInfo> heartbeatThreads = new TreeMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

double check if the thread is still alive, and make sure this ref is removed if the thread is already dead

for (HeartbeatThread heartbeatThread : HEARTBEAT_THREAD_MAP.values()) {
heartbeatThreads.put(heartbeatThread.getThreadName(),
heartbeatThread.toHeartbeatThreadInfo());
}
return heartbeatThreads;
}
}
148 changes: 148 additions & 0 deletions core/common/src/main/java/alluxio/wire/HeartbeatThreadInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.wire;

import alluxio.annotation.PublicApi;
import alluxio.heartbeat.HeartbeatThread;

import com.google.common.base.MoreObjects;

import java.io.Serializable;
import javax.annotation.concurrent.NotThreadSafe;

/**
* The heartbeat thread information.
*/
@NotThreadSafe
@PublicApi
public class HeartbeatThreadInfo implements Serializable {
private static final long serialVersionUID = -5457846691141843682L;

private String mThreadName = "";
private long mCount;
private HeartbeatThread.Status mStatus;
private String mPreviousReport = "";
private String mStartHeartbeatTime = "";
private String mStartTickTime = "";

/**
* Creates a new instance of {@link HeartbeatThreadInfo}.
*/
public HeartbeatThreadInfo() {}

/**
* @return the thread name
*/
public String getThreadName() {
return mThreadName;
}

/**
* @param threadName set thread name
* @return the heartbeat thread information
*/
public HeartbeatThreadInfo setThreadName(String threadName) {
mThreadName = threadName;
return this;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("threadName", mThreadName)
.add("count", mCount)
.add("status", mStatus)
.add("startHeartbeatTime", mStartHeartbeatTime)
.add("startTickTime", mStartTickTime)
.add("previousReport", mPreviousReport)
.toString();
}

/**
* @param count the current heartbeat count
* @return the heartbeat thread information
*/
public HeartbeatThreadInfo setCount(long count) {
mCount = count;
return this;
}

/**
* @param status the current heartbeat status
* @return the heartbeat thread information
*/
public HeartbeatThreadInfo setStatus(HeartbeatThread.Status status) {
mStatus = status;
return this;
}

/**
* @param previousReport the previous heartbeat report
* @return the heartbeat thread information
*/
public HeartbeatThreadInfo setPreviousReport(String previousReport) {
mPreviousReport = previousReport;
return this;
}

/**
* @return the current heartbeat invoke count
*/
public long getCount() {
return mCount;
}

/**
* @return the heartbeat status
*/
public HeartbeatThread.Status getStatus() {
return mStatus;
}

/**
* @return the report for previous heartbeat
*/
public String getPreviousReport() {
return mPreviousReport;
}

/**
* @param startHeartbeatTime the current heartbeat thread heartbeat time
* @return the heartbeat thread information
*/
public HeartbeatThreadInfo setStartHeartbeatTime(String startHeartbeatTime) {
mStartHeartbeatTime = startHeartbeatTime;
return this;
}

/**
* @return the current heartbeat thread heartbeat time
*/
public String getStartHeartbeatTime() {
return mStartHeartbeatTime;
}

/**
* @param startTickTime the current heartbeat thread tick time
* @return the heartbeat thread information
*/
public HeartbeatThreadInfo setStartTickTime(String startTickTime) {
mStartTickTime = startTickTime;
return this;
}

/**
* @return the current heartbeat thread tick time
*/
public String getStartTickTime() {
return mStartTickTime;
}
}
Loading
Loading