Skip to content

Commit 090775f

Browse files
committed
🚌 let CombiningTree prevail!
1 parent 9f420cb commit 090775f

File tree

3 files changed

+115
-130
lines changed

3 files changed

+115
-130
lines changed

CombiningTree.java

Lines changed: 32 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,41 @@
1-
import java.nio.*;
2-
import java.util.*;
3-
import java.util.concurrent.locks.*;
1+
import java.util.function.*;
42

5-
// Array queue is a bounded lock-based FIFO queue using
6-
// an array. It uses 2 separate locks for head and tail.
73

84
class CombiningTree<T> {
9-
// headLock: lock for enq() at head
10-
// tailLock: lock for deq() at tail
11-
// data: array of values in stack
12-
// head: front of queue (0)
13-
// tail: rear of queue (0)
5+
Node<T>[] leaf;
146

15-
@SuppressWarnings("unchecked")
16-
public CombiningTree(int width) {
17-
Node<T>[] nodes = new Node[width - 1];
18-
nodes[0] = new Node<>();
19-
for (int i=1; i<nodes.length; i++)
20-
nodes[i] = new Node<>(nodes[(i-1)/2]);
21-
Node<T>[] leaf = new Node[(width+1)/2];
22-
for (int i=0; i<leaf.length; i++)
23-
leaf[i] = nodes[nodes.length-i-1];
7+
8+
public CombiningTree(int depth) {
9+
this(depth, 2);
2410
}
2511

26-
// 1. Lock tail.
27-
// 2. Try enq.
28-
// 3. Unlock tail.
29-
public int getAndIncrement() {
30-
Stack<Node<T>> stack = new Stack<>();
31-
Node<T> myLeaf = leaf[ThreadID.get()/2];
32-
Node node = myLeaf;
33-
// precombining phase
34-
while (node.precombine())
35-
node = node.parent;
36-
Node stop = node;
37-
// combining phase
38-
node = myLeaf;
39-
int combined = 1;
40-
while (node != stop) {
41-
combined = node.combine(combined);
42-
stack.push(node);
43-
node = node.parent;
44-
}
45-
// operation phase
46-
int prior = stop.op(combined);
47-
// distribution phase
48-
while (!stack.empty()) {
49-
node = stack.pop();
50-
node.distribute();
12+
public CombiningTree(int depth, int ary) {
13+
Node<T>[] parent = createNodes(1, ary);
14+
for (int i=1; i<depth; i++) {
15+
int n = (int) Math.pow(ary, i);
16+
leaf = createNodes(n, ary);
17+
for (int j=0; j<n; j++)
18+
leaf[j].parent = parent[j/ary];
5119
}
52-
return prior;
20+
}
21+
22+
@SuppressWarnings("unchecked")
23+
private Node<T>[] createNodes(int n, int ary) {
24+
Node<T>[] a = (Node<T>[]) new Node[n];
25+
for (int i=0; i<n; i++)
26+
a[i] = new Node<>(ary);
27+
return a;
28+
}
29+
30+
31+
public T getAndOp(T x, BinaryOperator<T> op)
32+
throws InterruptedException {
33+
int i = (int) Thread.currentThread().getId();
34+
return getAndOp(x, op, i);
35+
}
36+
37+
public T getAndOp(T x, BinaryOperator<T> op, int i)
38+
throws InterruptedException {
39+
return leaf[i % leaf.length].getAndOp(x, op);
5340
}
5441
}

Node.java

Lines changed: 83 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,95 +1,100 @@
1+
import java.util.function.*;
2+
3+
14
class Node<T> {
5+
static final int FREE = 0;
6+
static final int PUSHING = 1;
7+
static final int PULLING = 2;
8+
static final long TIMEOUT = 100;
29
Node<T> parent;
3-
boolean locked;
4-
Status status;
5-
T value1;
6-
T value2;
7-
T result;
10+
int state;
11+
int count;
12+
T[] value;
13+
int size;
814

15+
916
public Node() {
10-
status = Status.ROOT;
17+
this(2);
1118
}
1219

13-
public Node(Node<T> par) {
14-
parent = par;
15-
status = Status.IDLE;
20+
@SuppressWarnings("unchecked")
21+
public Node(int n) {
22+
value = (T[]) new Object[n];
23+
parent = null;
24+
state = FREE;
25+
count = 0;
26+
size = n;
1627
}
1728

18-
// 1. Lock head.
19-
// 2. Try deq.
20-
// 3. Unlock head.
21-
public synchronized boolean precombine()
22-
throws InterruptedException {
23-
while (locked) wait();
24-
switch (status) {
25-
case IDLE:
26-
status = Status.FIRST;
27-
return true;
28-
case FIRST:
29-
locked = true;
30-
status = Status.SECOND;
31-
return false;
32-
case ROOT:
33-
return false;
34-
default:
35-
throw new IllegalStateException(status+"");
36-
}
29+
30+
public synchronized T getAndOp(T x, BinaryOperator<T> op)
31+
throws InterruptedException {
32+
if (parent==null) return getAndOpRoot(x, op);
33+
if (count==0) return getAndOpActive(x, op);
34+
return getAndOpPassive(x, op);
3735
}
3836

39-
// 1. Ensure queue is not full
40-
// 2. Save data at tail.
41-
// 3. Increment tail.
42-
public synchronized T combine(T combined)
43-
throws InterruptedException {
44-
while (locked) wait();
45-
locked = true;
46-
value1 = combined;
47-
switch (status) {
48-
case FIRST:
49-
return value1;
50-
case SECOND:
51-
return value1 + value2;
52-
default:
53-
throw new IllegalStateException(status+"");
54-
}
37+
private synchronized T getAndOpRoot(T x, BinaryOperator<T> op)
38+
throws InterruptedException {
39+
T a = combine(op);
40+
count = 0;
41+
insert(op.apply(a, x));
42+
return a;
5543
}
56-
57-
// 1. Ensure queue is not empty.
58-
// 2. Return data at head.
59-
// 3. Increment head.
60-
public synchronized T op(T combined) {
61-
switch (status) {
62-
case ROOT:
63-
T prior = result;
64-
result += combined;
65-
return prior;
66-
case SECOND:
67-
value2 = combined;
68-
locked = false;
69-
notifyAll(); // wake up waiting threads
70-
while (status != Status.RESULT) wait();
71-
locked = false;
72-
notifyAll();
73-
status = Status.IDLE;
74-
return result;
75-
default:
76-
throw new IllegalStateException(status+"");
44+
45+
private synchronized T getAndOpActive(T x, BinaryOperator<T> op)
46+
throws InterruptedException {
47+
insert(x);
48+
waitUntilFull(TIMEOUT);
49+
state = PUSHING;
50+
T a = combine(op);
51+
T r = parent.getAndOp(a, op);
52+
distribute(r, op);
53+
state = PULLING;
54+
return r;
55+
}
56+
57+
private synchronized T getAndOpPassive(T x, BinaryOperator<T> op)
58+
throws InterruptedException {
59+
int i = insert(x);
60+
while (state!=PULLING) wait();
61+
if (--count==0) state = FREE;
62+
return value[i];
63+
}
64+
65+
66+
public synchronized int insert(T x)
67+
throws InterruptedException {
68+
while (state!=FREE) wait();
69+
int i = count++;
70+
value[i] = x;
71+
if (count==size) notifyAll();
72+
return i;
73+
}
74+
75+
76+
public synchronized T combine(BinaryOperator<T> op) {
77+
T a = value[0];
78+
for (int i=1; i<count; i++)
79+
a = op.apply(a, value[i]);
80+
return a;
81+
}
82+
83+
public synchronized void distribute(T r, BinaryOperator<T> op) {
84+
for (int i=0; i<count; i++) {
85+
value[i] = r;
86+
r = op.apply(r, value[i]);
7787
}
7888
}
7989

80-
public synchronized void distribute(int prior) {
81-
switch (status) {
82-
case FIRST:
83-
status = Status.IDLE;
84-
locked = false;
85-
break;
86-
case SECOND:
87-
result = prior + value1;
88-
status = Status.RESULT;
89-
break;
90-
default:
91-
throw new IllegalStateException(status+"");
90+
91+
private void waitUntilFull(long w)
92+
throws InterruptedException {
93+
long t0 = System.currentTimeMillis();
94+
while (count < size) {
95+
wait(w);
96+
long t = System.currentTimeMillis();
97+
w -= t - t0;
9298
}
93-
notifyAll();
9499
}
95100
}

Status.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)