Skip to content

Commit 894da49

Browse files
committed
DataFlowBroadcast added
1 parent c97a714 commit 894da49

File tree

7 files changed

+187
-7
lines changed

7 files changed

+187
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// GPars - Groovy Parallel Systems
2+
//
3+
// Copyright © 2008-10 The original author or authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package groovyx.gpars.dataflow;
18+
19+
import groovyx.gpars.dataflow.stream.DataFlowStream;
20+
import groovyx.gpars.dataflow.stream.DataFlowStreamReadAdapter;
21+
import groovyx.gpars.dataflow.stream.DataFlowStreamWriteAdapter;
22+
23+
/**
24+
* Offers a deterministic one-to-many and many-to-many messaging alternative to DataFlowQueue.
25+
* Internally it wraps a DataFlowStream class with a DataFlowStreamWriteAdapter and so
26+
* synchronizes all writes to the underlying stream allowing multiple threads accessing the stream concurrently.
27+
* On demand through the createReadChannel() method it will return an DataFlowReadChannel through which the reader will receive
28+
* all messages written to the channel since then.
29+
* <p/>
30+
* Typical use:
31+
* <p/>
32+
* DataFlowWriteChannel broadcastStream = new DataFlowBroadcast()
33+
* DataFlowReadChannel stream1 = broadcastStream.createReadChannel()
34+
* DataFlowReadChannel stream2 = broadcastStream.createReadChannel()
35+
* broadcastStream << 'Message'
36+
* assert stream1.val == stream2.val
37+
*
38+
* @param <T> The type of messages to pass through the stream
39+
* @author Vaclav Pech
40+
*/
41+
public final class DataFlowBroadcast<T> extends DataFlowStreamWriteAdapter<T> {
42+
43+
/**
44+
* Creates a new adapter
45+
*/
46+
public DataFlowBroadcast() {
47+
super(new DataFlowStream<T>());
48+
}
49+
50+
@SuppressWarnings({"SynchronizedMethod"})
51+
@Override
52+
public synchronized String toString() {
53+
return "DataFlowBroadcast around " + super.toString();
54+
}
55+
56+
/**
57+
* Retrieves an implementation of DataFlowReadChannel to read all messages submitted to the broadcast chanel.
58+
* Since multiple parties (threads/tasks/actors/...) may ask for read channels independently, the submitted messages are effectively
59+
* broadcast to all the subscribers.
60+
*
61+
* @return A read channel to receive messages submitted to the broadcast channel from now on.
62+
*/
63+
public DataFlowReadChannel<T> createReadChannel() {
64+
return new DataFlowStreamReadAdapter<T>(getHead());
65+
}
66+
}
67+

src/main/groovy/groovyx/gpars/dataflow/stream/DataFlowStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
* @author Johannes Link, Vaclav Pech
4646
*/
4747
@SuppressWarnings({"rawtypes", "TailRecursion", "unchecked", "StaticMethodNamingConvention", "ClassWithTooManyMethods"})
48-
public class DataFlowStream<T> implements FList<T> {
48+
public final class DataFlowStream<T> implements FList<T> {
4949

5050
private final DataFlowVariable<T> first = new DataFlowVariable<T>();
5151
private final AtomicReference<DataFlowStream<T>> rest = new AtomicReference<DataFlowStream<T>>();

src/main/groovy/groovyx/gpars/dataflow/stream/DataFlowStreamReadAdapter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* @param <T> The type of messages to pass through the stream
3535
* @author Vaclav Pech
3636
*/
37-
public class DataFlowStreamReadAdapter<T> implements DataFlowReadChannel<T> {
37+
public final class DataFlowStreamReadAdapter<T> implements DataFlowReadChannel<T> {
3838

3939
private DataFlowStream<T> head;
4040
private DataFlowStream<T> asyncHead;

src/main/groovy/groovyx/gpars/dataflow/stream/DataFlowStreamWriteAdapter.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,21 @@ public DataFlowStreamWriteAdapter(final DataFlowStream<T> stream) {
4141
}
4242

4343
@Override
44-
public synchronized DataFlowWriteChannel<T> leftShift(final T value) {
44+
public final synchronized DataFlowWriteChannel<T> leftShift(final T value) {
4545
head.leftShift(value);
4646
head = (DataFlowStream<T>) head.getRest();
4747
return this;
4848
}
4949

5050
@Override
51-
public synchronized DataFlowWriteChannel<T> leftShift(final DataFlowReadChannel<T> ref) {
51+
public final synchronized DataFlowWriteChannel<T> leftShift(final DataFlowReadChannel<T> ref) {
5252
head.leftShift(ref);
5353
head = (DataFlowStream<T>) head.getRest();
5454
return this;
5555
}
5656

5757
@Override
58-
public synchronized void bind(final T value) {
58+
public final synchronized void bind(final T value) {
5959
head.leftShift(value);
6060
head = (DataFlowStream<T>) head.getRest();
6161
}
@@ -64,5 +64,9 @@ public synchronized void bind(final T value) {
6464
public synchronized String toString() {
6565
return head.toString();
6666
}
67+
68+
protected final synchronized DataFlowStream<T> getHead() {
69+
return head;
70+
}
6771
}
6872

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// GPars - Groovy Parallel Systems
2+
//
3+
// Copyright © 2008-10 The original author or authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package groovyx.gpars.dataflow.stream
18+
19+
import groovyx.gpars.dataflow.DataFlow
20+
import groovyx.gpars.dataflow.DataFlowBroadcast
21+
import groovyx.gpars.dataflow.DataFlowReadChannel
22+
import groovyx.gpars.dataflow.DataFlowWriteChannel
23+
import java.util.concurrent.CyclicBarrier
24+
25+
public class DataFlowStreamBroadCastTest extends GroovyTestCase {
26+
27+
public void testMultipleThreadedWrite() {
28+
final DataFlowWriteChannel writeStream = new DataFlowBroadcast()
29+
final DataFlowReadChannel stream = writeStream.createReadChannel()
30+
31+
final CyclicBarrier barrier = new CyclicBarrier(10)
32+
10.times {value ->
33+
DataFlow.task {
34+
barrier.await()
35+
writeStream << value
36+
}
37+
}
38+
checkResult(stream)
39+
}
40+
41+
public void testMultipleReaders() {
42+
final DataFlowWriteChannel writeStream = new DataFlowBroadcast()
43+
final DataFlowReadChannel stream1 = writeStream.createReadChannel()
44+
final DataFlowReadChannel stream2 = writeStream.createReadChannel()
45+
46+
final CyclicBarrier barrier = new CyclicBarrier(10)
47+
10.times {value ->
48+
DataFlow.task {
49+
barrier.await()
50+
writeStream << value
51+
}
52+
}
53+
checkResult(stream1)
54+
checkResult(stream2)
55+
}
56+
57+
public void testTwoStreams() {
58+
final DataFlowWriteChannel broadcastStream = new DataFlowBroadcast()
59+
final DataFlowReadChannel stream1 = broadcastStream.createReadChannel()
60+
final DataFlowReadChannel stream2 = broadcastStream.createReadChannel()
61+
broadcastStream << 'Message1'
62+
broadcastStream << 'Message2'
63+
broadcastStream << 'Message3'
64+
assert stream1.val == stream2.val
65+
assert stream1.val == stream2.val
66+
assert stream1.val == 'Message3'
67+
assert stream2.val == 'Message3'
68+
}
69+
70+
private def checkResult(DataFlowReadChannel stream) {
71+
def result = (1..10).collect {
72+
stream.val
73+
}.sort()
74+
assert result == (0..9).collect {it}
75+
}
76+
}

src/test/groovy/groovyx/gpars/dataflow/stream/DataFlowStreamWriteAdapterTest.groovy

-2
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@ package groovyx.gpars.dataflow.stream
1818

1919
import groovyx.gpars.dataflow.DataFlow
2020
import groovyx.gpars.dataflow.DataFlowReadChannel
21-
import java.util.concurrent.CountDownLatch
2221
import java.util.concurrent.CyclicBarrier
2322

2423
public class DataFlowStreamWriteAdapterTest extends GroovyTestCase {
2524

2625
public void testMultipleThreadedWrite() {
27-
final CountDownLatch latch = new CountDownLatch(1)
2826
final def original = new DataFlowStream()
2927
final def writeStream = new DataFlowStreamWriteAdapter(original)
3028
final DataFlowReadChannel stream = new DataFlowStreamReadAdapter(original)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// GPars - Groovy Parallel Systems
2+
//
3+
// Copyright © 2008-10 The original author or authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package groovyx.gpars.samples.dataflow
18+
19+
import groovyx.gpars.dataflow.DataFlowBroadcast
20+
import groovyx.gpars.dataflow.DataFlowReadChannel
21+
import groovyx.gpars.dataflow.DataFlowWriteChannel
22+
23+
/**
24+
* Demonstrates the dataflow broadcast stream implementing the deterministic one-to-many or many-to-many message stream
25+
*/
26+
27+
DataFlowWriteChannel broadcastStream = new DataFlowBroadcast()
28+
DataFlowReadChannel stream1 = broadcastStream.createReadChannel()
29+
DataFlowReadChannel stream2 = broadcastStream.createReadChannel()
30+
broadcastStream << 'Message1'
31+
broadcastStream << 'Message2'
32+
broadcastStream << 'Message3'
33+
assert stream1.val == stream2.val
34+
assert stream1.val == stream2.val
35+
assert stream1.val == stream2.val

0 commit comments

Comments
 (0)