diff --git a/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImpl.java b/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImpl.java index 3cfbdf39cbc..85b042d82f2 100644 --- a/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImpl.java +++ b/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImpl.java @@ -19,7 +19,6 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.opensmartgridplatform.adapter.protocol.dlms.application.config.annotation.LocalThrottlingServiceCondition; -import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum; import org.opensmartgridplatform.throttling.ThrottlingPermitDeniedException; import org.opensmartgridplatform.throttling.api.Permit; import org.slf4j.Logger; @@ -31,6 +30,9 @@ @Component @Conditional(LocalThrottlingServiceCondition.class) public class LocalThrottlingServiceImpl implements ThrottlingService { + + private static final long WAIT_FOR_LOCK = 10; + private final AtomicInteger requestIdCounter = new AtomicInteger(0); private static final Logger LOGGER = LoggerFactory.getLogger(LocalThrottlingServiceImpl.class); @@ -38,8 +40,8 @@ public class LocalThrottlingServiceImpl implements ThrottlingService { private final int maxOpenConnections; private final int maxNewConnectionRequests; - @Value("${throttling.max.wait.high.prio}") - private int maxWaitForHighPrioInMs; + @Value("${throttling.max.wait.for.permit}") + private int maxWaitForPermitInMs; @Value("${throttling.max.new.connection.reset.time}") private int maxNewConnectionResetTime; @@ -126,18 +128,7 @@ private void requestPermit( LOGGER.debug("{}. available = {} ", permitDescription, semaphore.availablePermits()); try { - if (semaphore.availablePermits() == 0) { - if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) { - throw new ThrottlingPermitDeniedException( - permitDescription + ": no available permits", priority); - } else { - LOGGER.debug( - "{} wait for available permit for request with priority {}", - permitDescription, - priority); - } - } - if (!semaphore.tryAcquire(this.maxWaitForHighPrioInMs, TimeUnit.MILLISECONDS)) { + if (!semaphore.tryAcquire(this.maxWaitForPermitInMs, TimeUnit.MILLISECONDS)) { throw new ThrottlingPermitDeniedException( permitDescription + ": could not acquire permit for request with priority " + priority, priority); @@ -149,15 +140,17 @@ private void requestPermit( } } - private synchronized void awaitReset() { + private void awaitReset() { LOGGER.debug( "Await reset for newConnection. available = {} ", this.newConnectionRequestsSemaphore.availablePermits()); while (this.resetTimerLock.isLocked()) { try { - LOGGER.info("Wait {}ms while reset timer is locked", this.maxNewConnectionResetTime); - this.resetTimerLock.wait(this.maxNewConnectionResetTime); + LOGGER.info("Wait {}ms while reset timer is locked", WAIT_FOR_LOCK); + synchronized (this.requestIdCounter) { + this.requestIdCounter.wait(WAIT_FOR_LOCK); + } } catch (final InterruptedException e) { LOGGER.warn("Unable to acquire New Connection Request Lock", e); Thread.currentThread().interrupt(); diff --git a/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/resources/osgp-adapter-protocol-dlms.properties b/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/resources/osgp-adapter-protocol-dlms.properties index f9535385f30..80241fdffe4 100644 --- a/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/resources/osgp-adapter-protocol-dlms.properties +++ b/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/resources/osgp-adapter-protocol-dlms.properties @@ -194,7 +194,7 @@ throttling.rejected.max.delay=PT70S throttling.rejected.high.prio.delay=PT2S # Configuration for the throttling service on the GPRS network throttling.max.open.connections=1000 -throttling.max.wait.high.prio=10000 +throttling.max.wait.for.permit=60000 throttling.max.new.connection.reset.time=1000 throttling.max.new.connection.requests=30 diff --git a/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/test/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImplTest.java b/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/test/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImplTest.java index a58372ee0a1..9830182eb45 100644 --- a/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/test/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImplTest.java +++ b/osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/test/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImplTest.java @@ -21,7 +21,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum; import org.opensmartgridplatform.throttling.ThrottlingPermitDeniedException; import org.opensmartgridplatform.throttling.api.Permit; import org.slf4j.Logger; @@ -37,7 +36,7 @@ class LocalThrottlingServiceImplTest { private static final Integer MAX_NEW_CONNECTION_REQUESTS = 10; private static final Integer MAX_OPEN_CONNECTIONS = MAX_NEW_CONNECTION_REQUESTS * 2; private static final Integer MAX_NEW_CONNECTION_RESET_TIME = 200; - private static final Integer MAX_WAIT_FOR_HIGH_PRIO = 500; + private static final Integer MAX_WAIT_FOR_PERMIT = 500; private static final Integer CLEANUP_PERMITS_INTERVAL = 200; private static final Duration PERMIT_TTL = Duration.of(2, ChronoUnit.SECONDS); @@ -50,7 +49,7 @@ void setUp() { ReflectionTestUtils.setField( this.throttlingService, "maxNewConnectionResetTime", MAX_NEW_CONNECTION_RESET_TIME); ReflectionTestUtils.setField( - this.throttlingService, "maxWaitForHighPrioInMs", MAX_WAIT_FOR_HIGH_PRIO); + this.throttlingService, "maxWaitForPermitInMs", MAX_WAIT_FOR_PERMIT); ReflectionTestUtils.setField( this.throttlingService, "cleanupExpiredPermitsInterval", CLEANUP_PERMITS_INTERVAL); ReflectionTestUtils.setField(this.throttlingService, "timeToLive", PERMIT_TTL); @@ -60,6 +59,8 @@ void setUp() { @ParameterizedTest @ValueSource(ints = {4, 5}) void testThrottlingOpenConnections(final int priority) throws InterruptedException { + ReflectionTestUtils.setField(this.throttlingService, "maxWaitForPermitInMs", 0); + // Claim 10 final List firstBatch = this.requestPermitLowPrio(MAX_NEW_CONNECTION_REQUESTS); // Sleep longer than reset time @@ -93,19 +94,10 @@ void testThrottlingMaxNewConnections(final int priority) { final List permits = this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority); this.assertAvailableNewConnections(0); - this.releasePermitWithDelay(permits, MAX_WAIT_FOR_HIGH_PRIO / 2); + this.releasePermitWithDelay(permits, MAX_WAIT_FOR_PERMIT / 2); - final int nrOfOpenConnections; - if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) { - assertThrows( - ThrottlingPermitDeniedException.class, - () -> this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority)); - nrOfOpenConnections = MAX_NEW_CONNECTION_REQUESTS; - } else { - // high prio will wait for connection - this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority); - nrOfOpenConnections = MAX_NEW_CONNECTION_REQUESTS * 2; - } + this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority); + final int nrOfOpenConnections = MAX_NEW_CONNECTION_REQUESTS * 2; this.assertPermitsInMemory(nrOfOpenConnections); this.assertAvailablePermits(MAX_OPEN_CONNECTIONS - nrOfOpenConnections); @@ -154,6 +146,43 @@ void testCleanupPermit() throws InterruptedException { this.assertAvailablePermits(MAX_OPEN_CONNECTIONS - 1); } + @ParameterizedTest + @ValueSource(ints = {0, 600, 1200}) + void testPermitMaxWait(final int releaseDelay) throws InterruptedException { + final int priority = 4; + // Claim 10 + final List firstBatch = this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority); + // Sleep longer than reset time + Thread.sleep(MAX_NEW_CONNECTION_RESET_TIME + 100); + // Next 10 + this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority); + // Sleep longer than reset time + Thread.sleep(MAX_NEW_CONNECTION_RESET_TIME + 100); + + final long startTime = System.currentTimeMillis(); + if (releaseDelay == 0) { + this.releasePermit(List.of(firstBatch.get(0))); + this.assertPermitsInMemory(MAX_OPEN_CONNECTIONS - 1); + this.assertAvailablePermits(1); + } else { + this.releasePermitWithDelay(List.of(firstBatch.get(0)), releaseDelay); + this.assertPermitsInMemory(MAX_OPEN_CONNECTIONS); + this.assertAvailablePermits(0); + } + + if (releaseDelay > MAX_WAIT_FOR_PERMIT) { + assertThrows(ThrottlingPermitDeniedException.class, () -> this.requestPermit(1, priority)); + assertThat(System.currentTimeMillis() - startTime) + .isGreaterThanOrEqualTo(MAX_WAIT_FOR_PERMIT); + } else { + this.requestPermit(1, priority); + assertThat(System.currentTimeMillis() - startTime).isGreaterThanOrEqualTo(releaseDelay); + } + + this.assertPermitsInMemory(MAX_OPEN_CONNECTIONS); + this.assertAvailablePermits(0); + } + private List requestPermitLowPrio(final int requests) { return this.requestPermit(requests, 3); }