Skip to content

Commit 840ffa9

Browse files
committed
programming done :)
1 parent a836ead commit 840ffa9

File tree

4 files changed

+109
-47
lines changed

4 files changed

+109
-47
lines changed

CombiningTree.java

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public CombiningTree(int depth) {
3232
public CombiningTree(int depth, int ary) {
3333
Node<T>[] parent = createNodes(1, ary);
3434
root = parent[0];
35+
leaf = parent;
3536
for (int i=1; i<depth; i++) {
3637
int n = (int) Math.pow(ary, i);
3738
leaf = createNodes(n, ary);

Main.java

+45-12
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,39 @@
11
import java.util.*;
2-
import java.util.concurrent.ConcurrentLinkedQueue;
2+
import java.util.concurrent.*;
3+
34

45
class Main {
56
static Queue<Integer> queue;
67
static CombiningTree<Integer> tree;
7-
static int TH = 8, NUM = 10;
8+
static int TH = 25, ARY = 3, NUM = 100;
9+
// queue: used to store old (get) values (unique check)
10+
// tree: combining tree where threads do increment ops
11+
// TH: number of threads
12+
// ARY: arity of combining tree
13+
// NUM: number of increment ops each thread performs
14+
815

9-
// Each safe thread enqs N numbers and deqs N, adding
10-
// them to its own deqValues for checking; using
11-
// ArrayQueue.
16+
// Each thread performs increment op using the combining
17+
// tree, and saves old (get) values in the queue, for
18+
// uniqueness check later.
1219
static Thread thread(int id) {
1320
return new Thread(() -> {
1421
try {
22+
long start = System.currentTimeMillis();
1523
for (int i=0; i<NUM; i++) {
1624
Integer r = tree.getAndOp(1,
17-
(Integer x, Integer y) -> x);
25+
(Integer x, Integer y) -> x + y);
1826
queue.add(r);
27+
Thread.yield();
1928
}
29+
long stop = System.currentTimeMillis();
30+
log(id()+": done in "+(stop-start)+"ms");
2031
} catch (InterruptedException e) {}
2132
});
2233
}
2334

24-
// Checks if each thread dequeued N values, and they are
25-
// globally unique.
35+
// Check if total sum is as expected.
36+
// Check if all old values are unique.
2637
static boolean wasValid() {
2738
int a = tree.get().intValue();
2839
if (a != TH*NUM) return false;
@@ -35,6 +46,20 @@ static boolean wasValid() {
3546
return true;
3647
}
3748

49+
// Setup the combining tree for threads.
50+
static void setupTreeAndQueue() {
51+
int depth = (int) Math.ceil(Math.log(TH)/Math.log(ARY));
52+
tree = new CombiningTree<>(depth, ARY);
53+
tree.set(0);
54+
}
55+
56+
// Setup the queue for storing old values.
57+
static void setupQueue() {
58+
queue = new ConcurrentLinkedQueue<>();
59+
}
60+
61+
62+
// Start threads doing increments using tree.
3863
static Thread[] startOps() {
3964
Thread[] t = new Thread[TH];
4065
for (int i=0; i<TH; i++)
@@ -44,24 +69,32 @@ static Thread[] startOps() {
4469
return t;
4570
}
4671

72+
// Wait until all threads done with increments.
4773
static void awaitOps(Thread[] t) {
4874
try {
4975
for (int i=0; i<TH; i++)
5076
t[i].join();
5177
} catch (InterruptedException e) {}
5278
}
5379

80+
5481
public static void main(String[] args) {
55-
queue = new ConcurrentLinkedQueue<>();
56-
tree = new CombiningTree<>(3);
57-
tree.set(0);
58-
log("Starting "+TH+" threads doing ops ...");
82+
setupTree();
83+
setupQueue();
84+
log(ARY+"-ary "+depth+"-depth Combining tree.");
85+
log("Starting "+TH+" threads doing increments ...");
5986
Thread[] t = startOps();
6087
awaitOps(t);
88+
log("Total: "+tree.get());
6189
log("\nWas valid? "+wasValid());
6290
}
6391

92+
6493
static void log(String x) {
6594
System.out.println(x);
6695
}
96+
97+
static long id() {
98+
return Thread.currentThread().getId();
99+
}
67100
}

Node.java

+34-15
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class Node<T> {
2424
// FREE: indicates that values are still being inserted
2525
// PUSHING: node is full, combined value being pushed
2626
// PULLED: pulled value from parent being given to threads
27-
// TIMEOUT: maximum time active thread waits for node to fill
27+
// TIMEOUT: max time active thread waits for node to fill
2828
// parent: parent node
2929
// state: either FREE, PUSHING, or PULLING
3030
// count: number of values in node
@@ -60,16 +60,18 @@ public synchronized void set(T x) {
6060

6161
// Gets current value, and then updates it.
6262
// x: value to OP (accumulate), op: binary operator
63-
// 1. Perform get & op based on 3 possible cases.
64-
// 1a. Root node
65-
// 1b. Active thread (first to visit node)
66-
// 1c. Passive thread (visits later)
63+
// 1. Wait until node is free.
64+
// 2. Perform get & op based on 3 possible cases.
65+
// 2a. Root node
66+
// 2b. Active thread (first to visit node)
67+
// 2c. Passive thread (visits later)
6768
public synchronized T getAndOp(T x,
6869
BinaryOperator<T> op)
6970
throws InterruptedException {
70-
if (parent==null) return getAndOpRoot(x, op); // 1a
71-
if (count==0) return getAndOpActive(x, op); // 1b
72-
return getAndOpPassive(x, op); // 1c
71+
while (state!=FREE || count==size) wait(); // 1
72+
if (parent==null) return getAndOpRoot(x, op); // 2a
73+
if (count==0) return getAndOpActive(x, op); // 2b
74+
return getAndOpPassive(x, op); // 2c
7375
}
7476

7577
// Performs get & op for root node.
@@ -108,7 +110,8 @@ private synchronized T getAndOpActive(T x,
108110
T r = parent.getAndOp(a, op); // 5
109111
distribute(r, op); // 6
110112
state = PULLING; // 7
111-
count--; // 8
113+
notifyAll(); // 7
114+
decrementCount(); // 8
112115
return r; // 9
113116
}
114117

@@ -124,7 +127,7 @@ private synchronized T getAndOpPassive(T x,
124127
throws InterruptedException {
125128
int i = insert(x); // 1
126129
while (state!=PULLING) wait(); // 2
127-
if (--count==0) state = FREE; // 3, 4
130+
decrementCount(); // 3, 4
128131
return value[i]; // 5
129132
}
130133

@@ -133,16 +136,16 @@ private synchronized T getAndOpPassive(T x,
133136
// x: value to insert
134137
// 1. Wait unit node is free.
135138
// 2. Get index to place value in.
136-
// 3. Increment number of values in node.
137-
// 4. Place the value.
139+
// 3. Place the value.
140+
// 4. Increment number of values in node.
138141
// 5. If node is full, notify active thread.
139142
// 6. Return index where value was placed.
140143
public synchronized int insert(T x)
141144
throws InterruptedException {
142145
while (state!=FREE) wait(); // 1
143-
int i = count++; // 2, 3
144-
value[i] = x; // 4
145-
if (count==size) notifyAll(); // 5
146+
int i = count; // 2
147+
value[i] = x; // 3
148+
incrementCount(); // 4, 5
146149
return i; // 6
147150
}
148151

@@ -173,6 +176,22 @@ public synchronized void distribute(T r,
173176
} // 3
174177
}
175178

179+
// Increment count once done with insertion.
180+
// 1. Increment count.
181+
// 2. If node is full, notify active thread.
182+
private synchronized void incrementCount() {
183+
if (++count<size) return; // 1
184+
notifyAll(); // 2
185+
}
186+
187+
// Decrement count once done with distribution.
188+
// 1. Decrement count.
189+
// 2. If count is zero, node is free.
190+
private synchronized void decrementCount() {
191+
if (--count>0) return; // 1
192+
state = FREE; // 2
193+
notifyAll(); // 2
194+
}
176195

177196
// Wait until node is full, or timeout.
178197
// 1. Get start time.

README.md

+29-20
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,36 @@ tryDeq():
3131

3232
```bash
3333
## OUTPUT
34-
Starting 10 threads with sequential queue
35-
2: failed enq
36-
5: failed deq
37-
6: failed deq
38-
7: failed deq
39-
8: failed deq
40-
1: failed deq
41-
4: failed deq
42-
9: failed deq
43-
1: dequeued 0/1000 values
44-
2: dequeued 0/1000 values
45-
4: dequeued 0/1000 values
46-
5: dequeued 698/1000 values
47-
6: dequeued 0/1000 values
48-
7: dequeued 0/1000 values
49-
8: dequeued 0/1000 values
50-
9: dequeued 0/1000 values
51-
Was LIFO? false
34+
3-ary 3-depth Combining tree.
35+
Starting 25 threads doing increments ...
36+
30: done in 51ms
37+
14: done in 60ms
38+
13: done in 68ms
39+
23: done in 53ms
40+
12: done in 56ms
41+
21: done in 54ms
42+
32: done in 56ms
43+
31: done in 56ms
44+
22: done in 54ms
45+
28: done in 7831ms
46+
11: done in 7836ms
47+
10: done in 7838ms
48+
19: done in 7833ms
49+
20: done in 7833ms
50+
29: done in 7831ms
51+
24: done in 8165ms
52+
33: done in 8160ms
53+
25: done in 8165ms
54+
16: done in 8167ms
55+
34: done in 8159ms
56+
15: done in 8167ms
57+
26: done in 11970ms
58+
17: done in 11972ms
59+
27: done in 12239ms
60+
18: done in 12241ms
61+
Total: 2500
5262

53-
Starting 10 threads with array queue
54-
Was LIFO? true
63+
Was valid? true
5564
```
5665

5766
See [ArrayQueue.java] for code, [Main.java] for test, and [repl.it] for output.

0 commit comments

Comments
 (0)