26
26
import java .util .concurrent .ConcurrentLinkedQueue ;
27
27
import java .util .concurrent .ExecutorService ;
28
28
import java .util .concurrent .Executors ;
29
- import java .util .concurrent .atomic .AtomicBoolean ;
29
+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
30
30
31
31
/**
32
32
* @author Vaclav Pech
@@ -75,7 +75,10 @@ public final void attachToThreadPool(final ExecutorService threadPool) {
75
75
/**
76
76
* Indicates, whether there's an active thread handling a message inside the agent's body
77
77
*/
78
- private final AtomicBoolean active = new AtomicBoolean (false );
78
+ private volatile int active = PASSIVE ;
79
+ private static final AtomicIntegerFieldUpdater <AgentCore > activeUpdater = AtomicIntegerFieldUpdater .newUpdater (AgentCore .class , "active" );
80
+ private static final int PASSIVE = 0 ;
81
+ private static final int ACTIVE = 1 ;
79
82
80
83
/**
81
84
* Adds the message to the agent\s message queue
@@ -108,7 +111,7 @@ public final void leftShift(final Object message) {
108
111
* Schedules processing of a next message, if there are some and if there isn't an active thread handling a message at the moment
109
112
*/
110
113
void schedule () {
111
- if (!queue .isEmpty () && active .compareAndSet (false , true )) {
114
+ if (!queue .isEmpty () && activeUpdater .compareAndSet (this , PASSIVE , ACTIVE )) {
112
115
threadPool .submit (this );
113
116
}
114
117
}
@@ -124,7 +127,7 @@ public void run() {
124
127
} catch (Exception e ) {
125
128
registerError (e );
126
129
} finally {
127
- active .set (false );
130
+ activeUpdater .set (this , PASSIVE );
128
131
schedule ();
129
132
}
130
133
}
0 commit comments