Skip to content

Commit a836ead

Browse files
committed
except Main, wrote comments
1 parent 090775f commit a836ead

File tree

3 files changed

+213
-114
lines changed

3 files changed

+213
-114
lines changed

CombiningTree.java

+43-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,28 @@
11
import java.util.function.*;
22

33

4+
// A Combining Tree is an N-ary tree of nodes, that follows
5+
// software combining to reduce memory contention while
6+
// updating a shared value.
7+
//
8+
// The shared value is placed at the root of the tree, and
9+
// threads perfrom getAndOp() at the leaf nodes. Each leaf
10+
// node handles N threads. The combined value is then
11+
// propagated up the tree by an active thread. There can be
12+
// several active threads, but eventually one active thread
13+
// updates the root node. It then shares this message to all
14+
// other threads behind it.
15+
//
16+
// This was contention at a single memory location is avoided.
17+
// However, i guess that such a design is normally useful
18+
// only in hardware, as it becomes too slow in software.
19+
// Useful for educational purposes.
20+
421
class CombiningTree<T> {
22+
Node<T> root;
523
Node<T>[] leaf;
24+
// root: root node
25+
// leaf: leaf nodes
626

727

828
public CombiningTree(int depth) {
@@ -11,11 +31,13 @@ public CombiningTree(int depth) {
1131

1232
public CombiningTree(int depth, int ary) {
1333
Node<T>[] parent = createNodes(1, ary);
34+
root = parent[0];
1435
for (int i=1; i<depth; i++) {
1536
int n = (int) Math.pow(ary, i);
1637
leaf = createNodes(n, ary);
1738
for (int j=0; j<n; j++)
1839
leaf[j].parent = parent[j/ary];
40+
parent = leaf;
1941
}
2042
}
2143

@@ -28,14 +50,32 @@ private Node<T>[] createNodes(int n, int ary) {
2850
}
2951

3052

53+
// Get value of tree (at root).
54+
public T get() {
55+
return root.get();
56+
}
57+
58+
// Set value of tree (at root).
59+
public void set(T x) {
60+
root.set(x);
61+
}
62+
63+
64+
// Gets current value, and then updates it.
65+
// x: value to OP with, op: binary op
66+
// 1. Select leaf index using thread id.
67+
// 2. Perform get & op at desired leaf.
3168
public T getAndOp(T x, BinaryOperator<T> op)
3269
throws InterruptedException {
33-
int i = (int) Thread.currentThread().getId();
34-
return getAndOp(x, op, i);
70+
int i = (int) Thread.currentThread().getId(); // 1
71+
return getAndOp(x, op, i); // 2
3572
}
3673

74+
// Gets current value, and then updates it.
75+
// x: value to OP with, op: binary op, i: leaf index
76+
// 1. Perform get & op at desired leaf (ensure in limit).
3777
public T getAndOp(T x, BinaryOperator<T> op, int i)
3878
throws InterruptedException {
39-
return leaf[i % leaf.length].getAndOp(x, op);
79+
return leaf[i % leaf.length].getAndOp(x, op); // 1
4080
}
4181
}

Main.java

+37-72
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,64 @@
11
import java.util.*;
2+
import java.util.concurrent.ConcurrentLinkedQueue;
23

34
class Main {
4-
static Deque<Integer> queue;
5-
static CombiningTree<Integer> concurrentQueue;
6-
static List<Integer>[] deqValues;
7-
static int TH = 10, NUM = 1000;
8-
9-
// Each unsafe thread enqs N numbers and deqs N, adding
10-
// them to its own deqValues for checking; using Java's
11-
// sequential queue implementation, ArrayDeque.
12-
static Thread unsafe(int id, int x, int N) {
13-
return new Thread(() -> {
14-
String action = "enq";
15-
try {
16-
for (int i=0, y=x; i<N; i++)
17-
queue.addLast(y++);
18-
Thread.sleep(1000);
19-
action = "deq";
20-
for (int i=0; i<N; i++)
21-
deqValues[id].add(queue.removeFirst());
22-
}
23-
catch (Exception e) { log(id+": failed "+action); }
24-
});
25-
}
5+
static Queue<Integer> queue;
6+
static CombiningTree<Integer> tree;
7+
static int TH = 8, NUM = 10;
268

279
// Each safe thread enqs N numbers and deqs N, adding
2810
// them to its own deqValues for checking; using
2911
// ArrayQueue.
30-
static Thread safe(int id, int x, int N) {
12+
static Thread thread(int id) {
3113
return new Thread(() -> {
32-
String action = "enq";
3314
try {
34-
for (int i=0, y=x; i<N; i++)
35-
concurrentQueue.enq(y++);
36-
Thread.sleep(1000);
37-
action = "deq";
38-
for (int i=0; i<N; i++)
39-
deqValues[id].add(concurrentQueue.deq());
15+
for (int i=0; i<NUM; i++) {
16+
Integer r = tree.getAndOp(1,
17+
(Integer x, Integer y) -> x);
18+
queue.add(r);
4019
}
41-
catch (Exception e) { log(id+": failed "+action);
42-
e.printStackTrace(); }
20+
} catch (InterruptedException e) {}
4321
});
4422
}
4523

4624
// Checks if each thread dequeued N values, and they are
4725
// globally unique.
48-
static boolean wasLIFO(int N) {
49-
Set<Integer> set = new HashSet<>();
50-
boolean passed = true;
51-
for (int i=0; i<TH; i++) {
52-
int n = deqValues[i].size();
53-
if (n != N) {
54-
log(i+": dequeued "+n+"/"+N+" values");
55-
passed = false;
56-
}
57-
for (Integer x : deqValues[i])
58-
if (set.contains(x)) {
59-
log(i+": has duplicate value "+x);
60-
passed = false;
61-
}
62-
set.addAll(deqValues[i]);
26+
static boolean wasValid() {
27+
int a = tree.get().intValue();
28+
if (a != TH*NUM) return false;
29+
Set<Integer> s = new HashSet<>();
30+
while (queue.size()>0) {
31+
Integer n = queue.remove();
32+
if (s.contains(n)) return false;
33+
s.add(n);
6334
}
64-
return passed;
35+
return true;
6536
}
6637

67-
@SuppressWarnings("unchecked")
68-
static void testThreads(boolean safe) {
69-
queue = new ArrayDeque<>();
70-
concurrentQueue = new CombiningTree<>(TH*NUM);
71-
deqValues = new List[TH];
38+
static Thread[] startOps() {
39+
Thread[] t = new Thread[TH];
7240
for (int i=0; i<TH; i++)
73-
deqValues[i] = new ArrayList<>();
74-
Thread[] threads = new Thread[TH];
75-
for (int i=0; i<TH; i++) {
76-
threads[i] = safe?
77-
safe(i, i*NUM, NUM) :
78-
unsafe(i, i*NUM, NUM);
79-
threads[i].start();
80-
}
41+
t[i] = thread(i);
42+
for (int i=0; i<TH; i++)
43+
t[i].start();
44+
return t;
45+
}
46+
47+
static void awaitOps(Thread[] t) {
8148
try {
8249
for (int i=0; i<TH; i++)
83-
threads[i].join();
84-
}
85-
catch (Exception e) {}
50+
t[i].join();
51+
} catch (InterruptedException e) {}
8652
}
8753

8854
public static void main(String[] args) {
89-
log("Starting "+TH+" threads with sequential queue");
90-
testThreads(false);
91-
log("Was LIFO? "+wasLIFO(NUM));
92-
log("");
93-
log("Starting "+TH+" threads with array queue");
94-
testThreads(true);
95-
log("Was LIFO? "+wasLIFO(NUM));
96-
log("");
55+
queue = new ConcurrentLinkedQueue<>();
56+
tree = new CombiningTree<>(3);
57+
tree.set(0);
58+
log("Starting "+TH+" threads doing ops ...");
59+
Thread[] t = startOps();
60+
awaitOps(t);
61+
log("\nWas valid? "+wasValid());
9762
}
9863

9964
static void log(String x) {

0 commit comments

Comments
 (0)