Skip to content

Commit 768039a

Browse files
committed
Merge branch 'master' of ssh://git.codehaus.org/gpars
2 parents f00cf98 + 834c0a9 commit 768039a

File tree

2 files changed

+77
-61
lines changed

2 files changed

+77
-61
lines changed

src/main/groovy/groovyx/gpars/actor/AbstractPooledActor.java

+76-60
Original file line numberDiff line numberDiff line change
@@ -28,91 +28,105 @@
2828
import java.util.concurrent.TimeUnit;
2929

3030
/**
31-
* AbstractPooledActor provides the default implementation of a stateful actor. Refer to DynamicDispatchActor or ReactiveActor for examples of stateless actors.
32-
* It represents a standalone active object (actor),
33-
* which reacts asynchronously to messages sent to it from outside through the send() method, which preserving its internal implicit state.
34-
* Each Actor has its own message queue and a thread pool shared with other Actors by means of an instance
35-
* of the PGroup, which they have in common.
36-
* The PGroup instance is responsible for the pool creation, management and shutdown.
37-
* All work performed by an Actor is divided into chunks, which are sequentially submitted as independent tasks
38-
* to the thread pool for processing.
39-
* Whenever an Actor looks for a new message through the react() method, the actor gets detached
40-
* from the thread, making the thread available for other actors. Thanks to the ability to dynamically attach and detach
41-
* threads to actors, Actors can scale far beyond the limits of the underlying platform on number of concurrently
42-
* available threads.
43-
* The receive() method can be used to read a message from the queue without giving up the thread. If no message is available,
44-
* the call to receive() blocks until a message arrives or the supplied timeout expires.
45-
* The loop() method allows to repeatedly invoke a closure and yet perform each of the iterations sequentially
46-
* in different thread from the thread pool.
47-
* To support continuations correctly the react() and loop() methods never return.
31+
* {@code AbstractPooledActor} provides the default implementation of a stateful actor. Refer to {@code
32+
* DynamicDispatchActor} or {@code ReactiveActor} for examples of stateless actors. {@code
33+
* AbstractPooledActor} represents a standalone active object (actor), which reacts asynchronously to
34+
* messages sent to it from outside through the {@code send} method, which preserving its internal implicit
35+
* state. Each {@code Actor} has its own message queue and a thread pool shared with other {@code Actor}s
36+
* by means of an instance of the {@code PGroup}, which they have in common. The {@code PGroup} instance is
37+
* responsible for the pool creation, management and shutdown. All work performed by an {@code Actor} is
38+
* divided into chunks, which are sequentially submitted as independent tasks to the thread pool for
39+
* processing. Whenever an {@code Actor} looks for a new message through the {@code react} method, the
40+
* actor gets detached from the thread, making the thread available for other actors. Thanks to the ability
41+
* to dynamically attach and detach threads to actors, {@code Actors} can scale far beyond the limits of the
42+
* underlying platform on number of concurrently available threads. The {@code receive} method can be used
43+
* to read a message from the queue without giving up the thread. If no message is available, the call to
44+
* {@code receive} blocks until a message arrives or the supplied timeout expires. The {@code loop} method
45+
* allows to repeatedly invoke a closure and yet perform each of the iterations sequentially in different
46+
* thread from the thread pool. To support continuations correctly the {@code react} and {@code loop}
47+
* methods never return.
48+
*
4849
* <pre>
4950
* import static groovyx.gpars.actor.Actors.actor
50-
* <p/>
51+
*
5152
* def actor = actor {
5253
* loop {
53-
* react {message ->
54+
* react { message ->
5455
* println message
5556
* }
56-
* //this line will never be reached
57+
* // This line will never be reached.
5758
* }
58-
* //this line will never be reached
59+
* // This line will never be reached.
5960
* }.start()
60-
* <p/>
61+
*
6162
* actor.send 'Hi!'
6263
* </pre>
64+
* <p>
6365
* This requires the code to be structured accordingly.
64-
* <p/>
66+
* </p>
6567
* <pre>
6668
* def adder = actor {
6769
* loop {
68-
* react {a ->
69-
* react {b ->
70+
* react { a ->
71+
* react { b ->
7072
* println a+b
71-
* replyIfExists a+b //sends reply, if b was sent by a PooledActor
73+
* replyIfExists a+b // Sends reply, if b was sent by a PooledActor.
7274
* }
7375
* }
74-
* //this line will never be reached
76+
* // This line will never be reached.
7577
* }
76-
* //this line will never be reached
78+
* // This line will never be reached.
7779
* }.start()
7880
* </pre>
79-
* The closures passed to the react() method can call reply() or replyIfExists(), which will send a message back to
80-
* the originator of the currently processed message. The replyIfExists() method unlike the reply() method will not fail
81-
* if the original message wasn't sent by an actor nor if the original sender actor is no longer running.
82-
* The reply() and replyIfExists() methods are also dynamically added to the processed messages.
81+
* <p>
82+
* The closures passed to the {@code react} method can call {@code reply} or {@code replyIfExists}, which
83+
* will send a message back to the originator of the currently processed message. The {@code replyIfExists}
84+
* method unlike the {@code reply} method will not fail if the original message wasn't sent by an actor nor
85+
* if the original sender actor is no longer running. The {@code reply} and {@code replyIfExists} methods
86+
* are also dynamically added to the processed messages.
87+
* </p>
8388
* <pre>
84-
* react {a ->
85-
* react {b ->
89+
* react { a ->
90+
* react { b ->
8691
* reply 'message' //sent to senders of a as well as b
8792
* a.reply 'private message' //sent to the sender of a only
8893
* }
8994
* }
9095
* </pre>
91-
* <p/>
92-
* The react() method accepts timeouts as well.
96+
* <p>
97+
* The {@code react} method accepts timeouts as well.
98+
* </p>
9399
* <pre>
94100
* react(10, TimeUnit.MINUTES) {
95101
* println 'Received message: ' + it
96102
* }
97103
* </pre>
98-
* If no message arrives within the given timeout, the onTimeout() lifecycle handler is invoked, if exists,
99-
* and the Actor.TIMEOUT message is returned.
100-
* Each Actor has at any point in time at most one active instance of ActorAction associated, which abstracts
101-
* the current chunk of actor's work to perform. Once a thread is assigned to the ActorAction, it moves the actor forward
102-
* till loop() or react() is called. These methods schedule another ActorAction for processing and throw dedicated exception
103-
* to terminate the current ActorAction.
104-
* <p/>
105-
* Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread whenever a certain lifecycle event occurs.
104+
*<p>
105+
* If no message arrives within the given timeout, the {@code onTimeout} lifecycle handler is invoked, if
106+
* exists, and the {@code Actor.TIMEOUT} message is returned. Each {@code Actor} has at any point in time
107+
* at most one active instance of {@code ActorAction} associated, which abstracts the current chunk of
108+
* actor's work to perform. Once a thread is assigned to the {@code ActorAction}, it moves the actor forward
109+
* till {@code loop} or {@code react} is called. These methods schedule another {@code ActorAction} for
110+
* processing and throw dedicated exception to terminate the current {@code ActorAction}.
111+
* </p>
112+
* <p>
113+
* Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread
114+
* whenever a certain lifecycle event occurs.
115+
* </p>
106116
* <ul>
107-
* <li>afterStart() - called immediately after the Actor's background thread has been started, before the act() method is called the first time.</li>
108-
* <li>afterStop(List undeliveredMessages) - called right after the actor is stopped, passing in all the messages from the queue.</li>
109-
* <li>onInterrupt(InterruptedException e) - called when a react() method timeouts. The actor will be terminated.
110-
* <li>onTimeout() - called when the actor's thread gets interrupted. Thread interruption will result in the stopping the actor in any case.</li>
111-
* <li>onException(Throwable e) - called when an exception occurs in the actor's thread. Throwing an exception from this method will stop the actor.</li>
117+
* <li>{@code afterStart()} - called immediately after the {@code Actor}'s background thread has been
118+
* started, before the {@code act} method is called the first time.</li>
119+
* <li>{@code afterStop(List undeliveredMessages)} - called right after the actor is stopped, passing in all
120+
* the messages from the queue.</li>
121+
* <li>{@code onInterrupt(InterruptedException e)} - called when a {@code react} method timeouts. The actor
122+
* will be terminated.</li>
123+
* <li>{@code onTimeout()} - called when the actor's thread gets interrupted. Thread interruption will
124+
* result in the stopping the actor in any case.</li>
125+
* <li>{@code onException(Throwable e)} - called when an exception occurs in the actor's thread. Throwing an
126+
* exception from this method will stop the actor.</li>
112127
* </ul>
113128
*
114129
* @author Vaclav Pech, Alex Tkachman, Dierk Koenig
115-
* Date: Feb 7, 2009
116130
*/
117131
@Deprecated
118132
@SuppressWarnings({"ThrowCaughtLocally", "UnqualifiedStaticUsage"})
@@ -123,20 +137,22 @@ public abstract class AbstractPooledActor extends SequentialProcessingActor {
123137
private static final long serialVersionUID = -6232655362494852540L;
124138

125139
/**
126-
* This method represents the body of the actor. It is called upon actor's start and can exit either normally
127-
* by return or due to actor being stopped through the stop() method, which cancels the current actor action.
128-
* Provides an extension point for subclasses to provide their custom Actor's message handling code.
140+
* This method represents the body of the actor. It is called upon actor's start and can exit either
141+
* normally by return or due to actor being stopped through the stop() method, which cancels the current
142+
* actor action. Provides an extension point for subclasses to provide their custom {@code Actor}'s
143+
* message handling code.
129144
*/
130145
protected abstract void act();
131146

132147
/**
133-
* Adds reply() and replyIfExists() methods to the currentActor and the message.
134-
* These methods will call send() on the target actor (the sender of the original message).
135-
* The reply()/replyIfExists() methods invoked on the actor will be sent to all currently processed messages,
136-
* reply()/replyIfExists() invoked on a message will send a reply to the sender of that particular message only.
148+
* Adds {@code reply} and {@code replyIfExists} methods to the current {@code Actor} and the message.
149+
* These methods will call {@code send} on the target actor (the sender of the original message). The
150+
* {@code reply}/{@code replyIfExists} methods invoked on the actor will be sent to all currently
151+
* processed messages, {@code reply}/{@code replyIfExists} invoked on a message will send a reply to the
152+
* sender of that particular message only.
137153
*
138-
* @param messages List of ActorMessage wrapping the sender actor, who we need to be able to respond to,
139-
* plus the original message
154+
* @param messages List of {@code ActorMessage} wrapping the sender actor, who we need to be able to
155+
* respond to, plus the original message
140156
*/
141157
private void enhanceReplies(final Iterable<ActorMessage> messages) {
142158
final List<MessageStream> senders = getSenders();
@@ -167,7 +183,7 @@ protected final Object receiveImpl() throws InterruptedException {
167183
* Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
168184
*
169185
* @param timeout how long to wait before giving up, in units of unit
170-
* @param units a TimeUnit determining how to interpret the timeout parameter
186+
* @param units a {@code TimeUnit} determining how to interpret the timeout parameter
171187
* @return The message retrieved from the queue, or null, if the timeout expires.
172188
* @throws InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
173189
*/

src/main/groovy/groovyx/gpars/group/DefaultPGroup.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* def group = new DefaultPGroup()
3333
* group.resize 1
3434
* def actor = group.actor {
35-
* react {message ->
35+
* react { message ->
3636
* println message
3737
* }
3838
* }.start()

0 commit comments

Comments
 (0)