Skip to content

Commit 96b0a53

Browse files
committed
add URLEncoding + Blockable
1 parent c380075 commit 96b0a53

21 files changed

+890
-81
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/async/Async.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.function.Function;
1010

1111
import net.lecousin.framework.concurrent.CancelException;
12-
import net.lecousin.framework.concurrent.threads.TaskExecutor;
1312
import net.lecousin.framework.concurrent.threads.Threading;
1413
import net.lecousin.framework.log.Logger;
1514
import net.lecousin.framework.util.ThreadUtil;
@@ -126,11 +125,11 @@ public final void cancel(CancelException reason) {
126125
@Override
127126
@SuppressWarnings("squid:S2274")
128127
public final void block(long timeout) {
129-
TaskExecutor executor;
128+
Blockable blockable;
130129
synchronized (this) {
131130
if (unblocked && listenersInline == null) return;
132-
executor = Threading.getTaskExecutor();
133-
if (executor == null) {
131+
blockable = Threading.getBlockable();
132+
if (blockable == null) {
134133
if (timeout <= 0) {
135134
while (!unblocked || listenersInline != null)
136135
if (!ThreadUtil.wait(this, 0)) return;
@@ -139,8 +138,8 @@ public final void block(long timeout) {
139138
}
140139
}
141140
}
142-
if (executor != null)
143-
executor.blocked(this, timeout);
141+
if (blockable != null)
142+
blockable.blocked(this, timeout);
144143
}
145144

146145
@Override

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/async/AsyncSupplier.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import net.lecousin.framework.concurrent.CancelException;
1313
import net.lecousin.framework.concurrent.Executable;
1414
import net.lecousin.framework.concurrent.threads.Task;
15-
import net.lecousin.framework.concurrent.threads.TaskExecutor;
1615
import net.lecousin.framework.concurrent.threads.Threading;
1716
import net.lecousin.framework.log.Logger;
1817
import net.lecousin.framework.util.Runnables.ConsumerThrows;
@@ -452,11 +451,11 @@ public final void cancel(CancelException reason) {
452451
@Override
453452
@SuppressWarnings("squid:S2274")
454453
public final void block(long timeout) {
455-
TaskExecutor executor;
454+
Blockable blockable;
456455
synchronized (this) {
457456
if (unblocked && listenersInline == null) return;
458-
executor = Threading.getTaskExecutor();
459-
if (executor == null) {
457+
blockable = Threading.getBlockable();
458+
if (blockable == null) {
460459
if (timeout <= 0) {
461460
while (!unblocked || listenersInline != null)
462461
if (!ThreadUtil.wait(this, 0)) return;
@@ -465,29 +464,29 @@ public final void block(long timeout) {
465464
}
466465
}
467466
}
468-
if (executor != null)
469-
executor.blocked(this, timeout);
467+
if (blockable != null)
468+
blockable.blocked(this, timeout);
470469
}
471470

472471
/** Block until this AsyncSupplier is unblocked or the given timeout expired,
473472
* and return the result in case of success, or throw the error or cancellation.
474473
* @param timeout in milliseconds. 0 or negative value means infinite.
475474
*/
476475
public final T blockResult(long timeout) throws TError, CancelException {
477-
TaskExecutor executor;
476+
Blockable blockable;
478477
synchronized (this) {
479478
if (unblocked && listenersInline == null) {
480479
if (error != null) throw error;
481480
if (cancel != null) throw cancel;
482481
return result;
483482
}
484-
executor = Threading.getTaskExecutor();
485-
if (executor == null)
483+
blockable = Threading.getBlockable();
484+
if (blockable == null)
486485
while (!unblocked || listenersInline != null)
487486
if (!ThreadUtil.wait(this, timeout < 0 ? 0 : timeout)) return null;
488487
}
489-
if (executor != null)
490-
executor.blocked(this, timeout);
488+
if (blockable != null)
489+
blockable.blocked(this, timeout);
491490
if (error != null) throw error;
492491
if (cancel != null) throw cancel;
493492
return result;
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package net.lecousin.framework.concurrent.async;
2+
3+
/** Blockable thread. */
4+
public interface Blockable {
5+
6+
/** Signal that the current thread is blocked by the given synchronization point. */
7+
void blocked(IAsync<?> synchPoint, long timeout);
8+
9+
}

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/async/LockPoint.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.ArrayList;
44

5-
import net.lecousin.framework.concurrent.threads.TaskExecutor;
65
import net.lecousin.framework.concurrent.threads.Threading;
76

87
/**
@@ -31,20 +30,20 @@ public void lock() {
3130
return;
3231
if (error != null)
3332
return;
34-
TaskExecutor executor;
33+
Blockable blockable;
3534
do {
3635
synchronized (this) {
3736
if (!locked) {
3837
locked = true;
3938
return;
4039
}
41-
executor = Threading.getTaskExecutor();
42-
if (executor != null) break;
40+
blockable = Threading.getBlockable();
41+
if (blockable != null) break;
4342
try { this.wait(0); }
4443
catch (InterruptedException e) { /* continue anyway */ }
4544
}
4645
} while (true);
47-
executor.blocked(this, 0);
46+
blockable.blocked(this, 0);
4847
}
4948

5049
/** Release the lock. */

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/async/MutualExclusion.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.ArrayList;
44

5-
import net.lecousin.framework.concurrent.threads.TaskExecutor;
65
import net.lecousin.framework.concurrent.threads.Threading;
76

87
/**
@@ -37,23 +36,23 @@ public void lock() {
3736
lockedTimes++;
3837
return;
3938
}
40-
TaskExecutor executor = null;
39+
Blockable blockable = null;
4140
do {
4241
synchronized (this) {
4342
if (lockingThread == null) {
4443
lockingThread = t;
4544
lockedTimes = 1;
4645
return;
4746
}
48-
if (executor == null)
49-
executor = Threading.getTaskExecutor(t);
50-
if (executor == null) {
47+
if (blockable == null)
48+
blockable = Threading.getBlockable(t);
49+
if (blockable == null) {
5150
try { this.wait(0); }
5251
catch (InterruptedException e) { /* ignore */ }
5352
continue;
5453
}
5554
}
56-
executor.blocked(this, 0);
55+
blockable.blocked(this, 0);
5756
} while (true);
5857
}
5958

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/async/WaitingDataQueueSynchronizationPoint.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import java.util.ArrayList;
44

55
import net.lecousin.framework.collections.TurnArray;
6-
import net.lecousin.framework.concurrent.threads.TaskExecutor;
76
import net.lecousin.framework.concurrent.threads.Threading;
87
import net.lecousin.framework.util.ThreadUtil;
98

@@ -29,7 +28,7 @@ public class WaitingDataQueueSynchronizationPoint<DataType,TError extends Except
2928
public DataType waitForData(long timeout) {
3029
long start = System.currentTimeMillis();
3130
do {
32-
TaskExecutor executor;
31+
Blockable blockable;
3332
synchronized (this) {
3433
if (cancel != null)
3534
return null;
@@ -39,12 +38,12 @@ public DataType waitForData(long timeout) {
3938
return waitingData.removeFirst();
4039
if (end)
4140
return null;
42-
executor = Threading.getTaskExecutor();
43-
if (executor == null && !ThreadUtil.wait(this, timeout))
41+
blockable = Threading.getBlockable();
42+
if (blockable == null && !ThreadUtil.wait(this, timeout))
4443
return null;
4544
}
46-
if (executor != null)
47-
executor.blocked(this, timeout);
45+
if (blockable != null)
46+
blockable.blocked(this, timeout);
4847
} while (timeout <= 0 || (System.currentTimeMillis() - start) < timeout);
4948
return null;
5049
}
@@ -95,18 +94,18 @@ public boolean isDone() {
9594
public void block(long timeout) {
9695
long start = System.currentTimeMillis();
9796
do {
98-
TaskExecutor executor;
97+
Blockable blockable;
9998
synchronized (this) {
10099
if (cancel != null) return;
101100
if (error != null) return;
102101
if (!waitingData.isEmpty()) return;
103102
if (end) return;
104-
executor = Threading.getTaskExecutor();
105-
if (executor == null && !ThreadUtil.wait(this, timeout))
103+
blockable = Threading.getBlockable();
104+
if (blockable == null && !ThreadUtil.wait(this, timeout))
106105
return;
107106
}
108-
if (executor != null)
109-
executor.blocked(this, timeout);
107+
if (blockable != null)
108+
blockable.blocked(this, timeout);
110109
} while (timeout <= 0 || (System.currentTimeMillis() - start) < timeout);
111110
}
112111

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/Task.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public Priority less() {
5353
switch (this) {
5454
case URGENT: return IMPORTANT;
5555
case IMPORTANT: return RATHER_IMPORTANT;
56+
case RATHER_IMPORTANT: return NORMAL;
5657
case NORMAL: return RATHER_LOW;
5758
default: return LOW;
5859
}
@@ -89,7 +90,7 @@ public Task(TaskManager manager, String description, Priority priority, Executab
8990
this.ondone = ondone;
9091
result.onDone(() -> {
9192
if (result.isCancelled() && status < STATUS_RUNNING) {
92-
Logger logger = app.getLoggerFactory().getLogger("Threading");
93+
Logger logger = app.getDefaultLogger();
9394
if (logger.debug()) {
9495
CancelException reason = result.getCancelEvent();
9596
logger.debug("Task cancelled: " + description + " => "
@@ -572,6 +573,13 @@ public String toString() {
572573
}
573574
}
574575

576+
/** Create a CPU task already done. */
577+
public static <T, TError extends Exception> Task<T, TError> done(T result, TError error) {
578+
Task<T, TError> t = new Task<>(Threading.getCPUTaskManager(), "", Priority.NORMAL, null, null);
579+
t.setDone(result, error);
580+
return t;
581+
}
582+
575583
/** Create a CPU task. */
576584
public static <T, TError extends Exception> Task<T, TError> cpu(
577585
String description, Priority priority, Executable<T, TError> executable, Consumer<Pair<T,TError>> ondone

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/TaskExecutor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44

55
import net.lecousin.framework.application.LCCore;
66
import net.lecousin.framework.concurrent.CancelException;
7+
import net.lecousin.framework.concurrent.async.Blockable;
78
import net.lecousin.framework.concurrent.async.IAsync;
89
import net.lecousin.framework.util.DebugUtil;
910
import net.lecousin.framework.util.ThreadUtil;
1011

1112
/** Thread executing tasks. */
12-
public abstract class TaskExecutor {
13+
public abstract class TaskExecutor implements Blockable {
1314

1415
protected TaskManager manager;
1516
protected Thread thread;
@@ -34,6 +35,7 @@ protected TaskExecutor(TaskManager manager, String name) {
3435
}
3536

3637
/** Signal that the current thread is blocked by the given synchronization point. */
38+
@Override
3739
public final void blocked(IAsync<?> synchPoint, long timeout) {
3840
long start = System.nanoTime();
3941
manager.imBlocked(this);

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/threads/Threading.java

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,14 @@
22

33
import java.io.File;
44
import java.util.ArrayList;
5-
import java.util.Collection;
65
import java.util.HashMap;
76
import java.util.List;
87
import java.util.Map;
98
import java.util.concurrent.ThreadFactory;
109

1110
import net.lecousin.framework.application.LCCore;
12-
import net.lecousin.framework.concurrent.CancelException;
1311
import net.lecousin.framework.concurrent.async.Async;
14-
import net.lecousin.framework.concurrent.async.AsyncSupplier;
12+
import net.lecousin.framework.concurrent.async.Blockable;
1513
import net.lecousin.framework.concurrent.async.IAsync;
1614
import net.lecousin.framework.concurrent.threads.DrivesThreadingManager.DrivesProvider;
1715
import net.lecousin.framework.concurrent.threads.fixed.MultiThreadTaskManager;
@@ -216,6 +214,33 @@ public static List<TaskManager> getAllTaskManagers() {
216214
}
217215

218216
private static Map<Thread, TaskExecutor> executors = new HashMap<>();
217+
private static Map<Thread, Blockable> blockables = new HashMap<>();
218+
219+
/** Register the executor for the given thread. */
220+
public static void registerBlockable(Blockable handler, Thread thread) {
221+
synchronized (blockables) {
222+
blockables.put(thread, handler);
223+
}
224+
}
225+
226+
/** Unregister the executor for the given thread. */
227+
public static void unregisterBlockable(Thread thread) {
228+
synchronized (blockables) {
229+
blockables.remove(thread);
230+
}
231+
}
232+
233+
/** Return the blockable for the given thread. */
234+
public static Blockable getBlockable(Thread thread) {
235+
Blockable b = executors.get(thread);
236+
if (b != null) return b;
237+
return blockables.get(thread);
238+
}
239+
240+
/** Return the blockable for the current thread. */
241+
public static Blockable getBlockable() {
242+
return getBlockable(Thread.currentThread());
243+
}
219244

220245
/** Register the executor for the given thread. */
221246
public static void registerTaskExecutor(TaskExecutor handler, Thread thread) {
@@ -265,34 +290,6 @@ public static void setUnmanagedMonitorConfiguration(TaskManagerMonitor.Configura
265290
unmanagedManager.getMonitor().setConfiguration(config);
266291
}
267292

268-
/** Wait for the given tasks to be done. */
269-
public static <TError extends Exception> void waitFinished(Collection<? extends Task<?,TError>> tasks) throws TError, CancelException {
270-
for (Task<?,TError> t : tasks) {
271-
t.getOutput().blockThrow(0);
272-
}
273-
}
274-
275-
/** Wait for the given tasks to finish, if one has an error this error is immediately thrown without waiting for other tasks. */
276-
public static <TError extends Exception> void waitUnblockedWithError(Collection<AsyncSupplier<?,TError>> tasks)
277-
throws TError, CancelException {
278-
for (AsyncSupplier<?,TError> t : tasks)
279-
t.blockResult(0);
280-
}
281-
282-
/** Wait for one of the given task to be done. */
283-
public static void waitOneFinished(List<? extends Task<?,?>> tasks) {
284-
if (tasks.isEmpty()) return;
285-
if (tasks.size() == 1)
286-
try { tasks.get(0).getOutput().block(0); }
287-
catch (Exception e) { /* ignore */ }
288-
Async<Exception> sp = new Async<>();
289-
for (Task<?,?> t : tasks) {
290-
if (t.isDone()) return;
291-
t.getOutput().onDone(sp::unblock);
292-
}
293-
sp.block(0);
294-
}
295-
296293
/** Return a string containing multi-threading status for debugging purposes. */
297294
public static String debug() {
298295
StringBuilder s = new StringBuilder();

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/util/AsyncConsumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public interface AsyncConsumer<T, TError extends Exception> {
2929

3030
/** Call the consume method for each data in the given list. */
3131
default IAsync<TError> push(List<T> dataList) {
32+
if (dataList.isEmpty()) return new Async<>(true);
33+
if (dataList.size() == 1) return consume(dataList.get(0));
3234
Iterator<T> it = dataList.iterator();
3335
Async<TError> result = new Async<>();
3436
Runnable pushNext = new Runnable() {

0 commit comments

Comments
 (0)