Skip to content

Commit 8303595

Browse files
committed
add TaskMonitoring
1 parent c959735 commit 8303595

File tree

6 files changed

+177
-37
lines changed

6 files changed

+177
-37
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/application/Application.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import net.lecousin.framework.application.libraries.LibrariesManager;
2323
import net.lecousin.framework.concurrent.Console;
2424
import net.lecousin.framework.concurrent.Task;
25+
import net.lecousin.framework.concurrent.TaskMonitoring;
2526
import net.lecousin.framework.concurrent.synch.ISynchronizationPoint;
2627
import net.lecousin.framework.concurrent.synch.JoinPoint;
2728
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
@@ -68,6 +69,7 @@ private Application(
6869
this.librariesManager = librariesManager;
6970
console = new Console(this);
7071
toCloseSync.add(console);
72+
if (debugMode) TaskMonitoring.checkLocksOfBlockingTasks = true;
7173
}
7274

7375
private long startTime;

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/TaskManager.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package net.lecousin.framework.concurrent;
22

3+
import java.util.ArrayList;
34
import java.util.Iterator;
45
import java.util.LinkedList;
56
import java.util.List;
@@ -9,6 +10,7 @@
910
import net.lecousin.framework.collections.TurnArray;
1011
import net.lecousin.framework.concurrent.synch.AsyncWork;
1112
import net.lecousin.framework.exception.NoException;
13+
import net.lecousin.framework.util.DebugUtil;
1214

1315
/**
1416
* Base class to implement a TaskManager, which is responsible to execute tasks in threads.
@@ -40,6 +42,7 @@ public TaskManager(
4042

4143
private TurnArray<TaskWorker> spare;
4244
private TurnArray<TaskWorker> blocked;
45+
private LinkedList<TaskWorker> aside = new LinkedList<>();
4346
private Object stopping = null;
4447
private TaskManager transferredTo = null;
4548
private boolean stopped = false;
@@ -228,8 +231,8 @@ void imBlocked(TaskWorker worker) {
228231
if (Threading.traceBlockingTasks) {
229232
Threading.logger.error("Task " + worker.currentTask.description + " blocked", new Exception());
230233
}
231-
if (Threading.logger.debug())
232-
ThreadingDebugHelper.checkNoLockForWorker();
234+
if (TaskMonitoring.checkLocksOfBlockingTasks)
235+
TaskMonitoring.checkNoLockForWorker();
233236
if (transferredTo != null) {
234237
// we are in the process of being transferred, we cannot launch a spare
235238
Threading.logger.info("Task blocked while transferring to a new TaskManager: " + worker.currentTask.description);
@@ -303,6 +306,51 @@ boolean remove(Task<?,?> task) {
303306
}
304307
return false;
305308
}
309+
310+
List<TaskWorker> getAllActiveWorkers() {
311+
TaskWorker[] workers = getWorkers();
312+
ArrayList<TaskWorker> list = new ArrayList<>(workers.length + blocked.size() + aside.size());
313+
for (TaskWorker w : workers)
314+
list.add(w);
315+
synchronized (blocked) {
316+
list.addAll(blocked);
317+
}
318+
synchronized (aside) {
319+
list.addAll(aside);
320+
}
321+
return list;
322+
}
323+
324+
void putWorkerAside(TaskWorker worker) {
325+
TaskWorker newWorker = createWorker();
326+
worker.aside = true;
327+
synchronized (aside) {
328+
aside.add(worker);
329+
replaceWorkerBySpare(worker, newWorker);
330+
}
331+
newWorker.thread.start();
332+
}
333+
334+
@SuppressWarnings("deprecation")
335+
void killWorker(TaskWorker worker) {
336+
synchronized (aside) {
337+
if (!aside.remove(worker)) return;
338+
}
339+
StackTraceElement[] stack = worker.thread.getStackTrace();
340+
StringBuilder s = new StringBuilder(1024);
341+
s.append("Task stopped at \r\n");
342+
DebugUtil.createStackTrace(s, stack);
343+
Threading.logger.error(s.toString());
344+
worker.thread.stop();
345+
if (worker.currentTask != null)
346+
worker.currentTask.getSynch().cancel(new CancelException("Task was running since a too long time"));
347+
}
348+
349+
void asideWorkerDone(TaskWorker worker) {
350+
synchronized (aside) {
351+
aside.remove(worker);
352+
}
353+
}
306354

307355
/** Describe what threads are doing for debugging purpose. */
308356
public void debug(StringBuilder s) {
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package net.lecousin.framework.concurrent;
2+
3+
import java.io.Closeable;
4+
import java.lang.management.LockInfo;
5+
import java.lang.management.ManagementFactory;
6+
import java.lang.management.MonitorInfo;
7+
import java.lang.management.ThreadInfo;
8+
import java.lang.management.ThreadMXBean;
9+
import java.util.concurrent.ThreadFactory;
10+
11+
import net.lecousin.framework.application.LCCore;
12+
import net.lecousin.framework.util.DebugUtil;
13+
14+
public final class TaskMonitoring {
15+
16+
public static boolean checkLocksOfBlockingTasks = false;
17+
18+
public static int MINUTES_BEFORE_TO_PUT_TASK_ASIDE = 5;
19+
public static int MINUTES_BEFORE_KILL_TASK = 10;
20+
21+
private static TaskMonitor monitor;
22+
23+
static void start(ThreadFactory threadFactory) {
24+
monitor = new TaskMonitor();
25+
threadFactory.newThread(monitor);
26+
LCCore.get().toClose(monitor);
27+
}
28+
29+
/** Called when a TaskWorker is blocked, if checkLocksOfBlockingTasks is true. */
30+
static void checkNoLockForWorker() {
31+
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
32+
if (bean == null) return;
33+
Thread t = Thread.currentThread();
34+
ThreadInfo info = bean.getThreadInfo(t.getId());
35+
if (info == null) return;
36+
MonitorInfo[] monitors = info.getLockedMonitors();
37+
LockInfo[] locks = info.getLockedSynchronizers();
38+
if (monitors.length == 0 && locks.length == 0) return;
39+
StringBuilder s = new StringBuilder(4096);
40+
s.append("TaskWorker is blocked while locking objects:\r\n");
41+
DebugUtil.createStackTrace(s, new Exception("Here"), false);
42+
if (monitors.length > 0) {
43+
s.append("\r\nLocked monitors:");
44+
for (int i = 0; i < monitors.length; ++i) {
45+
StackTraceElement trace = monitors[i].getLockedStackFrame();
46+
s.append("\r\n - ").append(trace.getClassName()).append('#')
47+
.append(trace.getMethodName()).append(':').append(trace.getLineNumber());
48+
}
49+
}
50+
if (locks.length > 0) {
51+
s.append("\r\nLocked synchronizers:");
52+
for (int i = 0; i < locks.length; ++i) {
53+
s.append("\r\n - ").append(locks[i].getClassName());
54+
}
55+
}
56+
Threading.logger.error(s.toString());
57+
}
58+
59+
private static class TaskMonitor implements Runnable, Closeable {
60+
61+
private Object lock = new Object();
62+
private boolean closed = false;
63+
64+
@Override
65+
public void run() {
66+
while (!closed) {
67+
synchronized (lock) {
68+
try { lock.wait(60000); }
69+
catch (InterruptedException e) { break; }
70+
if (closed)
71+
break;
72+
}
73+
for (TaskManager manager : Threading.getAllTaskManagers())
74+
check(manager);
75+
}
76+
}
77+
78+
@Override
79+
public void close() {
80+
closed = true;
81+
synchronized (lock) {
82+
lock.notifyAll();
83+
}
84+
}
85+
}
86+
87+
private static void check(TaskManager manager) {
88+
for (TaskWorker worker : manager.getAllActiveWorkers())
89+
check(worker);
90+
}
91+
92+
private static void check(TaskWorker worker) {
93+
if (worker == null) return;
94+
long now = System.nanoTime();
95+
Task<?,?> task = worker.currentTask;
96+
if (task == null) return;
97+
long start = worker.currentTaskStart;
98+
long minutes = (now - start) / (1000000L * 1000 * 60);
99+
if (minutes < MINUTES_BEFORE_TO_PUT_TASK_ASIDE) return;
100+
if (minutes < MINUTES_BEFORE_KILL_TASK) {
101+
Threading.logger.warn("Task " + task + " is running since more than 5 minutes !");
102+
worker.manager.putWorkerAside(worker);
103+
return;
104+
}
105+
Threading.logger.error("Task " + task + " is running since more than 10 minutes ! kill it.");
106+
worker.manager.killWorker(worker);
107+
}
108+
109+
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/TaskWorker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@ class TaskWorker implements Runnable, BlockedThreadHandler {
2121
TaskManager manager;
2222
//boolean working = false;
2323
Task<?,?> currentTask = null;
24+
long currentTaskStart = -1;
2425
long tasksDone = 0;
2526
long workingTime = 0;
2627
long waitingTime = 0;
2728
long blockedTime = 0;
2829
long lastUsed = -1;
2930
Thread thread;
31+
boolean aside = false;
3032

3133
@SuppressFBWarnings("NN_NAKED_NOTIFY")
3234
void forceStop() {
@@ -62,6 +64,7 @@ public void run() {
6264
}
6365
// take something to do
6466
//working = false;
67+
currentTaskStart = System.nanoTime();
6568
currentTask = manager.peekNextOrWait(this);
6669
if (currentTask == null) {
6770
if (finish)
@@ -99,6 +102,10 @@ public void run() {
99102
} else {
100103
workingTime += System.nanoTime() - start;
101104
}
105+
if (aside) {
106+
manager.asideWorkerDone(this);
107+
break;
108+
}
102109
}
103110
Threading.unregisterBlockedThreadHandler(this.thread);
104111
StringBuilder s = new StringBuilder();

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/Threading.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package net.lecousin.framework.concurrent;
22

3+
import java.util.ArrayList;
34
import java.util.Collection;
45
import java.util.HashMap;
56
import java.util.List;
@@ -41,6 +42,7 @@ public static void init(ThreadFactory threadFactory, Class<? extends TaskPriorit
4142
LCCore.get().toClose(new StopMultiThreading());
4243
if (traceTasksNotDone)
4344
ThreadingDebugHelper.traceTasksNotDone();
45+
TaskMonitoring.start(threadFactory);
4446
synchronized (resources) {
4547
for (TaskManager tm : resources.values())
4648
tm.autoCloseSpares();
@@ -142,6 +144,13 @@ public static TaskManager get(Object resource) {
142144
return resources.get(resource);
143145
}
144146

147+
/** Return all current TaskManager. */
148+
public static List<TaskManager> getAllTaskManagers() {
149+
synchronized (resources) {
150+
return new ArrayList<>(resources.values());
151+
}
152+
}
153+
145154
private static Map<Thread, BlockedThreadHandler> blockedHandlers = new HashMap<>();
146155

147156
/** Register the given thread. */

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/ThreadingDebugHelper.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
package net.lecousin.framework.concurrent;
22

3-
import java.lang.management.LockInfo;
4-
import java.lang.management.ManagementFactory;
5-
import java.lang.management.MonitorInfo;
6-
import java.lang.management.ThreadInfo;
7-
import java.lang.management.ThreadMXBean;
83
import java.util.ArrayList;
94
import java.util.Iterator;
105
import java.util.LinkedList;
@@ -13,7 +8,6 @@
138
import net.lecousin.framework.concurrent.synch.JoinPoint;
149
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
1510
import net.lecousin.framework.exception.NoException;
16-
import net.lecousin.framework.util.DebugUtil;
1711

1812
/**
1913
* Utilities to help debugging multi-threading applications.
@@ -206,33 +200,4 @@ public void run() {
206200
t.start();
207201
}
208202

209-
/** Called in debugging mode when a TaskWorker is blocked. */
210-
static void checkNoLockForWorker() {
211-
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
212-
if (bean == null) return;
213-
Thread t = Thread.currentThread();
214-
ThreadInfo info = bean.getThreadInfo(t.getId());
215-
if (info == null) return;
216-
MonitorInfo[] monitors = info.getLockedMonitors();
217-
LockInfo[] locks = info.getLockedSynchronizers();
218-
if (monitors.length == 0 && locks.length == 0) return;
219-
StringBuilder s = new StringBuilder(4096);
220-
s.append("TaskWorker is blocked while locking objects:\r\n");
221-
DebugUtil.createStackTrace(s, new Exception("Here"), false);
222-
if (monitors.length > 0) {
223-
s.append("\r\nLocked monitors:");
224-
for (int i = 0; i < monitors.length; ++i) {
225-
StackTraceElement trace = monitors[i].getLockedStackFrame();
226-
s.append("\r\n - ").append(trace.getClassName()).append('#')
227-
.append(trace.getMethodName()).append(':').append(trace.getLineNumber());
228-
}
229-
}
230-
if (locks.length > 0) {
231-
s.append("\r\nLocked synchronizers:");
232-
for (int i = 0; i < locks.length; ++i) {
233-
s.append("\r\n - ").append(locks[i].getClassName());
234-
}
235-
}
236-
Threading.logger.error(s.toString());
237-
}
238203
}

0 commit comments

Comments
 (0)