diff --git a/osgp/platform/osgp-throttling-service/pom.xml b/osgp/platform/osgp-throttling-service/pom.xml
index 3b3fa8f85e4..2e3e0a39ec9 100644
--- a/osgp/platform/osgp-throttling-service/pom.xml
+++ b/osgp/platform/osgp-throttling-service/pom.xml
@@ -129,7 +129,6 @@ SPDX-License-Identifier: Apache-2.0
org.postgresql
postgresql
- runtime
org.projectlombok
diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java
index 25cb232fd8e..88d09dfccd1 100644
--- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java
+++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java
@@ -13,6 +13,7 @@
import org.opensmartgridplatform.throttling.entities.ThrottlingConfig;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository;
+import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -28,15 +29,21 @@ public class PermitsByThrottlingConfig {
private final ThrottlingConfigRepository throttlingConfigRepository;
private final PermitRepository permitRepository;
+ private final PermitReleasedNotifier permitReleasedNotifier;
+ private final boolean highPrioPoolEnabled;
private final int maxWaitForHighPrioInMs;
public PermitsByThrottlingConfig(
final ThrottlingConfigRepository throttlingConfigRepository,
final PermitRepository permitRepository,
- @Value("${max.wait.for.high.prio.in.ms:5000}") final int maxWaitForHighPrioInMs) {
+ final PermitReleasedNotifier permitReleasedNotifier,
+ @Value("${wait.for.high.prio.enabled:true}") final boolean highPrioPoolEnabled,
+ @Value("${wait.for.high.prio.max.in.ms:10000}") final int maxWaitForHighPrioInMs) {
this.throttlingConfigRepository = throttlingConfigRepository;
this.permitRepository = permitRepository;
+ this.permitReleasedNotifier = permitReleasedNotifier;
+ this.highPrioPoolEnabled = highPrioPoolEnabled;
this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs;
}
@@ -54,7 +61,11 @@ public void initialize() {
throttlingConfigId ->
this.permitsPerSegmentByConfig.putIfAbsent(
throttlingConfigId,
- new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs)));
+ new PermitsPerNetworkSegment(
+ this.permitRepository,
+ this.permitReleasedNotifier,
+ this.highPrioPoolEnabled,
+ this.maxWaitForHighPrioInMs)));
/* Update config */
this.permitsPerSegmentByConfig.entrySet().parallelStream()
@@ -100,7 +111,11 @@ public boolean requestPermit(
private PermitsPerNetworkSegment createAndInitialize(final short throttlingConfigId) {
final PermitsPerNetworkSegment permitsPerNetworkSegment =
- new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs);
+ new PermitsPerNetworkSegment(
+ this.permitRepository,
+ this.permitReleasedNotifier,
+ this.highPrioPoolEnabled,
+ this.maxWaitForHighPrioInMs);
permitsPerNetworkSegment.initialize(throttlingConfigId);
return permitsPerNetworkSegment;
}
@@ -113,7 +128,11 @@ public void newThrottlingConfigCreated(final short throttlingConfigId) {
*/
this.permitsPerSegmentByConfig.putIfAbsent(
throttlingConfigId,
- new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs));
+ new PermitsPerNetworkSegment(
+ this.permitRepository,
+ this.permitReleasedNotifier,
+ this.highPrioPoolEnabled,
+ this.maxWaitForHighPrioInMs));
}
public boolean releasePermit(
diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java
index 0f43113dc4e..d84e83bac6c 100644
--- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java
+++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java
@@ -10,23 +10,34 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment;
+import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PermitsPerNetworkSegment {
private static final Logger LOGGER = LoggerFactory.getLogger(PermitsPerNetworkSegment.class);
+ private static final int WAIT_TIME = 1000;
+
private final ConcurrentMap> permitsPerSegment =
new ConcurrentHashMap<>();
private final PermitRepository permitRepository;
+ private final PermitReleasedNotifier permitReleasedNotifier;
+ private final boolean highPrioPoolEnabled;
private final int maxWaitForHighPrioInMs;
public PermitsPerNetworkSegment(
- final PermitRepository permitRepository, final int maxWaitForHighPrioInMs) {
+ final PermitRepository permitRepository,
+ final PermitReleasedNotifier permitReleasedNotifier,
+ final boolean highPrioPoolEnabled,
+ final int maxWaitForHighPrioInMs) {
this.permitRepository = permitRepository;
+ this.permitReleasedNotifier = permitReleasedNotifier;
+ this.highPrioPoolEnabled = highPrioPoolEnabled;
this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs;
}
@@ -97,15 +108,7 @@ public boolean requestPermit(
final int requestId,
final int priority,
final int maxConcurrency) {
-
- final AtomicInteger permitCounter =
- this.permitsPerSegment
- .computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>())
- .computeIfAbsent(cellId, key -> new AtomicInteger(0));
-
- final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
- if (numberOfPermitsIfGranted > maxConcurrency) {
- permitCounter.decrementAndGet();
+ if (!this.isPermitAvailable(baseTransceiverStationId, cellId, priority, maxConcurrency)) {
return false;
}
@@ -120,10 +123,7 @@ public boolean releasePermit(
final int cellId,
final int requestId) {
- final AtomicInteger permitCounter =
- this.permitsPerSegment
- .getOrDefault(baseTransceiverStationId, new ConcurrentHashMap<>())
- .getOrDefault(cellId, new AtomicInteger(0));
+ final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);
final int numberOfPermitsIfReleased = permitCounter.decrementAndGet();
if (numberOfPermitsIfReleased < 0) {
@@ -134,9 +134,70 @@ public boolean releasePermit(
this.permitRepository.releasePermit(
throttlingConfigId, clientId, baseTransceiverStationId, cellId, requestId);
+ if (this.highPrioPoolEnabled) {
+ this.permitReleasedNotifier.notifyPermitReleased(baseTransceiverStationId, cellId);
+ }
+
return numberOfReleasedPermits == 1;
}
+ private boolean isPermitAvailable(
+ final int baseTransceiverStationId,
+ final int cellId,
+ final int priority,
+ final int maxConcurrency) {
+ final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);
+
+ final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
+ if (numberOfPermitsIfGranted > maxConcurrency) {
+ permitCounter.decrementAndGet();
+
+ if (!this.highPrioPoolEnabled) {
+ return false;
+ }
+
+ if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) {
+ return false;
+ }
+
+ // Wait until permit is released
+ return this.waitUntilPermitIsAvailable(
+ baseTransceiverStationId, cellId, maxConcurrency, this.maxWaitForHighPrioInMs);
+ }
+ return true;
+ }
+
+ private boolean waitUntilPermitIsAvailable(
+ final int baseTransceiverStationId,
+ final int cellId,
+ final int maxConcurrency,
+ final int maxWaitForHighPrioInMs) {
+
+ final long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < maxWaitForHighPrioInMs) {
+ final boolean permitAvailable =
+ this.permitReleasedNotifier.waitForAvailablePermit(
+ baseTransceiverStationId, cellId, WAIT_TIME);
+ if (!permitAvailable) {
+ continue;
+ }
+ final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);
+ final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
+ if (numberOfPermitsIfGranted > maxConcurrency) {
+ permitCounter.decrementAndGet();
+ } else {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private AtomicInteger getPermitCounter(final int baseTransceiverStationId, final int cellId) {
+ return this.permitsPerSegment
+ .computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>())
+ .computeIfAbsent(cellId, key -> new AtomicInteger(0));
+ }
+
@Override
public String toString() {
return String.format(
diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java
new file mode 100644
index 00000000000..b0bb2e84920
--- /dev/null
+++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java
@@ -0,0 +1,45 @@
+// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package org.opensmartgridplatform.throttling.config;
+
+import com.zaxxer.hikari.util.DriverDataSource;
+import java.util.Properties;
+import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ThrottlingConfig {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ThrottlingConfig.class);
+
+ @Value("${spring.datasource.url}")
+ private String jdbcUrl;
+
+ @Value("${spring.datasource.driver-class-name}")
+ private String databaseDriver;
+
+ @Value("${spring.datasource.username}")
+ private String databaseUsername;
+
+ @Value("${spring.datasource.password}")
+ private String databasePassword;
+
+ @Bean
+ public PermitReleasedNotifier permitReleasedNotifier() {
+ LOGGER.info("Created jdbcUrl {} for permitReleasedNotifier", this.jdbcUrl);
+ final DriverDataSource dataSource =
+ new DriverDataSource(
+ this.jdbcUrl,
+ this.databaseDriver,
+ new Properties(),
+ this.databaseUsername,
+ this.databasePassword);
+
+ return new PermitReleasedNotifier(dataSource);
+ }
+}
diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java
new file mode 100644
index 00000000000..36f105a0c28
--- /dev/null
+++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java
@@ -0,0 +1,112 @@
+// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package org.opensmartgridplatform.throttling.service;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+import org.postgresql.PGConnection;
+import org.postgresql.PGNotification;
+
+@Slf4j
+public class PermitReleasedNotifier {
+ private static final String PERMIT_RELEASED_CHANNEL = "permit_released";
+ private final DataSource dataSource;
+
+ public PermitReleasedNotifier(final DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public boolean waitForAvailablePermit(
+ final int baseTransceiverStationId, final int cellId, final int maxWaitInMs) {
+ Connection connection = null;
+ try {
+ connection = this.dataSource.getConnection();
+ final PGConnection pgConnection =
+ this.startListening(connection, baseTransceiverStationId, cellId);
+
+ final PGNotification[] notifications = pgConnection.getNotifications(maxWaitInMs);
+ if (notifications == null) {
+ log.info(
+ "Received no notifications for btsId: {}, cellId: {} within {} ms",
+ baseTransceiverStationId,
+ cellId,
+ maxWaitInMs);
+ return false;
+ }
+
+ log.debug(
+ "Received notification for btsId: {}, cellId: {}", baseTransceiverStationId, cellId);
+ } catch (final SQLException sqle) {
+ log.error(
+ "SQLException occurred while listening for notification for btsId: {}, cellId: {}",
+ baseTransceiverStationId,
+ cellId,
+ sqle);
+ return false;
+ } finally {
+ this.closeConnection(connection, baseTransceiverStationId, cellId);
+ }
+
+ return true;
+ }
+
+ public void notifyPermitReleased(final int baseTransceiverStationId, final int cellId) {
+ Connection connection = null;
+ try {
+ connection = this.dataSource.getConnection();
+
+ final String sqlStatement =
+ String.format("NOTIFY %s", this.getChannelName(baseTransceiverStationId, cellId));
+ try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) {
+ preparedStatement.executeUpdate();
+ }
+ } catch (final SQLException sqle) {
+ log.error(
+ "SQLException occurred while notify for btsId: {}, cellId: {}",
+ baseTransceiverStationId,
+ cellId,
+ sqle);
+ } finally {
+ this.closeConnection(connection, baseTransceiverStationId, cellId);
+ }
+ }
+
+ private PGConnection startListening(
+ final Connection connection, final int baseTransceiverStationId, final int cellId)
+ throws SQLException {
+
+ final PGConnection pgConnection = connection.unwrap(PGConnection.class);
+ final String sqlStatement =
+ String.format("LISTEN %s", this.getChannelName(baseTransceiverStationId, cellId));
+ try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) {
+ preparedStatement.executeUpdate();
+ }
+
+ return pgConnection;
+ }
+
+ private void closeConnection(
+ final Connection connection, final int baseTransceiverStationId, final int cellId) {
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (final SQLException e) {
+ log.error(
+ "SQLException occurred while trying to close the connection that is listening for notification for btsId: {}, cellId: {}",
+ baseTransceiverStationId,
+ cellId,
+ e);
+ }
+ }
+
+ private String getChannelName(final int baseTransceiverStationId, final int cellId) {
+ return (PERMIT_RELEASED_CHANNEL + "_" + baseTransceiverStationId + "_" + cellId)
+ .replace("-", "minus");
+ }
+}
diff --git a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties
index 88ffc5ac748..813972b7e79 100644
--- a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties
+++ b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties
@@ -31,9 +31,8 @@ scheduling.task.cleanup.permits.cron.expression=0 0/30 * * * ?
# Releasing expired permits will happen in batches of the following size.
cleanup.permits.batch.size=100
-#max.wait.for.high.prio.in.ms=5000
-# For now we disable this high prio pool by default
-max.wait.for.high.prio.in.ms=0
+wait.for.high.prio.enabled=true
+wait.for.high.prio.max.in.ms=10000
# The task to reset in memory counters with db state is executed by cron expression.
scheduling.task.reinitialize.state.cron.expression=30 0/30 * * * ?
diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java
index ee42dfe1e29..77427b54c02 100644
--- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java
+++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java
@@ -20,20 +20,27 @@
import org.opensmartgridplatform.throttling.entities.ThrottlingConfig;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository;
+import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
@ExtendWith(MockitoExtension.class)
class PermitsByThrottlingConfigTest {
+ private static final boolean WAIT_FOR_HIGH_PRIO_ENABLED = true;
private static final int MAX_WAIT_FOR_HIGH_PRIO = 1000;
@Mock private ThrottlingConfigRepository throttlingConfigRepository;
@Mock private PermitRepository permitRepository;
+ @Mock private PermitReleasedNotifier permitReleasedNotifier;
private PermitsByThrottlingConfig permitsByThrottlingConfig;
@BeforeEach
void setUp() {
this.permitsByThrottlingConfig =
new PermitsByThrottlingConfig(
- this.throttlingConfigRepository, this.permitRepository, this.MAX_WAIT_FOR_HIGH_PRIO);
+ this.throttlingConfigRepository,
+ this.permitRepository,
+ this.permitReleasedNotifier,
+ WAIT_FOR_HIGH_PRIO_ENABLED,
+ this.MAX_WAIT_FOR_HIGH_PRIO);
}
@Test
diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java
index cd39cf64d3b..b2331865423 100644
--- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java
+++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java
@@ -6,28 +6,42 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment;
+import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
@ExtendWith(MockitoExtension.class)
class PermitsPerNetworkSegmentTest {
+ private static final boolean WAIT_FOR_HIGH_PRIO_ENABLED = true;
private static final int MAX_WAIT_FOR_HIGH_PRIO = 1000;
@Mock private PermitRepository permitRepository;
+ @Mock private PermitReleasedNotifier permitReleasedNotifier;
private PermitsPerNetworkSegment permitsPerNetworkSegment;
@BeforeEach
void setUp() {
this.permitsPerNetworkSegment =
- new PermitsPerNetworkSegment(this.permitRepository, this.MAX_WAIT_FOR_HIGH_PRIO);
+ new PermitsPerNetworkSegment(
+ this.permitRepository,
+ this.permitReleasedNotifier,
+ WAIT_FOR_HIGH_PRIO_ENABLED,
+ this.MAX_WAIT_FOR_HIGH_PRIO);
}
@Test
@@ -130,6 +144,100 @@ void testInitializeDelete() {
assertThat(this.permitsPerNetworkSegment.permitsPerNetworkSegment()).isEmpty();
}
+ @ParameterizedTest
+ @ValueSource(ints = {0, 2000})
+ void testHighPrioPoolTime(final int maxWaitForHighPrio) {
+ this.permitsPerNetworkSegment =
+ new PermitsPerNetworkSegment(
+ this.permitRepository,
+ this.permitReleasedNotifier,
+ WAIT_FOR_HIGH_PRIO_ENABLED,
+ maxWaitForHighPrio);
+
+ final int btsId = 1;
+ final int cellId = 2;
+ final int numberOfPermits = 3;
+ final short throttlingConfigId = Integer.valueOf(1).shortValue();
+ final int clientId = 4;
+ final int requestId = 5;
+ final int priority = 6;
+ final int maxConcurrency = numberOfPermits;
+
+ this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId);
+
+ this.permitsPerNetworkSegment.initialize(throttlingConfigId);
+
+ final long start = System.currentTimeMillis();
+ final boolean permitGranted =
+ this.permitsPerNetworkSegment.requestPermit(
+ throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency);
+ assertThat(permitGranted).isFalse();
+ assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(maxWaitForHighPrio);
+
+ verify(this.permitRepository, never())
+ .grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId);
+ }
+
+ @Test
+ void testHighPrioPool() {
+ final int maxWaitForHighPrio = 10000;
+ final int waitBeforeRelease = 1000;
+ this.permitsPerNetworkSegment =
+ new PermitsPerNetworkSegment(
+ this.permitRepository,
+ this.permitReleasedNotifier,
+ WAIT_FOR_HIGH_PRIO_ENABLED,
+ maxWaitForHighPrio);
+
+ final int btsId = 1;
+ final int cellId = 2;
+ final int otherCellId = cellId + 1;
+ final int numberOfPermits = 3;
+ final short throttlingConfigId = Integer.valueOf(1).shortValue();
+ final int clientId = 4;
+ final int requestId = 5;
+ final int priority = 6;
+ final int maxConcurrency = numberOfPermits;
+
+ this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId);
+
+ when(this.permitRepository.grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId))
+ .thenReturn(true);
+ when(this.permitRepository.grantPermit(
+ throttlingConfigId, clientId, btsId, otherCellId, requestId))
+ .thenReturn(true);
+
+ this.permitsPerNetworkSegment.initialize(throttlingConfigId);
+
+ final long start = System.currentTimeMillis();
+
+ final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
+ executor.schedule(
+ () -> {
+ PermitsPerNetworkSegmentTest.this.permitsPerNetworkSegment.releasePermit(
+ throttlingConfigId, clientId, btsId, cellId, requestId);
+ when(this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, 1000))
+ .thenReturn(false)
+ .thenReturn(true);
+ verify(this.permitReleasedNotifier, times(1)).notifyPermitReleased(btsId, cellId);
+ },
+ waitBeforeRelease,
+ TimeUnit.MILLISECONDS);
+
+ final boolean permitGrantedOtherCell =
+ this.permitsPerNetworkSegment.requestPermit(
+ throttlingConfigId, clientId, btsId, otherCellId, requestId, priority, maxConcurrency);
+ assertThat(permitGrantedOtherCell).isTrue();
+ assertThat((int) (System.currentTimeMillis() - start)).isBetween(0, waitBeforeRelease);
+
+ final boolean permitGranted =
+ this.permitsPerNetworkSegment.requestPermit(
+ throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency);
+ assertThat(permitGranted).isTrue();
+ assertThat((int) (System.currentTimeMillis() - start))
+ .isBetween(waitBeforeRelease, maxWaitForHighPrio);
+ }
+
private void preparePermits(
final int btsId,
final int cellId,
diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java
new file mode 100644
index 00000000000..9973a15298a
--- /dev/null
+++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java
@@ -0,0 +1,147 @@
+// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package org.opensmartgridplatform.throttling.service;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import javax.sql.DataSource;
+import org.assertj.core.util.Arrays;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.postgresql.PGConnection;
+import org.postgresql.PGNotification;
+
+@ExtendWith(MockitoExtension.class)
+class PermitReleasedNotifierTest {
+ private static final String PERMIT_RELEASED_CHANNEL = "permit_released";
+
+ private static final int btsId = 1;
+ private static final int cellId = 2;
+ @Mock private DataSource dataSource;
+ private final int maxWaitInMs = 654321;
+ private PermitReleasedNotifier permitReleasedNotifier;
+
+ @BeforeEach
+ void setUp() {
+ this.permitReleasedNotifier = new PermitReleasedNotifier(this.dataSource);
+ }
+
+ @Test
+ void testWaitForPermitReleasedNoNotification() throws SQLException {
+ final PGConnection pgConnection = this.setupPgConnectionListen();
+
+ when(pgConnection.getNotifications(this.maxWaitInMs)).thenReturn(null);
+
+ final boolean result =
+ this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs);
+
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ void testWaitForPermitReleased() throws SQLException {
+ final PGConnection pgConnection = this.setupPgConnectionListen();
+ final PGNotification pgNotification =
+ this.newPGNotification(this.getChannelName(btsId, cellId));
+
+ when(pgConnection.getNotifications(this.maxWaitInMs)).thenReturn(Arrays.array(pgNotification));
+
+ final boolean result =
+ this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs);
+
+ assertThat(result).isTrue();
+ verify(this.dataSource.getConnection()).close();
+ }
+
+ @Test
+ void testWaitForPermitReleasedThrowSqlException() throws SQLException {
+ final PGConnection pgConnection = this.setupPgConnectionListen();
+
+ when(pgConnection.getNotifications(this.maxWaitInMs)).thenThrow(new SQLException());
+
+ final boolean result =
+ this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs);
+
+ assertThat(result).isFalse();
+ verify(this.dataSource.getConnection()).close();
+ }
+
+ @Test
+ void testNotifyPermitReleased() throws SQLException {
+ this.setupPgConnectionNotify();
+
+ this.permitReleasedNotifier.notifyPermitReleased(btsId, cellId);
+ verify(this.dataSource.getConnection()).close();
+ }
+
+ @Test
+ void testNotifyPermitReleasedThrowSqlException() throws SQLException {
+ final Connection connection = mock(Connection.class);
+ when(this.dataSource.getConnection()).thenReturn(connection);
+ when(connection.prepareStatement(
+ String.format("NOTIFY %s", this.getChannelName(btsId, cellId))))
+ .thenThrow(new SQLException());
+
+ this.permitReleasedNotifier.notifyPermitReleased(btsId, cellId);
+ verify(this.dataSource.getConnection()).close();
+ }
+
+ private void setupPgConnectionNotify() throws SQLException {
+ final Connection connection = mock(Connection.class);
+ final PreparedStatement preparedStatement = mock(PreparedStatement.class);
+
+ when(this.dataSource.getConnection()).thenReturn(connection);
+ when(connection.prepareStatement(
+ String.format("NOTIFY %s", this.getChannelName(btsId, cellId))))
+ .thenReturn(preparedStatement);
+ when(preparedStatement.executeUpdate()).thenReturn(0);
+ }
+
+ private PGConnection setupPgConnectionListen() throws SQLException {
+ final Connection connection = mock(Connection.class);
+ final PGConnection pgConnection = mock(PGConnection.class);
+ final PreparedStatement preparedStatement = mock(PreparedStatement.class);
+
+ when(this.dataSource.getConnection()).thenReturn(connection);
+ when(connection.unwrap(PGConnection.class)).thenReturn(pgConnection);
+ when(connection.prepareStatement("LISTEN " + this.getChannelName(btsId, cellId)))
+ .thenReturn(preparedStatement);
+ when(preparedStatement.executeUpdate()).thenReturn(0);
+
+ return pgConnection;
+ }
+
+ private String getChannelName(final int btsId, final int cellId) {
+ return PERMIT_RELEASED_CHANNEL + "_" + btsId + "_" + cellId;
+ }
+
+ private PGNotification newPGNotification(final String name) {
+ return new PGNotification() {
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public int getPID() {
+ return 666;
+ }
+
+ @Override
+ public String getParameter() {
+ return "";
+ }
+ };
+ }
+}