Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Dec 22, 2023
1 parent 75ba4d9 commit 1f36f57
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 84 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public String instance() {
public Object tree(SchedInstancePageRequest request,
@RequestParam(value = "resetSearch", required = false) String resetSearch) {
request.setParent(true);
return page(request, resetSearch);
return queryForPage(request, resetSearch);
}

/**
Expand All @@ -98,7 +98,7 @@ public Object tree(SchedInstancePageRequest request,
public Object flat(SchedInstancePageRequest request,
@RequestParam(value = "resetSearch", required = false) String resetSearch) {
request.setParent(false);
return page(request, resetSearch);
return queryForPage(request, resetSearch);
}

@RequiresPermissions(PERMISSION_INSTANCE)
Expand Down Expand Up @@ -244,7 +244,7 @@ private SearchJobRequest parseTerm(String term) {
return request;
}

private Object page(SchedInstancePageRequest request, String resetSearch) {
private Object queryForPage(SchedInstancePageRequest request, String resetSearch) {
if (StringUtils.isBlank(resetSearch)) {
return PageUtils.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public static void stopThread(Thread thread, long joinMillis, int sleepCount, lo
LOG.warn("Call stop on self thread: {}\n{}", thread.getName(), getStackTrace());
thread.interrupt();
stopThread(thread);
return;
}

// sleep for wait the tread run method block code execute finish
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@
public class BinaryTreePrinter<T> {

public enum Branch {
RECTANGLE, TRIANGLE
/**
* Rectangle
*/
RECTANGLE,
/**
* Triangle
*/
TRIANGLE
}

private final Appendable output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
*
* @author Ponfee
*/
@SuppressWarnings("all")
public enum PrimitiveTypes {

DOUBLE (Double.class , (byte) 0B1_0_0_0_0_0_0_0, (byte) 0B1_0_0_0_0_0_0_0 ),
Expand Down Expand Up @@ -115,7 +116,7 @@ public static <T> Class<T> unwrap(Class<T> type) {
return pt == null ? type : (Class<T>) pt.primitive;
}

private static class Hide {
private static final class Hide {
private static final Map<Class<?>, PrimitiveTypes> PRIMITIVE_OR_WRAPPER_MAPPING = new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
* @author Ponfee
*/
@SuppressWarnings("all")
public enum JobCodeMsg implements CodeMsg {

INVALID_PARAM(400, "Invalid param."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package cn.ponfee.disjob.supervisor.base;

import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import com.google.common.util.concurrent.RateLimiter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -45,6 +46,18 @@ public void testStop() throws InterruptedException {
Thread.sleep(100);
Assertions.assertTrue(Threads.isStopped(thread));
thread.join();

System.out.println("\n\n------------------");
Thread t = new Thread(){
@Override
public void run() {
ThrowingRunnable.doCaught(() -> Thread.sleep(20));
Threads.stopThread(this, 0);
ThrowingRunnable.doCaught(() -> Thread.sleep(20));
}
};
t.start();
Thread.sleep(500);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,6 @@ public void run() {
try {
activePool.doExecute(workerThread, param);
} catch (InterruptedException e) {
LOG.error("Do execute occur thread interrupted.", e);
// discard this execution param
param = null;
// destroy this worker thread
stopWorkerThread(workerThread, true);
throw e;
Expand All @@ -289,8 +286,6 @@ public void run() {
stopWorkerThread(workerThread, true);
} catch (IllegalTaskException e) {
LOG.error(e.getMessage());
// discard the execute param
param = null;
// return this worker thread
idlePool.putFirst(workerThread);
} catch (DuplicateTaskException e) {
Expand Down Expand Up @@ -345,7 +340,7 @@ public WorkerMetrics.ThreadPoolMetrics metrics() {
private boolean returnWorkerThread(WorkerThread workerThread) {
if (activePool.removeThread(workerThread) == null) {
// maybe already removed by other operation
LOG.warn("Return thread failed, because not found: {}", workerThread.getName());
LOG.error("Return thread failed, because not found: {}", workerThread.getName());
return false;
}

Expand Down Expand Up @@ -455,36 +450,6 @@ private static void terminateTask(WorkerThreadPool threadPool, ExecuteTaskParam
}
}

private static void stopInstance(WorkerThreadPool threadPool, ExecuteTaskParam param, Operations ops, String errorMsg) {
if (!param.updateOperation(Operations.TRIGGER, ops)) {
LOG.info("Stop instance conflict: {} | {}", param, ops);
return;
}

LOG.info("Stop instance task: {} | {}", param.getTaskId(), ops);
terminateTask(threadPool, param, ops, ops.toState(), errorMsg);

boolean res = true;
long lockInstanceId = param.getWnstanceId() != null ? param.getWnstanceId() : param.getInstanceId();
try {
synchronized (JobConstants.INSTANCE_LOCK_POOL.intern(lockInstanceId)) {
if (ops == Operations.PAUSE) {
res = threadPool.supervisorRpcClient.pauseInstance(lockInstanceId);
} else if (ops == Operations.EXCEPTION_CANCEL) {
res = threadPool.supervisorRpcClient.cancelInstance(lockInstanceId, ops);
} else {
LOG.error("Stop instance unsupported operation: {} | {}", param.getTaskId(), ops);
}
}
if (!res) {
LOG.info("Stop instance conflict: {} | {} | {}", param.getInstanceId(), param.getTaskId(), ops);
}
} catch (Throwable t) {
LOG.error("Stop instance error: " + param.getTaskId() + " | " + ops, t);
Threads.interruptIfNecessary(t);
}
}

/**
* Active thread pool
*/
Expand Down Expand Up @@ -763,13 +728,15 @@ public void run() {
}

if (param == null) {
if (executingParam.get() != null) {
param = workQueue.poll();
}
if (param == null) {
ExecuteTaskParam executeTaskParam = executingParam.get();
if (executeTaskParam == null) {
LOG.info("Worker thread exit, idle wait timeout.");
break;
}
if ((param = workQueue.poll()) == null) {
LOG.error("Executing param exists, but work queue not task: {}", executeTaskParam);
break;
}
}

try {
Expand All @@ -782,7 +749,7 @@ public void run() {

// return this to idle thread pool
if (!threadPool.returnWorkerThread(this)) {
return;
break;
}
}

Expand Down Expand Up @@ -885,10 +852,10 @@ private void runTask(ExecuteTaskParam param) {
terminateTask(threadPool, param, Operations.TRIGGER, EXECUTE_TIMEOUT, toErrorMsg(e));
} catch (PauseTaskException e) {
LOG.error("Pause task exception: {} | {}", param, e.getMessage());
stopInstance(threadPool, param, Operations.PAUSE, toErrorMsg(e));
stopInstance(param, Operations.PAUSE, toErrorMsg(e));
} catch (CancelTaskException e) {
LOG.error("Cancel task exception: {} | {}", param, e.getMessage());
stopInstance(threadPool, param, Operations.EXCEPTION_CANCEL, toErrorMsg(e));
stopInstance(param, Operations.EXCEPTION_CANCEL, toErrorMsg(e));
} catch (Throwable t) {
if (t instanceof java.lang.ThreadDeath) {
LOG.warn("Task execute thread death: {} | {}", param, t.getMessage());
Expand All @@ -907,8 +874,39 @@ private void runTask(ExecuteTaskParam param) {
} catch (Throwable t) {
LOG.error("Task destroy error: " + param, t);
}
} // end of try catch block
} // end of runTask method
}
}

private void stopInstance(ExecuteTaskParam param, Operations ops, String errorMsg) {
if (!param.updateOperation(Operations.TRIGGER, ops)) {
LOG.info("Stop instance conflict: {} | {}", param, ops);
return;
}

LOG.info("Stop instance task: {} | {}", param.getTaskId(), ops);
terminateTask(threadPool, param, ops, ops.toState(), errorMsg);

boolean res = true;
long lockInstanceId = param.getWnstanceId() != null ? param.getWnstanceId() : param.getInstanceId();
try {
synchronized (JobConstants.INSTANCE_LOCK_POOL.intern(lockInstanceId)) {
if (ops == Operations.PAUSE) {
res = threadPool.supervisorRpcClient.pauseInstance(lockInstanceId);
} else if (ops == Operations.EXCEPTION_CANCEL) {
res = threadPool.supervisorRpcClient.cancelInstance(lockInstanceId, ops);
} else {
LOG.error("Stop instance unsupported operation: {} | {}", param.getTaskId(), ops);
}
}
if (!res) {
LOG.info("Stop instance conflict: {} | {} | {}", param.getInstanceId(), param.getTaskId(), ops);
}
} catch (Throwable t) {
LOG.error("Stop instance error: " + param.getTaskId() + " | " + ops, t);
Threads.interruptIfNecessary(t);
}
}

} // end of worker thread class definition

}

0 comments on commit 1f36f57

Please sign in to comment.