From 8d3a9e32ba9f3934864e941faadbc901df61b2b1 Mon Sep 17 00:00:00 2001 From: Gerben Kroes Date: Mon, 19 Feb 2024 15:08:14 +0100 Subject: [PATCH 1/2] Implement pool for high prio requests based on db events Signed-off-by: Gerben Kroes --- osgp/platform/osgp-throttling-service/pom.xml | 1 - .../throttling/PermitsByThrottlingConfig.java | 15 +- .../throttling/PermitsPerNetworkSegment.java | 90 +++++++++-- .../throttling/config/ThrottlingConfig.java | 45 ++++++ .../service/PermitReleasedNotifier.java | 112 +++++++++++++ .../osgp-throttling-service.properties | 4 +- .../PermitsByThrottlingConfigTest.java | 7 +- .../PermitsPerNetworkSegmentTest.java | 100 +++++++++++- .../service/PermitReleasedNotifierTest.java | 147 ++++++++++++++++++ 9 files changed, 498 insertions(+), 23 deletions(-) create mode 100644 osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java create mode 100644 osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java create mode 100644 osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java diff --git a/osgp/platform/osgp-throttling-service/pom.xml b/osgp/platform/osgp-throttling-service/pom.xml index 3b3fa8f85e4..2e3e0a39ec9 100644 --- a/osgp/platform/osgp-throttling-service/pom.xml +++ b/osgp/platform/osgp-throttling-service/pom.xml @@ -129,7 +129,6 @@ SPDX-License-Identifier: Apache-2.0 org.postgresql postgresql - runtime org.projectlombok diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java index 25cb232fd8e..864ed92cdf9 100644 --- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java @@ -13,6 +13,7 @@ import org.opensmartgridplatform.throttling.entities.ThrottlingConfig; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -28,15 +29,18 @@ public class PermitsByThrottlingConfig { private final ThrottlingConfigRepository throttlingConfigRepository; private final PermitRepository permitRepository; + private final PermitReleasedNotifier permitReleasedNotifier; private final int maxWaitForHighPrioInMs; public PermitsByThrottlingConfig( final ThrottlingConfigRepository throttlingConfigRepository, final PermitRepository permitRepository, + final PermitReleasedNotifier permitReleasedNotifier, @Value("${max.wait.for.high.prio.in.ms:5000}") final int maxWaitForHighPrioInMs) { this.throttlingConfigRepository = throttlingConfigRepository; this.permitRepository = permitRepository; + this.permitReleasedNotifier = permitReleasedNotifier; this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs; } @@ -54,7 +58,10 @@ public void initialize() { throttlingConfigId -> this.permitsPerSegmentByConfig.putIfAbsent( throttlingConfigId, - new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs))); + new PermitsPerNetworkSegment( + this.permitRepository, + this.permitReleasedNotifier, + this.maxWaitForHighPrioInMs))); /* Update config */ this.permitsPerSegmentByConfig.entrySet().parallelStream() @@ -100,7 +107,8 @@ public boolean requestPermit( private PermitsPerNetworkSegment createAndInitialize(final short throttlingConfigId) { final PermitsPerNetworkSegment permitsPerNetworkSegment = - new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs); + new PermitsPerNetworkSegment( + this.permitRepository, this.permitReleasedNotifier, this.maxWaitForHighPrioInMs); permitsPerNetworkSegment.initialize(throttlingConfigId); return permitsPerNetworkSegment; } @@ -113,7 +121,8 @@ public void newThrottlingConfigCreated(final short throttlingConfigId) { */ this.permitsPerSegmentByConfig.putIfAbsent( throttlingConfigId, - new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs)); + new PermitsPerNetworkSegment( + this.permitRepository, this.permitReleasedNotifier, this.maxWaitForHighPrioInMs)); } public boolean releasePermit( diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java index 0f43113dc4e..656ed336b52 100644 --- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java @@ -10,23 +10,31 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PermitsPerNetworkSegment { private static final Logger LOGGER = LoggerFactory.getLogger(PermitsPerNetworkSegment.class); + private static final int WAIT_TIME = 1000; + private final ConcurrentMap> permitsPerSegment = new ConcurrentHashMap<>(); private final PermitRepository permitRepository; + private final PermitReleasedNotifier permitReleasedNotifier; private final int maxWaitForHighPrioInMs; public PermitsPerNetworkSegment( - final PermitRepository permitRepository, final int maxWaitForHighPrioInMs) { + final PermitRepository permitRepository, + final PermitReleasedNotifier permitReleasedNotifier, + final int maxWaitForHighPrioInMs) { this.permitRepository = permitRepository; + this.permitReleasedNotifier = permitReleasedNotifier; this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs; } @@ -97,15 +105,7 @@ public boolean requestPermit( final int requestId, final int priority, final int maxConcurrency) { - - final AtomicInteger permitCounter = - this.permitsPerSegment - .computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>()) - .computeIfAbsent(cellId, key -> new AtomicInteger(0)); - - final int numberOfPermitsIfGranted = permitCounter.incrementAndGet(); - if (numberOfPermitsIfGranted > maxConcurrency) { - permitCounter.decrementAndGet(); + if (!this.isPermitAvailable(baseTransceiverStationId, cellId, priority, maxConcurrency)) { return false; } @@ -120,10 +120,7 @@ public boolean releasePermit( final int cellId, final int requestId) { - final AtomicInteger permitCounter = - this.permitsPerSegment - .getOrDefault(baseTransceiverStationId, new ConcurrentHashMap<>()) - .getOrDefault(cellId, new AtomicInteger(0)); + final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId); final int numberOfPermitsIfReleased = permitCounter.decrementAndGet(); if (numberOfPermitsIfReleased < 0) { @@ -134,9 +131,74 @@ public boolean releasePermit( this.permitRepository.releasePermit( throttlingConfigId, clientId, baseTransceiverStationId, cellId, requestId); + if (this.useHighPrioPool()) { + this.permitReleasedNotifier.notifyPermitReleased(baseTransceiverStationId, cellId); + } + return numberOfReleasedPermits == 1; } + private boolean useHighPrioPool() { + return this.maxWaitForHighPrioInMs != 0; + } + + private boolean isPermitAvailable( + final int baseTransceiverStationId, + final int cellId, + final int priority, + final int maxConcurrency) { + final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId); + + final int numberOfPermitsIfGranted = permitCounter.incrementAndGet(); + if (numberOfPermitsIfGranted > maxConcurrency) { + permitCounter.decrementAndGet(); + + if (!this.useHighPrioPool()) { + return false; + } + + if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) { + return false; + } + + // Wait until permit is released + return this.waitUntilPermitIsAvailable( + baseTransceiverStationId, cellId, maxConcurrency, this.maxWaitForHighPrioInMs); + } + return true; + } + + private boolean waitUntilPermitIsAvailable( + final int baseTransceiverStationId, + final int cellId, + final int maxConcurrency, + final int maxWaitForHighPrioInMs) { + + final long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < maxWaitForHighPrioInMs) { + final boolean permitAvailable = + this.permitReleasedNotifier.waitForAvailablePermit( + baseTransceiverStationId, cellId, WAIT_TIME); + if (!permitAvailable) { + continue; + } + final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId); + final int numberOfPermitsIfGranted = permitCounter.incrementAndGet(); + if (numberOfPermitsIfGranted > maxConcurrency) { + permitCounter.decrementAndGet(); + } else { + return true; + } + } + return false; + } + + private AtomicInteger getPermitCounter(final int baseTransceiverStationId, final int cellId) { + return this.permitsPerSegment + .computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>()) + .computeIfAbsent(cellId, key -> new AtomicInteger(0)); + } + @Override public String toString() { return String.format( diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java new file mode 100644 index 00000000000..b0bb2e84920 --- /dev/null +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/config/ThrottlingConfig.java @@ -0,0 +1,45 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package org.opensmartgridplatform.throttling.config; + +import com.zaxxer.hikari.util.DriverDataSource; +import java.util.Properties; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ThrottlingConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(ThrottlingConfig.class); + + @Value("${spring.datasource.url}") + private String jdbcUrl; + + @Value("${spring.datasource.driver-class-name}") + private String databaseDriver; + + @Value("${spring.datasource.username}") + private String databaseUsername; + + @Value("${spring.datasource.password}") + private String databasePassword; + + @Bean + public PermitReleasedNotifier permitReleasedNotifier() { + LOGGER.info("Created jdbcUrl {} for permitReleasedNotifier", this.jdbcUrl); + final DriverDataSource dataSource = + new DriverDataSource( + this.jdbcUrl, + this.databaseDriver, + new Properties(), + this.databaseUsername, + this.databasePassword); + + return new PermitReleasedNotifier(dataSource); + } +} diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java new file mode 100644 index 00000000000..db45729cb42 --- /dev/null +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java @@ -0,0 +1,112 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package org.opensmartgridplatform.throttling.service; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import javax.sql.DataSource; +import lombok.extern.slf4j.Slf4j; +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; + +@Slf4j +public class PermitReleasedNotifier { + private static final String PERMIT_RELEASED_CHANNEL = "permit_released"; + private final DataSource dataSource; + + public PermitReleasedNotifier(final DataSource dataSource) { + this.dataSource = dataSource; + } + + public boolean waitForAvailablePermit( + final int baseTransceiverStationId, final int cellId, final int maxWaitInMs) { + Connection connection = null; + try { + connection = this.dataSource.getConnection(); + final PGConnection pgConnection = + this.startListening(connection, baseTransceiverStationId, cellId); + + final PGNotification[] notifications = pgConnection.getNotifications(maxWaitInMs); + if (notifications == null) { + log.info( + "Received no notifications for btsId: {}, cellId: {} within {} ms", + baseTransceiverStationId, + cellId, + maxWaitInMs); + return false; + } + + log.debug( + "Received notification for btsId: {}, cellId: {}", baseTransceiverStationId, cellId); + } catch (final SQLException sqle) { + log.error( + "SQLException occurred while listening for notification for btsId: {}, cellId: {}", + baseTransceiverStationId, + cellId, + sqle); + return false; + } finally { + this.closeConnection(connection, baseTransceiverStationId, cellId); + } + + return true; + } + + public void notifyPermitReleased(final int baseTransceiverStationId, final int cellId) { + Connection connection = null; + try { + connection = this.dataSource.getConnection(); + + final String sqlStatement = + String.format("NOTIFY %s", this.getChannelName(baseTransceiverStationId, cellId)); + try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) { + preparedStatement.executeUpdate(); + } + } catch (final SQLException sqle) { + log.error( + "SQLException occurred while notify for btsId: {}, cellId: {}", + baseTransceiverStationId, + cellId, + sqle); + } finally { + this.closeConnection(connection, baseTransceiverStationId, cellId); + } + } + + private PGConnection startListening( + final Connection connection, final int baseTransceiverStationId, final int cellId) + throws SQLException { + + final PGConnection pgConnection = connection.unwrap(PGConnection.class); + final String sqlStatement = + String.format("LISTEN %s", this.getChannelName(baseTransceiverStationId, cellId)); + try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) { + preparedStatement.executeUpdate(); + } + + return pgConnection; + } + + private void closeConnection( + final Connection connection, final int baseTransceiverStationId, final int cellId) { + try { + if (connection != null) { + connection.close(); + } + } catch (final SQLException e) { + log.error( + "SQLException occurred while listening for notification for btsId: {}, cellId: {}", + baseTransceiverStationId, + cellId, + e); + } + } + + private String getChannelName(final int baseTransceiverStationId, final int cellId) { + return (PERMIT_RELEASED_CHANNEL + "_" + baseTransceiverStationId + "_" + cellId) + .replace("-", "minus"); + } +} diff --git a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties index 88ffc5ac748..a48f269c988 100644 --- a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties +++ b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties @@ -31,9 +31,7 @@ scheduling.task.cleanup.permits.cron.expression=0 0/30 * * * ? # Releasing expired permits will happen in batches of the following size. cleanup.permits.batch.size=100 -#max.wait.for.high.prio.in.ms=5000 -# For now we disable this high prio pool by default -max.wait.for.high.prio.in.ms=0 +max.wait.for.high.prio.in.ms=10000 # The task to reset in memory counters with db state is executed by cron expression. scheduling.task.reinitialize.state.cron.expression=30 0/30 * * * ? diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java index ee42dfe1e29..dd2c9d9e8ec 100644 --- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java @@ -20,6 +20,7 @@ import org.opensmartgridplatform.throttling.entities.ThrottlingConfig; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; @ExtendWith(MockitoExtension.class) class PermitsByThrottlingConfigTest { @@ -27,13 +28,17 @@ class PermitsByThrottlingConfigTest { @Mock private ThrottlingConfigRepository throttlingConfigRepository; @Mock private PermitRepository permitRepository; + @Mock private PermitReleasedNotifier permitReleasedNotifier; private PermitsByThrottlingConfig permitsByThrottlingConfig; @BeforeEach void setUp() { this.permitsByThrottlingConfig = new PermitsByThrottlingConfig( - this.throttlingConfigRepository, this.permitRepository, this.MAX_WAIT_FOR_HIGH_PRIO); + this.throttlingConfigRepository, + this.permitRepository, + this.permitReleasedNotifier, + this.MAX_WAIT_FOR_HIGH_PRIO); } @Test diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java index cd39cf64d3b..65f1c497a17 100644 --- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java @@ -6,28 +6,38 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.assertj.core.util.Lists; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensmartgridplatform.throttling.repositories.PermitRepository; import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment; +import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier; @ExtendWith(MockitoExtension.class) class PermitsPerNetworkSegmentTest { private static final int MAX_WAIT_FOR_HIGH_PRIO = 1000; @Mock private PermitRepository permitRepository; + @Mock private PermitReleasedNotifier permitReleasedNotifier; private PermitsPerNetworkSegment permitsPerNetworkSegment; @BeforeEach void setUp() { this.permitsPerNetworkSegment = - new PermitsPerNetworkSegment(this.permitRepository, this.MAX_WAIT_FOR_HIGH_PRIO); + new PermitsPerNetworkSegment( + this.permitRepository, this.permitReleasedNotifier, this.MAX_WAIT_FOR_HIGH_PRIO); } @Test @@ -130,6 +140,94 @@ void testInitializeDelete() { assertThat(this.permitsPerNetworkSegment.permitsPerNetworkSegment()).isEmpty(); } + @ParameterizedTest + @ValueSource(ints = {0, 2000}) + void testHighPrioPoolTime(final int maxWaitForHighPrio) { + this.permitsPerNetworkSegment = + new PermitsPerNetworkSegment( + this.permitRepository, this.permitReleasedNotifier, maxWaitForHighPrio); + + final int btsId = 1; + final int cellId = 2; + final int numberOfPermits = 3; + final short throttlingConfigId = Integer.valueOf(1).shortValue(); + final int clientId = 4; + final int requestId = 5; + final int priority = 6; + final int maxConcurrency = numberOfPermits; + + this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId); + + this.permitsPerNetworkSegment.initialize(throttlingConfigId); + + final long start = System.currentTimeMillis(); + final boolean permitGranted = + this.permitsPerNetworkSegment.requestPermit( + throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency); + assertThat(permitGranted).isFalse(); + assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(maxWaitForHighPrio); + + verify(this.permitRepository, never()) + .grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId); + } + + @Test + void testHighPrioPool() { + final int maxWaitForHighPrio = 10000; + final int waitBeforeRelease = 1000; + this.permitsPerNetworkSegment = + new PermitsPerNetworkSegment( + this.permitRepository, this.permitReleasedNotifier, maxWaitForHighPrio); + + final int btsId = 1; + final int cellId = 2; + final int otherCellId = cellId + 1; + final int numberOfPermits = 3; + final short throttlingConfigId = Integer.valueOf(1).shortValue(); + final int clientId = 4; + final int requestId = 5; + final int priority = 6; + final int maxConcurrency = numberOfPermits; + + this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId); + + when(this.permitRepository.grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId)) + .thenReturn(true); + when(this.permitRepository.grantPermit( + throttlingConfigId, clientId, btsId, otherCellId, requestId)) + .thenReturn(true); + + this.permitsPerNetworkSegment.initialize(throttlingConfigId); + + final long start = System.currentTimeMillis(); + + final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); + executor.schedule( + () -> { + PermitsPerNetworkSegmentTest.this.permitsPerNetworkSegment.releasePermit( + throttlingConfigId, clientId, btsId, cellId, requestId); + when(this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, 1000)) + .thenReturn(false) + .thenReturn(true); + verify(this.permitReleasedNotifier, times(1)).notifyPermitReleased(btsId, cellId); + }, + waitBeforeRelease, + TimeUnit.MILLISECONDS); + + final boolean permitGrantedOtherCell = + this.permitsPerNetworkSegment.requestPermit( + throttlingConfigId, clientId, btsId, otherCellId, requestId, priority, maxConcurrency); + assertThat(permitGrantedOtherCell).isTrue(); + assertThat((int) (System.currentTimeMillis() - start)).isBetween(0, waitBeforeRelease); + + final boolean permitGranted = + this.permitsPerNetworkSegment.requestPermit( + throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency); + assertThat(permitGranted).isTrue(); + assertThat((int) (System.currentTimeMillis() - start)) + .isBetween(waitBeforeRelease, maxWaitForHighPrio); + } + private void preparePermits( final int btsId, final int cellId, diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java new file mode 100644 index 00000000000..9973a15298a --- /dev/null +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifierTest.java @@ -0,0 +1,147 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package org.opensmartgridplatform.throttling.service; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import javax.sql.DataSource; +import org.assertj.core.util.Arrays; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.postgresql.PGConnection; +import org.postgresql.PGNotification; + +@ExtendWith(MockitoExtension.class) +class PermitReleasedNotifierTest { + private static final String PERMIT_RELEASED_CHANNEL = "permit_released"; + + private static final int btsId = 1; + private static final int cellId = 2; + @Mock private DataSource dataSource; + private final int maxWaitInMs = 654321; + private PermitReleasedNotifier permitReleasedNotifier; + + @BeforeEach + void setUp() { + this.permitReleasedNotifier = new PermitReleasedNotifier(this.dataSource); + } + + @Test + void testWaitForPermitReleasedNoNotification() throws SQLException { + final PGConnection pgConnection = this.setupPgConnectionListen(); + + when(pgConnection.getNotifications(this.maxWaitInMs)).thenReturn(null); + + final boolean result = + this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs); + + assertThat(result).isFalse(); + } + + @Test + void testWaitForPermitReleased() throws SQLException { + final PGConnection pgConnection = this.setupPgConnectionListen(); + final PGNotification pgNotification = + this.newPGNotification(this.getChannelName(btsId, cellId)); + + when(pgConnection.getNotifications(this.maxWaitInMs)).thenReturn(Arrays.array(pgNotification)); + + final boolean result = + this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs); + + assertThat(result).isTrue(); + verify(this.dataSource.getConnection()).close(); + } + + @Test + void testWaitForPermitReleasedThrowSqlException() throws SQLException { + final PGConnection pgConnection = this.setupPgConnectionListen(); + + when(pgConnection.getNotifications(this.maxWaitInMs)).thenThrow(new SQLException()); + + final boolean result = + this.permitReleasedNotifier.waitForAvailablePermit(btsId, cellId, this.maxWaitInMs); + + assertThat(result).isFalse(); + verify(this.dataSource.getConnection()).close(); + } + + @Test + void testNotifyPermitReleased() throws SQLException { + this.setupPgConnectionNotify(); + + this.permitReleasedNotifier.notifyPermitReleased(btsId, cellId); + verify(this.dataSource.getConnection()).close(); + } + + @Test + void testNotifyPermitReleasedThrowSqlException() throws SQLException { + final Connection connection = mock(Connection.class); + when(this.dataSource.getConnection()).thenReturn(connection); + when(connection.prepareStatement( + String.format("NOTIFY %s", this.getChannelName(btsId, cellId)))) + .thenThrow(new SQLException()); + + this.permitReleasedNotifier.notifyPermitReleased(btsId, cellId); + verify(this.dataSource.getConnection()).close(); + } + + private void setupPgConnectionNotify() throws SQLException { + final Connection connection = mock(Connection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + + when(this.dataSource.getConnection()).thenReturn(connection); + when(connection.prepareStatement( + String.format("NOTIFY %s", this.getChannelName(btsId, cellId)))) + .thenReturn(preparedStatement); + when(preparedStatement.executeUpdate()).thenReturn(0); + } + + private PGConnection setupPgConnectionListen() throws SQLException { + final Connection connection = mock(Connection.class); + final PGConnection pgConnection = mock(PGConnection.class); + final PreparedStatement preparedStatement = mock(PreparedStatement.class); + + when(this.dataSource.getConnection()).thenReturn(connection); + when(connection.unwrap(PGConnection.class)).thenReturn(pgConnection); + when(connection.prepareStatement("LISTEN " + this.getChannelName(btsId, cellId))) + .thenReturn(preparedStatement); + when(preparedStatement.executeUpdate()).thenReturn(0); + + return pgConnection; + } + + private String getChannelName(final int btsId, final int cellId) { + return PERMIT_RELEASED_CHANNEL + "_" + btsId + "_" + cellId; + } + + private PGNotification newPGNotification(final String name) { + return new PGNotification() { + @Override + public String getName() { + return name; + } + + @Override + public int getPID() { + return 666; + } + + @Override + public String getParameter() { + return ""; + } + }; + } +} From 9ac83aa8b28688c0bd5c8dc6be3b945a11052c13 Mon Sep 17 00:00:00 2001 From: Gerben Kroes Date: Tue, 20 Feb 2024 09:45:51 +0100 Subject: [PATCH 2/2] Explicit enabled/disable of high prio pool Signed-off-by: Gerben Kroes --- .../throttling/PermitsByThrottlingConfig.java | 16 +++++++++++++--- .../throttling/PermitsPerNetworkSegment.java | 11 +++++------ .../service/PermitReleasedNotifier.java | 2 +- .../resources/osgp-throttling-service.properties | 3 ++- .../PermitsByThrottlingConfigTest.java | 2 ++ .../throttling/PermitsPerNetworkSegmentTest.java | 16 +++++++++++++--- 6 files changed, 36 insertions(+), 14 deletions(-) diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java index 864ed92cdf9..88d09dfccd1 100644 --- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfig.java @@ -30,17 +30,20 @@ public class PermitsByThrottlingConfig { private final ThrottlingConfigRepository throttlingConfigRepository; private final PermitRepository permitRepository; private final PermitReleasedNotifier permitReleasedNotifier; + private final boolean highPrioPoolEnabled; private final int maxWaitForHighPrioInMs; public PermitsByThrottlingConfig( final ThrottlingConfigRepository throttlingConfigRepository, final PermitRepository permitRepository, final PermitReleasedNotifier permitReleasedNotifier, - @Value("${max.wait.for.high.prio.in.ms:5000}") final int maxWaitForHighPrioInMs) { + @Value("${wait.for.high.prio.enabled:true}") final boolean highPrioPoolEnabled, + @Value("${wait.for.high.prio.max.in.ms:10000}") final int maxWaitForHighPrioInMs) { this.throttlingConfigRepository = throttlingConfigRepository; this.permitRepository = permitRepository; this.permitReleasedNotifier = permitReleasedNotifier; + this.highPrioPoolEnabled = highPrioPoolEnabled; this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs; } @@ -61,6 +64,7 @@ public void initialize() { new PermitsPerNetworkSegment( this.permitRepository, this.permitReleasedNotifier, + this.highPrioPoolEnabled, this.maxWaitForHighPrioInMs))); /* Update config */ @@ -108,7 +112,10 @@ public boolean requestPermit( private PermitsPerNetworkSegment createAndInitialize(final short throttlingConfigId) { final PermitsPerNetworkSegment permitsPerNetworkSegment = new PermitsPerNetworkSegment( - this.permitRepository, this.permitReleasedNotifier, this.maxWaitForHighPrioInMs); + this.permitRepository, + this.permitReleasedNotifier, + this.highPrioPoolEnabled, + this.maxWaitForHighPrioInMs); permitsPerNetworkSegment.initialize(throttlingConfigId); return permitsPerNetworkSegment; } @@ -122,7 +129,10 @@ public void newThrottlingConfigCreated(final short throttlingConfigId) { this.permitsPerSegmentByConfig.putIfAbsent( throttlingConfigId, new PermitsPerNetworkSegment( - this.permitRepository, this.permitReleasedNotifier, this.maxWaitForHighPrioInMs)); + this.permitRepository, + this.permitReleasedNotifier, + this.highPrioPoolEnabled, + this.maxWaitForHighPrioInMs)); } public boolean releasePermit( diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java index 656ed336b52..d84e83bac6c 100644 --- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegment.java @@ -27,14 +27,17 @@ public class PermitsPerNetworkSegment { private final PermitRepository permitRepository; private final PermitReleasedNotifier permitReleasedNotifier; + private final boolean highPrioPoolEnabled; private final int maxWaitForHighPrioInMs; public PermitsPerNetworkSegment( final PermitRepository permitRepository, final PermitReleasedNotifier permitReleasedNotifier, + final boolean highPrioPoolEnabled, final int maxWaitForHighPrioInMs) { this.permitRepository = permitRepository; this.permitReleasedNotifier = permitReleasedNotifier; + this.highPrioPoolEnabled = highPrioPoolEnabled; this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs; } @@ -131,17 +134,13 @@ public boolean releasePermit( this.permitRepository.releasePermit( throttlingConfigId, clientId, baseTransceiverStationId, cellId, requestId); - if (this.useHighPrioPool()) { + if (this.highPrioPoolEnabled) { this.permitReleasedNotifier.notifyPermitReleased(baseTransceiverStationId, cellId); } return numberOfReleasedPermits == 1; } - private boolean useHighPrioPool() { - return this.maxWaitForHighPrioInMs != 0; - } - private boolean isPermitAvailable( final int baseTransceiverStationId, final int cellId, @@ -153,7 +152,7 @@ private boolean isPermitAvailable( if (numberOfPermitsIfGranted > maxConcurrency) { permitCounter.decrementAndGet(); - if (!this.useHighPrioPool()) { + if (!this.highPrioPoolEnabled) { return false; } diff --git a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java index db45729cb42..36f105a0c28 100644 --- a/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java +++ b/osgp/platform/osgp-throttling-service/src/main/java/org/opensmartgridplatform/throttling/service/PermitReleasedNotifier.java @@ -98,7 +98,7 @@ private void closeConnection( } } catch (final SQLException e) { log.error( - "SQLException occurred while listening for notification for btsId: {}, cellId: {}", + "SQLException occurred while trying to close the connection that is listening for notification for btsId: {}, cellId: {}", baseTransceiverStationId, cellId, e); diff --git a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties index a48f269c988..813972b7e79 100644 --- a/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties +++ b/osgp/platform/osgp-throttling-service/src/main/resources/osgp-throttling-service.properties @@ -31,7 +31,8 @@ scheduling.task.cleanup.permits.cron.expression=0 0/30 * * * ? # Releasing expired permits will happen in batches of the following size. cleanup.permits.batch.size=100 -max.wait.for.high.prio.in.ms=10000 +wait.for.high.prio.enabled=true +wait.for.high.prio.max.in.ms=10000 # The task to reset in memory counters with db state is executed by cron expression. scheduling.task.reinitialize.state.cron.expression=30 0/30 * * * ? diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java index dd2c9d9e8ec..77427b54c02 100644 --- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsByThrottlingConfigTest.java @@ -24,6 +24,7 @@ @ExtendWith(MockitoExtension.class) class PermitsByThrottlingConfigTest { + private static final boolean WAIT_FOR_HIGH_PRIO_ENABLED = true; private static final int MAX_WAIT_FOR_HIGH_PRIO = 1000; @Mock private ThrottlingConfigRepository throttlingConfigRepository; @@ -38,6 +39,7 @@ void setUp() { this.throttlingConfigRepository, this.permitRepository, this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, this.MAX_WAIT_FOR_HIGH_PRIO); } diff --git a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java index 65f1c497a17..b2331865423 100644 --- a/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java +++ b/osgp/platform/osgp-throttling-service/src/test/java/org/opensmartgridplatform/throttling/PermitsPerNetworkSegmentTest.java @@ -28,6 +28,7 @@ @ExtendWith(MockitoExtension.class) class PermitsPerNetworkSegmentTest { + private static final boolean WAIT_FOR_HIGH_PRIO_ENABLED = true; private static final int MAX_WAIT_FOR_HIGH_PRIO = 1000; @Mock private PermitRepository permitRepository; @Mock private PermitReleasedNotifier permitReleasedNotifier; @@ -37,7 +38,10 @@ class PermitsPerNetworkSegmentTest { void setUp() { this.permitsPerNetworkSegment = new PermitsPerNetworkSegment( - this.permitRepository, this.permitReleasedNotifier, this.MAX_WAIT_FOR_HIGH_PRIO); + this.permitRepository, + this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, + this.MAX_WAIT_FOR_HIGH_PRIO); } @Test @@ -145,7 +149,10 @@ void testInitializeDelete() { void testHighPrioPoolTime(final int maxWaitForHighPrio) { this.permitsPerNetworkSegment = new PermitsPerNetworkSegment( - this.permitRepository, this.permitReleasedNotifier, maxWaitForHighPrio); + this.permitRepository, + this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, + maxWaitForHighPrio); final int btsId = 1; final int cellId = 2; @@ -177,7 +184,10 @@ void testHighPrioPool() { final int waitBeforeRelease = 1000; this.permitsPerNetworkSegment = new PermitsPerNetworkSegment( - this.permitRepository, this.permitReleasedNotifier, maxWaitForHighPrio); + this.permitRepository, + this.permitReleasedNotifier, + WAIT_FOR_HIGH_PRIO_ENABLED, + maxWaitForHighPrio); final int btsId = 1; final int cellId = 2;