Skip to content

Commit 9536d5f

Browse files
committed
Removed non-daemon dataflow parallel group
1 parent 29d8e36 commit 9536d5f

30 files changed

+364
-397
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ task demo(type: DemoTask, dependsOn: 'compileGroovy') {
277277
'DemoSieveEratosthenesTheGoWay', //Never stops
278278
'DemoSieveEratosthenesTheGoWayWithOperators', //Never stops
279279
'DemoThreading', //Never stops
280+
'DemoProducerConsumer1', //Never stops
281+
'DemoPhysicalCalculations', //Needs user input
280282
'DemoFibonacci1', //Needs classes from its source folder
281283
'DemoFibonacci2', //Needs classes from its source folder
282284
'DemoNumbers', //Needs classes from its source folder

grails-doc/src/guide/2.6 What's new.gdoc

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ h3. Dataflow
4141
* Fixed issues with reusing the same parallel group in the DataFlowVariable whenBound() handlers
4242
* Tasks can return values and can be joined
4343
* DataFlowQueue's _whenBound_ renamed to _wheneverBound_ and _whenNextBound_ renamed to _whenBound_ to obey the _DataFlowChannel_ interface contract
44+
* DataFlow default parallel group has been made daemon
4445

4546
h3. Agent
4647

grails-doc/src/guide/7. Dataflow Concurrency.gdoc

+5-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ final def z = new DataFlowVariable()
1212

1313
task {
1414
z << x.val + y.val
15-
println "Result: ${z.val}"
1615
}
1716

1817
task {
@@ -22,6 +21,8 @@ task {
2221
task {
2322
y << 5
2423
}
24+
25+
println "Result: ${z.val}"
2526
{code}
2627

2728
Or the same algorithm rewritten using the _DataFlows_ class.
@@ -34,7 +35,6 @@ final def df = new DataFlows()
3435

3536
task {
3637
df.z = df.x + df.y
37-
println "Result: ${df.z}"
3838
}
3939

4040
task {
@@ -44,6 +44,9 @@ task {
4444
task {
4545
df.y = 5
4646
}
47+
48+
println "Result: ${df.z}"
49+
4750
{code}
4851

4952
We start three logical tasks, which can run in parallel and perform their particular activities. The tasks need to exchange data and they do so using *Dataflow Variables*.

grails-doc/src/guide/7.1 Tasks.gdoc

+7-6
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ task {
4747
it.val.toUpperCase().contains 'GROOVY'
4848
}).size()
4949
}
50-
System.exit 0
51-
}
50+
}.join()
5251
{code}
5352

5453
h3. Grouping tasks
@@ -73,7 +72,7 @@ group.with {
7372
{code}
7473

7574
{note:Title=Custom thread pools for dataflow}
76-
The default thread pool for dataflow tasks contains non-daemon threads, which means your application will not exit before all tasks complete.
75+
The default thread pool for dataflow tasks contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all tasks to complete.
7776
When grouping tasks, make sure that your custom thread pools either use daemon threads, too, which can be achieved by
7877
using DefaultPGroup or by providing your own thread factory to a thread pool constructor,
7978
or in case your thread pools use non-daemon threads, such as when using the NonDaemonPGroup group class, make sure you shutdown the group or the thread pool explicitly by calling its shutdown() method,
@@ -104,8 +103,7 @@ task {
104103
it.val.toUpperCase().contains 'GROOVY'
105104
}).size()
106105
}
107-
System.exit 0
108-
}
106+
}.join()
109107

110108
def downloadPage(def url) {
111109
def page = new DataFlowVariable()
@@ -138,7 +136,7 @@ final def decelerationForce = new DataFlowVariable()
138136
final def deceleration = new DataFlowVariable()
139137
final def distance = new DataFlowVariable()
140138

141-
task {
139+
def t = task {
142140
println """
143141
Calculating distance required to stop a moving ball.
144142
====================================================
@@ -190,6 +188,9 @@ task {
190188
task {
191189
distance << deceleration.val * ((velocity.val/deceleration.val) ** 2) * 0.5
192190
}
191+
192+
t.join()
193+
193194
{code}Note: I did my best to make all the physical calculations right. Feel free to change the values and see how long distance you need to stop the rolling ball.
194195

195196
h2. Deterministic deadlocks

grails-doc/src/guide/7.3 Operators.gdoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ group.with {
235235
{code}
236236

237237
{note:Title=Custom thread pools for dataflow}
238-
The default thread pool for dataflow operators contains non-daemon threads, which means your application will not exit before all operators are stopped.
238+
The default thread pool for dataflow operators contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all tasks to complete.
239239
When grouping operators, make sure that your custom thread pools either use daemon threads, too, which can be achieved by
240240
using DefaultPGroup or by providing your own thread factory to a thread pool constructor,
241241
or in case your thread pools use non-daemon threads, such as when using the NonDaemonPGroup group class, make sure you shutdown the group or the thread pool explicitly by calling its shutdown() method,

src/main/groovy/groovyx/gpars/dataflow/DataFlow.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import groovy.lang.Closure;
2020
import groovyx.gpars.actor.Actor;
2121
import groovyx.gpars.dataflow.operator.DataFlowProcessor;
22+
import groovyx.gpars.group.DefaultPGroup;
2223
import groovyx.gpars.group.PGroup;
24+
import groovyx.gpars.scheduler.ResizeablePool;
2325

2426
import java.util.List;
2527
import java.util.Map;
@@ -37,7 +39,7 @@ public abstract class DataFlow {
3739
/**
3840
* The parallel group used by all Dataflow Concurrency actors by default.
3941
*/
40-
public static final DataFlowPGroup DATA_FLOW_GROUP = new DataFlowPGroup(1);
42+
public static final PGroup DATA_FLOW_GROUP = new DefaultPGroup(new ResizeablePool(true, 1));
4143

4244
/**
4345
* Maps threads/tasks to parallel groups they belong to

src/main/groovy/groovyx/gpars/dataflow/DataFlowPGroup.java

-45
This file was deleted.

src/test/groovy/groovyx/gpars/samples/dataflow/BenchmarkManyDataFlowVariables.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package groovyx.gpars.samples.dataflow
1919
import groovyx.gpars.dataflow.DataFlowVariable
2020
import java.util.concurrent.Executors
2121

22-
final many = 1..(limit)
22+
final many = 1..(100)
2323

2424
List dfs = many.collect { new DataFlowVariable() }
2525
def result = new DataFlowVariable()

src/test/groovy/groovyx/gpars/samples/dataflow/DataFlowDemo2.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,4 @@ def y = new DataFlowVariable<List<Integer>>()
4141

4242
task { x << ints(0, 500) }
4343
task { y << sum(0, x.val) }
44-
task { println("List of sums: " + y.val); System.exit(0) }
44+
task { println("List of sums: " + y.val); System.exit(0) }.join()

src/test/groovy/groovyx/gpars/samples/dataflow/DataFlowDemo3.groovy

+1-2
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,4 @@ task {
4040
task {
4141
Thread.sleep(1000)
4242
println "Sum: ${producer.collect {it * it}.inject(0) {sum, x -> sum + x}}"
43-
System.exit 0
44-
}
43+
}.join()

src/test/groovy/groovyx/gpars/samples/dataflow/DataFlowDemo6.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,4 @@ task {
3939

4040
task {
4141
y << 5
42-
}
42+
}.join()

src/test/groovy/groovyx/gpars/samples/dataflow/DemoAvoidPotentialDeadlock1.groovy

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import groovyx.gpars.scheduler.ResizeablePool
2727
* @author Vaclav Pech
2828
*/
2929

30-
final def group = new DefaultPGroup(new ResizeablePool(false))
30+
final def group = new DefaultPGroup(new ResizeablePool(true))
3131

3232
final def a = new DataFlowVariable()
3333
final def b = new DataFlowVariable()
@@ -39,14 +39,14 @@ group.with {
3939

4040
task {
4141
println "Result: ${b.val}"
42-
System.exit 0
42+
group.shutdown()
4343
}
4444

4545
Thread.sleep 2000
4646

4747
task {
4848
a << 10
4949
}
50+
b.join()
5051
}
5152

52-
group.shutdown()
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,58 @@
1-
// GPars - Groovy Parallel Systems
2-
//
3-
// Copyright © 2008-10 The original author or authors
4-
//
5-
// Licensed under the Apache License, Version 2.0 (the "License");
6-
// you may not use this file except in compliance with the License.
7-
// You may obtain a copy of the License at
8-
//
9-
// http://www.apache.org/licenses/LICENSE-2.0
10-
//
11-
// Unless required by applicable law or agreed to in writing, software
12-
// distributed under the License is distributed on an "AS IS" BASIS,
13-
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14-
// See the License for the specific language governing permissions and
15-
// limitations under the License.
16-
17-
package groovyx.gpars.samples.dataflow
18-
19-
import groovyx.gpars.dataflow.DataFlowVariable
20-
import groovyx.gpars.group.DefaultPGroup
21-
import groovyx.gpars.group.PGroup
22-
import groovyx.gpars.scheduler.DefaultPool
23-
24-
/**
25-
* Demonstrates deadlock prevention using the whenBound (>>) dataflow construct. Instead of blocking the task
26-
* together with its underlying thread on a read call to DataFlowVariable.val, the whenBound handler will be invoked only
27-
* after the value of the DataFlowVariable is set and so the reader doesn't consume a thread while waiting for a value
28-
* to arrive.
29-
* The code would end up deadlocked if we blocked the threads during reads, since the first two tasks
30-
* wait for each other to bind values to a and b. Only the third thread can unlock the two threads by setting value of a.
31-
*
32-
* @author Vaclav Pech
33-
*/
34-
35-
final PGroup group = new DefaultPGroup(new DefaultPool(false, 1))
36-
37-
final def a = new DataFlowVariable()
38-
final def b = new DataFlowVariable()
39-
40-
group.with {
41-
task {
42-
a >> {b << 20 + it}
43-
}
44-
45-
task {
46-
b >> {
47-
println "Result: ${it}"
48-
System.exit 0
49-
}
50-
}
51-
52-
Thread.sleep 2000
53-
54-
task {
55-
a << 10
56-
}
57-
1+
// GPars - Groovy Parallel Systems
2+
//
3+
// Copyright © 2008-10 The original author or authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package groovyx.gpars.samples.dataflow
18+
19+
import groovyx.gpars.dataflow.DataFlowVariable
20+
import groovyx.gpars.group.DefaultPGroup
21+
import groovyx.gpars.group.PGroup
22+
import groovyx.gpars.scheduler.DefaultPool
23+
24+
/**
25+
* Demonstrates deadlock prevention using the whenBound (>>) dataflow construct. Instead of blocking the task
26+
* together with its underlying thread on a read call to DataFlowVariable.val, the whenBound handler will be invoked only
27+
* after the value of the DataFlowVariable is set and so the reader doesn't consume a thread while waiting for a value
28+
* to arrive.
29+
* The code would end up deadlocked if we blocked the threads during reads, since the first two tasks
30+
* wait for each other to bind values to a and b. Only the third thread can unlock the two threads by setting value of a.
31+
*
32+
* @author Vaclav Pech
33+
*/
34+
35+
final PGroup group = new DefaultPGroup(new DefaultPool(true, 1))
36+
37+
final def a = new DataFlowVariable()
38+
final def b = new DataFlowVariable()
39+
40+
group.with {
41+
task {
42+
a >> {b << 20 + it}
43+
}
44+
45+
task {
46+
b >> {
47+
println "Result: ${it}"
48+
group.shutdown()
49+
}
50+
}
51+
52+
Thread.sleep 2000
53+
54+
task {
55+
a << 10
56+
}
57+
b.join()
5858
}

src/test/groovy/groovyx/gpars/samples/dataflow/DemoDataFlowQueueIteration.groovy

-1
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,3 @@ println 'Reading from the stream'
5050
println ''
5151
println "The stream is now empty. Length = ${stream.length()}"
5252

53-
System.exit 0

0 commit comments

Comments
 (0)