diff --git a/gradle.properties b/gradle.properties index 6f23e53..9cca801 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,19 +1,3 @@ -# -# Copyright 2022 Exactpro (Exactpro Systems Limited) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -release_version = 0.1.2 +release_version = 0.1.3 description = "Task managenet utility classes" vcs_url = https://github.com/th2-net/th2-task-utils \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/taskutils/BlockingScheduledRetryableTaskQueue.java b/src/main/java/com/exactpro/th2/taskutils/BlockingScheduledRetryableTaskQueue.java index fe891c3..c942181 100644 --- a/src/main/java/com/exactpro/th2/taskutils/BlockingScheduledRetryableTaskQueue.java +++ b/src/main/java/com/exactpro/th2/taskutils/BlockingScheduledRetryableTaskQueue.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 Exactpro (Exactpro Systems Limited) + * Copyright 2020-2025 Exactpro (Exactpro Systems Limited) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -32,7 +33,7 @@ public class BlockingScheduledRetryableTaskQueue { private final Queue> taskQueue; private final Set> taskSet; - private volatile long dataSize; + private final AtomicLong dataSize; private final Lock lock; private final Condition addition; private final Condition removal; @@ -67,6 +68,7 @@ public BlockingScheduledRetryableTaskQueue(int maxTaskCount, long maxDataSize, R taskQueue = new PriorityQueue<>(ScheduledRetryableTask::compareOrder); taskSet = new HashSet<>(); + dataSize = new AtomicLong(0); lock = new ReentrantLock(); addition = lock.newCondition(); removal = lock.newCondition(); @@ -87,9 +89,9 @@ public void submit(ScheduledRetryableTask task) { throw new IllegalStateException("Task has been already submitted"); while (true) { - long capacityLeft = maxDataSize - dataSize; + long capacityLeft = maxDataSize - dataSize.get(); if (capacityLeft >= task.getPayloadSize() && taskSet.size() < maxTaskCount) { - dataSize += task.getPayloadSize(); + dataSize.addAndGet(task.getPayloadSize()); addTask(task); break; } else { @@ -141,7 +143,7 @@ public void complete(ScheduledRetryableTask task) { throw new IllegalStateException("Task to complete has not been submitted previously"); taskSet.remove(task); - dataSize -= task.getPayloadSize(); + dataSize.addAndGet(-task.getPayloadSize()); removal.signalAll(); } finally { lock.unlock(); @@ -165,7 +167,7 @@ public ScheduledRetryableTask take() { lock.lock(); try { while (true) { - if (taskQueue.size() > 0) + if (!taskQueue.isEmpty()) return taskQueue.poll(); else addition.awaitUninterruptibly(); @@ -195,8 +197,8 @@ public ScheduledRetryableTask awaitScheduled() throws InterruptedException { lock.lock(); try { while (true) { - if (taskQueue.size() == 0) - addition.awaitUninterruptibly(); + if (taskQueue.isEmpty()) + addition.await(); else { ScheduledRetryableTask job = taskQueue.peek(); long now = System.nanoTime(); @@ -279,7 +281,7 @@ public void setMaxDataSize(long value) { public long getUsedDataSize() { lock.lock(); try { - return dataSize; + return dataSize.get(); } finally { lock.unlock(); }