diff --git a/osgp/platform/osgp-throttling-service/pom.xml b/osgp/platform/osgp-throttling-service/pom.xml index 3b3fa8f85e4..2e3e0a39ec9 100644 --- a/osgp/platform/osgp-throttling-service/pom.xml +++ b/osgp/platform/osgp-throttling-service/pom.xml @@ -129,7 +129,6 @@ SPDX-License-Identifier: Apache-2.0 org.postgresql postgresql - runtime org.projectlombok diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java index 25cb232fd8e..88d09dfccd1 100644 --- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java @@ -13,6 +13,7 @@ import org.opensmartgridplatform.throttling.entities.ThrottlingConfig; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -28,15 +29,21 @@ public class PermitsByThrottlingConfig { private final ThrottlingConfigRepository throttlingConfigRepository; private final PermitRepository permitRepository; + private final PermitReleasedNotifier permitReleasedNotifier; + private final boolean highPrioPoolEnabled; private final int maxWaitForHighPrioInMs; public PermitsByThrottlingConfig( final ThrottlingConfigRepository throttlingConfigRepository, final PermitRepository permitRepository, - @Value("${max.wait.for.high.prio.in.ms:5000}") final int maxWaitForHighPrioInMs) { + final PermitReleasedNotifier permitReleasedNotifier, + @Value("${wait.for.high.prio.enabled:true}") final boolean highPrioPoolEnabled, + @Value("${wait.for.high.prio.max.in.ms:10000}") final int maxWaitForHighPrioInMs) { this.throttlingConfigRepository = throttlingConfigRepository; this.permitRepository = permitRepository; + this.permitReleasedNotifier = permitReleasedNotifier; + this.highPrioPoolEnabled = highPrioPoolEnabled; this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs; } @@ -54,7 +61,11 @@ public void initialize() { throttlingConfigId -> this.permitsPerSegmentByConfig.putIfAbsent( throttlingConfigId, - new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs))); + new PermitsPerNetworkSegment( + this.permitRepository, + this.permitReleasedNotifier, + this.highPrioPoolEnabled, + this.maxWaitForHighPrioInMs))); /* Update config */ this.permitsPerSegmentByConfig.entrySet().parallelStream() @@ -100,7 +111,11 @@ public boolean requestPermit( private PermitsPerNetworkSegment createAndInitialize(final short throttlingConfigId) { final PermitsPerNetworkSegment permitsPerNetworkSegment = - new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs); + new PermitsPerNetworkSegment( + this.permitRepository, + this.permitReleasedNotifier, + this.highPrioPoolEnabled, + this.maxWaitForHighPrioInMs); permitsPerNetworkSegment.initialize(throttlingConfigId); return permitsPerNetworkSegment; } @@ -113,7 +128,11 @@ public void newThrottlingConfigCreated(final short throttlingConfigId) { */ this.permitsPerSegmentByConfig.putIfAbsent( throttlingConfigId, - new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs)); + new PermitsPerNetworkSegment( + this.permitRepository, + this.permitReleasedNotifier, + this.highPrioPoolEnabled, + this.maxWaitForHighPrioInMs)); } public boolean releasePermit( diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java index 0f43113dc4e..d84e83bac6c 100644 --- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java @@ -10,23 +10,34 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PermitsPerNetworkSegment { private static final Logger LOGGER = LoggerFactory.getLogger(PermitsPerNetworkSegment.class); + private static final int WAIT_TIME = 1000; + private final ConcurrentMap> permitsPerSegment = new ConcurrentHashMap<>(); private final PermitRepository permitRepository; + private final PermitReleasedNotifier permitReleasedNotifier; + private final boolean highPrioPoolEnabled; private final int maxWaitForHighPrioInMs; public PermitsPerNetworkSegment( - final PermitRepository permitRepository, final int maxWaitForHighPrioInMs) { + final PermitRepository permitRepository, + final PermitReleasedNotifier permitReleasedNotifier, + final boolean highPrioPoolEnabled, + final int maxWaitForHighPrioInMs) { this.permitRepository = permitRepository; + this.permitReleasedNotifier = permitReleasedNotifier; + this.highPrioPoolEnabled = highPrioPoolEnabled; this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs; } @@ -97,15 +108,7 @@ public boolean requestPermit( final int requestId, final int priority, final int maxConcurrency) { - - final AtomicInteger permitCounter = - this.permitsPerSegment - .computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>()) - .computeIfAbsent(cellId, key -> new AtomicInteger(0)); - - final int numberOfPermitsIfGranted = permitCounter.incrementAndGet(); - if (numberOfPermitsIfGranted > maxConcurrency) { - permitCounter.decrementAndGet(); + if (!this.isPermitAvailable(baseTransceiverStationId, cellId, priority, maxConcurrency)) { return false; } @@ -120,10 +123,7 @@ public boolean releasePermit( final int cellId, final int requestId) { - final AtomicInteger permitCounter = - this.permitsPerSegment - .getOrDefault(baseTransceiverStationId, new ConcurrentHashMap<>()) - .getOrDefault(cellId, new AtomicInteger(0)); + final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId); final int numberOfPermitsIfReleased = permitCounter.decrementAndGet(); if (numberOfPermitsIfReleased < 0) { @@ -134,9 +134,70 @@ public boolean releasePermit( this.permitRepository.releasePermit( throttlingConfigId, clientId, baseTransceiverStationId, cellId, requestId); + if (this.highPrioPoolEnabled) { + this.permitReleasedNotifier.notifyPermitReleased(baseTransceiverStationId, cellId); + } + return numberOfReleasedPermits == 1; } + private boolean isPermitAvailable( + final int baseTransceiverStationId, + final int cellId, + final int priority, + final int maxConcurrency) { + final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId); + + final int numberOfPermitsIfGranted = permitCounter.incrementAndGet(); + if (numberOfPermitsIfGranted > maxConcurrency) { + permitCounter.decrementAndGet(); + + if (!this.highPrioPoolEnabled) { + return false; + } + + if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) { + return false; + } + + // Wait until permit is released + return this.waitUntilPermitIsAvailable( + baseTransceiverStationId, cellId, maxConcurrency, this.maxWaitForHighPrioInMs); + } + return true; + } + + private boolean waitUntilPermitIsAvailable( + final int baseTransceiverStationId, + final int cellId, + final int maxConcurrency, + final int maxWaitForHighPrioInMs) { + + final long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < maxWaitForHighPrioInMs) { + final boolean permitAvailable = + this.permitReleasedNotifier.waitForAvailablePermit( + baseTransceiverStationId, cellId, WAIT_TIME); + if (!permitAvailable) { + continue; + } + final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId); + final int numberOfPermitsIfGranted = permitCounter.incrementAndGet(); + if (numberOfPermitsIfGranted > maxConcurrency) { + permitCounter.decrementAndGet(); + } else { + return true; + } + } + return false; + } + + private AtomicInteger getPermitCounter(final int baseTransceiverStationId, final int cellId) { + return this.permitsPerSegment + .computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>()) + .computeIfAbsent(cellId, key -> new AtomicInteger(0)); + } + @Override public String toString() { return String.format( diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java new file mode 100644 index 00000000000..b0bb2e84920 --- /dev/null +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java @@ -0,0 +1,45 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package org.opensmartgridplatform.throttling.config; + +import com.zaxxer.hikari.util.DriverDataSource; +import java.util.Properties; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ThrottlingConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(ThrottlingConfig.class); + + @Value("${spring.datasource.url}") + private String jdbcUrl; + + @Value("${spring.datasource.driver-class-name}") + private String databaseDriver; + + @Value("${spring.datasource.username}") + private String databaseUsername; + + @Value("${spring.datasource.password}") + private String databasePassword; + + @Bean + public PermitReleasedNotifier permitReleasedNotifier() { + LOGGER.info("Created jdbcUrl {} for permitReleasedNotifier", this.jdbcUrl); + final DriverDataSource dataSource = + new DriverDataSource( + this.jdbcUrl, + this.databaseDriver, + new Properties(), + this.databaseUsername, + this.databasePassword); + + return new PermitReleasedNotifier(dataSource); + } +} diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java new file mode 100644 index 00000000000..36f105a0c28 --- /dev/null +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java @@ -0,0 +1,112 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package org.opensmartgridplatform.throttling.service; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import javax.sql.DataSource; +import lombok.extern.slf4j.Slf4j; +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; + +@Slf4j +public class PermitReleasedNotifier { + private static final String PERMIT_RELEASED_CHANNEL = "permit_released"; + private final DataSource dataSource; + + public PermitReleasedNotifier(final DataSource dataSource) { + this.dataSource = dataSource; + } + + public boolean waitForAvailablePermit( + final int baseTransceiverStationId, final int cellId, final int maxWaitInMs) { + Connection connection = null; + try { + connection = this.dataSource.getConnection(); + final PGConnection pgConnection = + this.startListening(connection, baseTransceiverStationId, cellId); + + final PGNotification[] notifications = pgConnection.getNotifications(maxWaitInMs); + if (notifications == null) { + log.info( + "Received no notifications for btsId: {}, cellId: {} within {} ms", + baseTransceiverStationId, + cellId, + maxWaitInMs); + return false; + } + + log.debug( + "Received notification for btsId: {}, cellId: {}", baseTransceiverStationId, cellId); + } catch (final SQLException sqle) { + log.error( + "SQLException occurred while listening for notification for btsId: {}, cellId: {}", + baseTransceiverStationId, + cellId, + sqle); + return false; + } finally { + this.closeConnection(connection, baseTransceiverStationId, cellId); + } + + return true; + } + + public void notifyPermitReleased(final int baseTransceiverStationId, final int cellId) { + Connection connection = null; + try { + connection = this.dataSource.getConnection(); + + final String sqlStatement = + String.format("NOTIFY %s", this.getChannelName(baseTransceiverStationId, cellId)); + try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) { + preparedStatement.executeUpdate(); + } + } catch (final SQLException sqle) { + log.error( + "SQLException occurred while notify for btsId: {}, cellId: {}", + baseTransceiverStationId, + cellId, + sqle); + } finally { + this.closeConnection(connection, baseTransceiverStationId, cellId); + } + } + + private PGConnection startListening( + final Connection connection, final int baseTransceiverStationId, final int cellId) + throws SQLException { + + final PGConnection pgConnection = connection.unwrap(PGConnection.class); + final String sqlStatement = + String.format("LISTEN %s", this.getChannelName(baseTransceiverStationId, cellId)); + try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) { + preparedStatement.executeUpdate(); + } + + return pgConnection; + } + + private void closeConnection( + final Connection connection, final int baseTransceiverStationId, final int cellId) { + try { + if (connection != null) { + connection.close(); + } + } catch (final SQLException e) { + log.error( + "SQLException occurred while trying to close the connection that is listening for notification for btsId: {}, cellId: {}", + baseTransceiverStationId, + cellId, + e); + } + } + + private String getChannelName(final int baseTransceiverStationId, final int cellId) { + return (PERMIT_RELEASED_CHANNEL + "_" + baseTransceiverStationId + "_" + cellId) + .replace("-", "minus"); + } +} diff --git a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties index 88ffc5ac748..813972b7e79 100644 --- a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties +++ b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties @@ -31,9 +31,8 @@ scheduling.task.cleanup.permits.cron.expression=0 0/30 * * * ? # Releasing expired permits will happen in batches of the following size. cleanup.permits.batch.size=100 -#max.wait.for.high.prio.in.ms=5000 -# For now we disable this high prio pool by default -max.wait.for.high.prio.in.ms=0 +wait.for.high.prio.enabled=true +wait.for.high.prio.max.in.ms=10000 # The task to reset in memory counters with db state is executed by cron expression. scheduling.task.reinitialize.state.cron.expression=30 0/30 * * * ? diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java index ee42dfe1e29..77427b54c02 100644 --- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java @@ -20,20 +20,27 @@ import org.opensmartgridplatform.throttling.entities.ThrottlingConfig; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; @ExtendWith(MockitoExtension.class) class PermitsByThrottlingConfigTest { + private static final boolean WAIT_FOR_HIGH_PRIO_ENABLED = true; private static final int MAX_WAIT_FOR_HIGH_PRIO = 1000; @Mock private ThrottlingConfigRepository throttlingConfigRepository; @Mock private PermitRepository permitRepository; + @Mock private PermitReleasedNotifier permitReleasedNotifier; private PermitsByThrottlingConfig permitsByThrottlingConfig; @BeforeEach void setUp() { this.permitsByThrottlingConfig = new PermitsByThrottlingConfig( - this.throttlingConfigRepository, this.permitRepository, this.MAX_WAIT_FOR_HIGH_PRIO); + this.throttlingConfigRepository, + this.permitRepository, + this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, + this.MAX_WAIT_FOR_HIGH_PRIO); } @Test diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java index cd39cf64d3b..b2331865423 100644 --- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java @@ -6,28 +6,42 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.assertj.core.util.Lists; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; @ExtendWith(MockitoExtension.class) class PermitsPerNetworkSegmentTest { + private static final boolean WAIT_FOR_HIGH_PRIO_ENABLED = true; private static final int MAX_WAIT_FOR_HIGH_PRIO = 1000; @Mock private PermitRepository permitRepository; + @Mock private PermitReleasedNotifier permitReleasedNotifier; private PermitsPerNetworkSegment permitsPerNetworkSegment; @BeforeEach void setUp() { this.permitsPerNetworkSegment = - new PermitsPerNetworkSegment(this.permitRepository, this.MAX_WAIT_FOR_HIGH_PRIO); + new PermitsPerNetworkSegment( + this.permitRepository, + this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, + this.MAX_WAIT_FOR_HIGH_PRIO); } @Test @@ -130,6 +144,100 @@ void testInitializeDelete() { assertThat(this.permitsPerNetworkSegment.permitsPerNetworkSegment()).isEmpty(); } + @ParameterizedTest + @ValueSource(ints = {0, 2000}) + void testHighPrioPoolTime(final int maxWaitForHighPrio) { + this.permitsPerNetworkSegment = + new PermitsPerNetworkSegment( + this.permitRepository, + this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, + maxWaitForHighPrio); + + final int btsId = 1; + final int cellId = 2; + final int numberOfPermits = 3; + final short throttlingConfigId = Integer.valueOf(1).shortValue(); + final int clientId = 4; + final int requestId = 5; + final int priority = 6; + final int maxConcurrency = numberOfPermits; + + this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId); + + this.permitsPerNetworkSegment.initialize(throttlingConfigId); + + final long start = System.currentTimeMillis(); + final boolean permitGranted = + this.permitsPerNetworkSegment.requestPermit( + throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency); + assertThat(permitGranted).isFalse(); + assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(maxWaitForHighPrio); + + verify(this.permitRepository, never()) + .grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId); + } + + @Test + void testHighPrioPool() { + final int maxWaitForHighPrio = 10000; + final int waitBeforeRelease = 1000; + this.permitsPerNetworkSegment = + new PermitsPerNetworkSegment( + this.permitRepository, + this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, + maxWaitForHighPrio); + + final int btsId = 1; + final int cellId = 2; + final int otherCellId = cellId + 1; + final int numberOfPermits = 3; + final short throttlingConfigId = Integer.valueOf(1).shortValue(); + final int clientId = 4; + final int requestId = 5; + final int priority = 6; + final int maxConcurrency = numberOfPermits; + + this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId); + + when(this.permitRepository.grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId)) + .thenReturn(true); + when(this.permitRepository.grantPermit( + throttlingConfigId, clientId, btsId, otherCellId, requestId)) + .thenReturn(true); + + this.permitsPerNetworkSegment.initialize(throttlingConfigId); + + final long start = System.currentTimeMillis(); + + final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); + executor.schedule( + () -> { + PermitsPerNetworkSegmentTest.this.permitsPerNetworkSegment.releasePermit( + throttlingConfigId, clientId, btsId, cellId, requestId); + when(this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, 1000)) + .thenReturn(false) + .thenReturn(true); + verify(this.permitReleasedNotifier, times(1)).notifyPermitReleased(btsId, cellId); + }, + waitBeforeRelease, + TimeUnit.MILLISECONDS); + + final boolean permitGrantedOtherCell = + this.permitsPerNetworkSegment.requestPermit( + throttlingConfigId, clientId, btsId, otherCellId, requestId, priority, maxConcurrency); + assertThat(permitGrantedOtherCell).isTrue(); + assertThat((int) (System.currentTimeMillis() - start)).isBetween(0, waitBeforeRelease); + + final boolean permitGranted = + this.permitsPerNetworkSegment.requestPermit( + throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency); + assertThat(permitGranted).isTrue(); + assertThat((int) (System.currentTimeMillis() - start)) + .isBetween(waitBeforeRelease, maxWaitForHighPrio); + } + private void preparePermits( final int btsId, final int cellId, diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java new file mode 100644 index 00000000000..9973a15298a --- /dev/null +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java @@ -0,0 +1,147 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package org.opensmartgridplatform.throttling.service; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import javax.sql.DataSource; +import org.assertj.core.util.Arrays; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; + +@ExtendWith(MockitoExtension.class) +class PermitReleasedNotifierTest { + private static final String PERMIT_RELEASED_CHANNEL = "permit_released"; + + private static final int btsId = 1; + private static final int cellId = 2; + @Mock private DataSource dataSource; + private final int maxWaitInMs = 654321; + private PermitReleasedNotifier permitReleasedNotifier; + + @BeforeEach + void setUp() { + this.permitReleasedNotifier = new PermitReleasedNotifier(this.dataSource); + } + + @Test + void testWaitForPermitReleasedNoNotification() throws SQLException { + final PGConnection pgConnection = this.setupPgConnectionListen(); + + when(pgConnection.getNotifications(this.maxWaitInMs)).thenReturn(null); + + final boolean result = + this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs); + + assertThat(result).isFalse(); + } + + @Test + void testWaitForPermitReleased() throws SQLException { + final PGConnection pgConnection = this.setupPgConnectionListen(); + final PGNotification pgNotification = + this.newPGNotification(this.getChannelName(btsId, cellId)); + + when(pgConnection.getNotifications(this.maxWaitInMs)).thenReturn(Arrays.array(pgNotification)); + + final boolean result = + this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs); + + assertThat(result).isTrue(); + verify(this.dataSource.getConnection()).close(); + } + + @Test + void testWaitForPermitReleasedThrowSqlException() throws SQLException { + final PGConnection pgConnection = this.setupPgConnectionListen(); + + when(pgConnection.getNotifications(this.maxWaitInMs)).thenThrow(new SQLException()); + + final boolean result = + this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs); + + assertThat(result).isFalse(); + verify(this.dataSource.getConnection()).close(); + } + + @Test + void testNotifyPermitReleased() throws SQLException { + this.setupPgConnectionNotify(); + + this.permitReleasedNotifier.notifyPermitReleased(btsId, cellId); + verify(this.dataSource.getConnection()).close(); + } + + @Test + void testNotifyPermitReleasedThrowSqlException() throws SQLException { + final Connection connection = mock(Connection.class); + when(this.dataSource.getConnection()).thenReturn(connection); + when(connection.prepareStatement( + String.format("NOTIFY %s", this.getChannelName(btsId, cellId)))) + .thenThrow(new SQLException()); + + this.permitReleasedNotifier.notifyPermitReleased(btsId, cellId); + verify(this.dataSource.getConnection()).close(); + } + + private void setupPgConnectionNotify() throws SQLException { + final Connection connection = mock(Connection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + + when(this.dataSource.getConnection()).thenReturn(connection); + when(connection.prepareStatement( + String.format("NOTIFY %s", this.getChannelName(btsId, cellId)))) + .thenReturn(preparedStatement); + when(preparedStatement.executeUpdate()).thenReturn(0); + } + + private PGConnection setupPgConnectionListen() throws SQLException { + final Connection connection = mock(Connection.class); + final PGConnection pgConnection = mock(PGConnection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + + when(this.dataSource.getConnection()).thenReturn(connection); + when(connection.unwrap(PGConnection.class)).thenReturn(pgConnection); + when(connection.prepareStatement("LISTEN " + this.getChannelName(btsId, cellId))) + .thenReturn(preparedStatement); + when(preparedStatement.executeUpdate()).thenReturn(0); + + return pgConnection; + } + + private String getChannelName(final int btsId, final int cellId) { + return PERMIT_RELEASED_CHANNEL + "_" + btsId + "_" + cellId; + } + + private PGNotification newPGNotification(final String name) { + return new PGNotification() { + @Override + public String getName() { + return name; + } + + @Override + public int getPID() { + return 666; + } + + @Override + public String getParameter() { + return ""; + } + }; + } +}