Skip to content

Commit

Permalink
Merge pull request #512 from TheFatRatre/virtual-thread
Browse files Browse the repository at this point in the history
[ISSUE #483] Virtual thread compatible
  • Loading branch information
yanhom1314 authored Dec 4, 2024
2 parents c228531 + 64afeae commit 31643cb
Show file tree
Hide file tree
Showing 42 changed files with 692 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dynamictp.common.entity.NotifyPlatform;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.common.event.CustomContextRefreshedEvent;
Expand Down Expand Up @@ -104,20 +104,20 @@ public Map<String, ExecutorWrapper> getExecutorWrappers() {
}

/**
* Get multi thread pool stats.
* Get multi executor stats.
*
* @return thead pools stats
* @return Executors stats
*/
@Override
public List<ThreadPoolStats> getMultiPoolStats() {
public List<ExecutorStats> getMultiExecutorStats() {
val executorWrappers = getExecutorWrappers();
if (MapUtils.isEmpty(executorWrappers)) {
return Collections.emptyList();
}

List<ThreadPoolStats> threadPoolStats = Lists.newArrayList();
executorWrappers.forEach((k, v) -> threadPoolStats.add(ExecutorConverter.toMetrics(v)));
return threadPoolStats;
List<ExecutorStats> executorStats = Lists.newArrayList();
executorWrappers.forEach((k, v) -> executorStats.add(ExecutorConverter.toMetrics(v)));
return executorStats;
}

public void refresh(List<TpExecutorProps> propsList, List<NotifyPlatform> platforms) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void doCollect(DtpProperties dtpProperties) {
if (MapUtils.isEmpty(handlerMap)) {
return;
}
handlerMap.forEach((k, v) -> v.getMultiPoolStats().forEach(ps ->
handlerMap.forEach((k, v) -> v.getMultiExecutorStats().forEach(ps ->
CollectorHandler.getInstance().collect(ps, dtpProperties.getCollectorTypes())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,9 @@ private DynamicTpConst() { }
public static final String TRUE_STR = "true";

public static final String FALSE_STR = "false";

/**
* jre
*/
public static final String THREAD_PER_TASK_EXECUTOR = "java.util.concurrent.ThreadPerTaskExecutor";
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@ public enum JreEnum {

JAVA_18,

JAVA_19;
JAVA_19,

JAVA_20,

JAVA_21;

private static final JreEnum VERSION = getJre();

public static final String DEFAULT_JAVA_VERSION = "1.8";

private static final int JRE_VERSION_OFFSET = 8;

/**
* get current JRE version
*
Expand All @@ -69,6 +75,14 @@ public static JreEnum currentVersion() {
return VERSION;
}

/**
* get current JRE integer version
* @return JRE integer version
*/
public static int currentIntVersion() {
return JreEnum.currentVersion().ordinal() + JRE_VERSION_OFFSET;
}

/**
* is current version
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,24 @@


/**
* ThreadPoolStats related
* ExecutorStats related
*
* @author yanhom
* @since 1.0.0
**/
@Data
@EqualsAndHashCode(callSuper = true)
public class ThreadPoolStats extends Metrics {
public class ExecutorStats extends Metrics {

/**
* 线程池名字
* 执行器名字
*/
private String poolName;
private String executorName;

/**
* 线程池别名
* 执行器别名
*/
private String poolAliasName;
private String executorAliasName;

/**
* 核心线程数
Expand All @@ -51,6 +51,31 @@ public class ThreadPoolStats extends Metrics {
*/
private int maximumPoolSize;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 是否为DtpExecutor
*/
private boolean dynamic;

/**
* 是否为虚拟线程执行器
*/
private boolean isVirtualExecutor;

/**
* 空闲时间 (ms)
*/
Expand Down Expand Up @@ -81,16 +106,6 @@ public class ThreadPoolStats extends Metrics {
*/
private int queueRemainingCapacity;

/**
* 正在执行任务的活跃线程大致总数
*/
private int activeCount;

/**
* 大致任务总数
*/
private long taskCount;

/**
* 已执行完成的大致任务总数
*/
Expand Down Expand Up @@ -121,16 +136,6 @@ public class ThreadPoolStats extends Metrics {
*/
private String rejectHandlerName;

/**
* 是否DtpExecutor线程池
*/
private boolean dynamic;

/**
* 执行超时任务数量
*/
private long runTimeoutCount;

/**
* 在队列等待超时任务数量
*/
Expand Down Expand Up @@ -185,4 +190,5 @@ public class ThreadPoolStats extends Metrics {
* 满足99.9%的任务执行所需的最低耗时
*/
private double tp999;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@
**/
@Data
public class Metrics {

}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutor
if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {
executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());
}

ExecutorAdapter<?> executor = executorWrapper.getExecutor();
// update reject handler
String currentRejectHandlerType = executor.getRejectHandlerType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.dromara.dynamictp.core.aware;

import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;

import java.util.Collections;
import java.util.List;
Expand All @@ -31,20 +31,20 @@
public interface MetricsAware extends DtpAware {

/**
* Get thread pool stats.
* Get executors stats.
*
* @return the thread pool stats
* @return the executors stats
*/
default ThreadPoolStats getPoolStats() {
default ExecutorStats getExecutorStats() {
return null;
}

/**
* Get multi thread pool stats.
* Get multi executors stats.
*
* @return thead pools stats
* @return executors stats
*/
default List<ThreadPoolStats> getMultiPoolStats() {
default List<ExecutorStats> getMultiExecutorStats() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.dromara.dynamictp.core.converter;

import lombok.val;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.entity.TpMainFields;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.monitor.PerformanceProvider;
Expand Down Expand Up @@ -53,51 +53,53 @@ public static TpMainFields toMainFields(ExecutorWrapper executorWrapper) {
return mainFields;
}

public static ThreadPoolStats toMetrics(ExecutorWrapper wrapper) {
public static ExecutorStats toMetrics(ExecutorWrapper wrapper) {
ExecutorAdapter<?> executor = wrapper.getExecutor();
if (executor == null) {
return null;
}
ThreadPoolStatProvider provider = wrapper.getThreadPoolStatProvider();
PerformanceProvider performanceProvider = provider.getPerformanceProvider();
val performanceSnapshot = performanceProvider.getSnapshotAndReset();
ThreadPoolStats poolStats = convertCommon(executor);
poolStats.setPoolName(wrapper.getThreadPoolName());
poolStats.setPoolAliasName(wrapper.getThreadPoolAliasName());
poolStats.setRunTimeoutCount(provider.getRunTimeoutCount());
poolStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
poolStats.setRejectCount(provider.getRejectedTaskCount());
poolStats.setDynamic(executor instanceof DtpExecutor);
ExecutorStats executorStats = convertCommon(executor);
executorStats.setExecutorName(wrapper.getThreadPoolName());
executorStats.setExecutorAliasName(wrapper.getThreadPoolAliasName());
executorStats.setRunTimeoutCount(provider.getRunTimeoutCount());
executorStats.setQueueTimeoutCount(provider.getQueueTimeoutCount());
executorStats.setRejectCount(provider.getRejectedTaskCount());

poolStats.setTps(performanceSnapshot.getTps());
poolStats.setAvg(performanceSnapshot.getAvg());
poolStats.setMaxRt(performanceSnapshot.getMaxRt());
poolStats.setMinRt(performanceSnapshot.getMinRt());
poolStats.setTp50(performanceSnapshot.getTp50());
poolStats.setTp75(performanceSnapshot.getTp75());
poolStats.setTp90(performanceSnapshot.getTp90());
poolStats.setTp95(performanceSnapshot.getTp95());
poolStats.setTp99(performanceSnapshot.getTp99());
poolStats.setTp999(performanceSnapshot.getTp999());
return poolStats;
executorStats.setVirtualExecutor(wrapper.isVirtualThreadExecutor());

executorStats.setDynamic(executor instanceof DtpExecutor);
executorStats.setTps(performanceSnapshot.getTps());
executorStats.setAvg(performanceSnapshot.getAvg());
executorStats.setMaxRt(performanceSnapshot.getMaxRt());
executorStats.setMinRt(performanceSnapshot.getMinRt());
executorStats.setTp50(performanceSnapshot.getTp50());
executorStats.setTp75(performanceSnapshot.getTp75());
executorStats.setTp90(performanceSnapshot.getTp90());
executorStats.setTp95(performanceSnapshot.getTp95());
executorStats.setTp99(performanceSnapshot.getTp99());
executorStats.setTp999(performanceSnapshot.getTp999());
return executorStats;
}

private static ThreadPoolStats convertCommon(ExecutorAdapter<?> executor) {
ThreadPoolStats poolStats = new ThreadPoolStats();
poolStats.setCorePoolSize(executor.getCorePoolSize());
poolStats.setMaximumPoolSize(executor.getMaximumPoolSize());
poolStats.setPoolSize(executor.getPoolSize());
poolStats.setActiveCount(executor.getActiveCount());
poolStats.setLargestPoolSize(executor.getLargestPoolSize());
poolStats.setQueueType(executor.getQueueType());
poolStats.setQueueCapacity(executor.getQueueCapacity());
poolStats.setQueueSize(executor.getQueueSize());
poolStats.setQueueRemainingCapacity(executor.getQueueRemainingCapacity());
poolStats.setTaskCount(executor.getTaskCount());
poolStats.setCompletedTaskCount(executor.getCompletedTaskCount());
poolStats.setWaitTaskCount(executor.getQueueSize());
poolStats.setRejectHandlerName(executor.getRejectHandlerType());
poolStats.setKeepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
return poolStats;
private static ExecutorStats convertCommon(ExecutorAdapter<?> executor) {
ExecutorStats executorStats = new ExecutorStats();
executorStats.setCorePoolSize(executor.getCorePoolSize());
executorStats.setMaximumPoolSize(executor.getMaximumPoolSize());
executorStats.setPoolSize(executor.getPoolSize());
executorStats.setActiveCount(executor.getActiveCount());
executorStats.setLargestPoolSize(executor.getLargestPoolSize());
executorStats.setQueueType(executor.getQueueType());
executorStats.setQueueCapacity(executor.getQueueCapacity());
executorStats.setQueueSize(executor.getQueueSize());
executorStats.setQueueRemainingCapacity(executor.getQueueRemainingCapacity());
executorStats.setTaskCount(executor.getTaskCount());
executorStats.setCompletedTaskCount(executor.getCompletedTaskCount());
executorStats.setWaitTaskCount(executor.getQueueSize());
executorStats.setRejectHandlerName(executor.getRejectHandlerType());
executorStats.setKeepAliveTime(executor.getKeepAliveTime(TimeUnit.MILLISECONDS));
return executorStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
import lombok.Getter;
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
import org.dromara.dynamictp.core.support.proxy.VirtualThreadExecutorProxy;

/**
* ExecutorType related
Expand Down Expand Up @@ -55,7 +56,12 @@ public enum ExecutorType {
/**
* Priority executor type.
*/
PRIORITY("priority", PriorityDtpExecutor.class);
PRIORITY("priority", PriorityDtpExecutor.class),

/**
* Virtual thread executor adapter type.
*/
VIRTUAL("virtual", VirtualThreadExecutorProxy.class);

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dynamictp.common.entity.ThreadPoolStats;
import org.dromara.dynamictp.common.entity.ExecutorStats;
import org.dromara.dynamictp.common.util.ExtensionServiceLoader;
import org.dromara.dynamictp.core.monitor.collector.InternalLogCollector;
import org.dromara.dynamictp.core.monitor.collector.LogCollector;
Expand Down Expand Up @@ -56,14 +56,14 @@ private CollectorHandler() {
COLLECTORS.put(jmxCollector.type(), jmxCollector);
}

public void collect(ThreadPoolStats poolStats, List<String> types) {
if (poolStats == null || CollectionUtils.isEmpty(types)) {
public void collect(ExecutorStats executorStats, List<String> types) {
if (executorStats == null || CollectionUtils.isEmpty(types)) {
return;
}
for (String collectorType : types) {
MetricsCollector collector = COLLECTORS.get(collectorType.toLowerCase());
if (collector != null) {
collector.collect(poolStats);
collector.collect(executorStats);
}
}
}
Expand Down
Loading

0 comments on commit 31643cb

Please sign in to comment.