1
1
/*
2
- * Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
2
+ * Copyright 2020-2025 Exactpro (Exactpro Systems Limited)
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
20
20
import java .util .PriorityQueue ;
21
21
import java .util .Queue ;
22
22
import java .util .Set ;
23
+ import java .util .concurrent .atomic .AtomicLong ;
23
24
import java .util .concurrent .locks .Condition ;
24
25
import java .util .concurrent .locks .Lock ;
25
26
import java .util .concurrent .locks .ReentrantLock ;
@@ -32,7 +33,7 @@ public class BlockingScheduledRetryableTaskQueue<V> {
32
33
private final Queue <ScheduledRetryableTask <V >> taskQueue ;
33
34
private final Set <ScheduledRetryableTask <V >> taskSet ;
34
35
35
- private volatile long dataSize ;
36
+ private final AtomicLong dataSize ;
36
37
private final Lock lock ;
37
38
private final Condition addition ;
38
39
private final Condition removal ;
@@ -67,6 +68,7 @@ public BlockingScheduledRetryableTaskQueue(int maxTaskCount, long maxDataSize, R
67
68
68
69
taskQueue = new PriorityQueue <>(ScheduledRetryableTask ::compareOrder );
69
70
taskSet = new HashSet <>();
71
+ dataSize = new AtomicLong (0 );
70
72
lock = new ReentrantLock ();
71
73
addition = lock .newCondition ();
72
74
removal = lock .newCondition ();
@@ -87,9 +89,9 @@ public void submit(ScheduledRetryableTask<V> task) {
87
89
throw new IllegalStateException ("Task has been already submitted" );
88
90
89
91
while (true ) {
90
- long capacityLeft = maxDataSize - dataSize ;
92
+ long capacityLeft = maxDataSize - dataSize . get () ;
91
93
if (capacityLeft >= task .getPayloadSize () && taskSet .size () < maxTaskCount ) {
92
- dataSize += task .getPayloadSize ();
94
+ dataSize . addAndGet ( task .getPayloadSize () );
93
95
addTask (task );
94
96
break ;
95
97
} else {
@@ -141,7 +143,7 @@ public void complete(ScheduledRetryableTask<V> task) {
141
143
throw new IllegalStateException ("Task to complete has not been submitted previously" );
142
144
taskSet .remove (task );
143
145
144
- dataSize -= task .getPayloadSize ();
146
+ dataSize . addAndGet (- task .getPayloadSize () );
145
147
removal .signalAll ();
146
148
} finally {
147
149
lock .unlock ();
@@ -165,7 +167,7 @@ public ScheduledRetryableTask<V> take() {
165
167
lock .lock ();
166
168
try {
167
169
while (true ) {
168
- if (taskQueue .size () > 0 )
170
+ if (! taskQueue .isEmpty () )
169
171
return taskQueue .poll ();
170
172
else
171
173
addition .awaitUninterruptibly ();
@@ -195,8 +197,8 @@ public ScheduledRetryableTask<V> awaitScheduled() throws InterruptedException {
195
197
lock .lock ();
196
198
try {
197
199
while (true ) {
198
- if (taskQueue .size () == 0 )
199
- addition .awaitUninterruptibly ();
200
+ if (taskQueue .isEmpty () )
201
+ addition .await ();
200
202
else {
201
203
ScheduledRetryableTask <V > job = taskQueue .peek ();
202
204
long now = System .nanoTime ();
@@ -279,7 +281,7 @@ public void setMaxDataSize(long value) {
279
281
public long getUsedDataSize () {
280
282
lock .lock ();
281
283
try {
282
- return dataSize ;
284
+ return dataSize . get () ;
283
285
} finally {
284
286
lock .unlock ();
285
287
}
0 commit comments