Skip to content

Commit

Permalink
Merge pull request #1176 from OSGP/feature/throttling_db_events
Browse files Browse the repository at this point in the history
Implement pool for high prio requests based on db events
  • Loading branch information
kroesctrl authored Feb 20, 2024
2 parents e64723f + 9ac83aa commit af89ec6
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 24 deletions.
1 change: 0 additions & 1 deletion osgp/platform/osgp-throttling-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ SPDX-License-Identifier: Apache-2.0
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensmartgridplatform.throttling.entities.ThrottlingConfig;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository;
import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -28,15 +29,21 @@ public class PermitsByThrottlingConfig {

private final ThrottlingConfigRepository throttlingConfigRepository;
private final PermitRepository permitRepository;
private final PermitReleasedNotifier permitReleasedNotifier;
private final boolean highPrioPoolEnabled;
private final int maxWaitForHighPrioInMs;

public PermitsByThrottlingConfig(
final ThrottlingConfigRepository throttlingConfigRepository,
final PermitRepository permitRepository,
@Value("${max.wait.for.high.prio.in.ms:5000}") final int maxWaitForHighPrioInMs) {
final PermitReleasedNotifier permitReleasedNotifier,
@Value("${wait.for.high.prio.enabled:true}") final boolean highPrioPoolEnabled,
@Value("${wait.for.high.prio.max.in.ms:10000}") final int maxWaitForHighPrioInMs) {

this.throttlingConfigRepository = throttlingConfigRepository;
this.permitRepository = permitRepository;
this.permitReleasedNotifier = permitReleasedNotifier;
this.highPrioPoolEnabled = highPrioPoolEnabled;
this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs;
}

Expand All @@ -54,7 +61,11 @@ public void initialize() {
throttlingConfigId ->
this.permitsPerSegmentByConfig.putIfAbsent(
throttlingConfigId,
new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs)));
new PermitsPerNetworkSegment(
this.permitRepository,
this.permitReleasedNotifier,
this.highPrioPoolEnabled,
this.maxWaitForHighPrioInMs)));

/* Update config */
this.permitsPerSegmentByConfig.entrySet().parallelStream()
Expand Down Expand Up @@ -100,7 +111,11 @@ public boolean requestPermit(

private PermitsPerNetworkSegment createAndInitialize(final short throttlingConfigId) {
final PermitsPerNetworkSegment permitsPerNetworkSegment =
new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs);
new PermitsPerNetworkSegment(
this.permitRepository,
this.permitReleasedNotifier,
this.highPrioPoolEnabled,
this.maxWaitForHighPrioInMs);
permitsPerNetworkSegment.initialize(throttlingConfigId);
return permitsPerNetworkSegment;
}
Expand All @@ -113,7 +128,11 @@ public void newThrottlingConfigCreated(final short throttlingConfigId) {
*/
this.permitsPerSegmentByConfig.putIfAbsent(
throttlingConfigId,
new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs));
new PermitsPerNetworkSegment(
this.permitRepository,
this.permitReleasedNotifier,
this.highPrioPoolEnabled,
this.maxWaitForHighPrioInMs));
}

public boolean releasePermit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,34 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment;
import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PermitsPerNetworkSegment {
private static final Logger LOGGER = LoggerFactory.getLogger(PermitsPerNetworkSegment.class);

private static final int WAIT_TIME = 1000;

private final ConcurrentMap<Integer, ConcurrentMap<Integer, AtomicInteger>> permitsPerSegment =
new ConcurrentHashMap<>();

private final PermitRepository permitRepository;
private final PermitReleasedNotifier permitReleasedNotifier;
private final boolean highPrioPoolEnabled;
private final int maxWaitForHighPrioInMs;

public PermitsPerNetworkSegment(
final PermitRepository permitRepository, final int maxWaitForHighPrioInMs) {
final PermitRepository permitRepository,
final PermitReleasedNotifier permitReleasedNotifier,
final boolean highPrioPoolEnabled,
final int maxWaitForHighPrioInMs) {
this.permitRepository = permitRepository;
this.permitReleasedNotifier = permitReleasedNotifier;
this.highPrioPoolEnabled = highPrioPoolEnabled;
this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs;
}

Expand Down Expand Up @@ -97,15 +108,7 @@ public boolean requestPermit(
final int requestId,
final int priority,
final int maxConcurrency) {

final AtomicInteger permitCounter =
this.permitsPerSegment
.computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>())
.computeIfAbsent(cellId, key -> new AtomicInteger(0));

final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();
if (!this.isPermitAvailable(baseTransceiverStationId, cellId, priority, maxConcurrency)) {
return false;
}

Expand All @@ -120,10 +123,7 @@ public boolean releasePermit(
final int cellId,
final int requestId) {

final AtomicInteger permitCounter =
this.permitsPerSegment
.getOrDefault(baseTransceiverStationId, new ConcurrentHashMap<>())
.getOrDefault(cellId, new AtomicInteger(0));
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);

final int numberOfPermitsIfReleased = permitCounter.decrementAndGet();
if (numberOfPermitsIfReleased < 0) {
Expand All @@ -134,9 +134,70 @@ public boolean releasePermit(
this.permitRepository.releasePermit(
throttlingConfigId, clientId, baseTransceiverStationId, cellId, requestId);

if (this.highPrioPoolEnabled) {
this.permitReleasedNotifier.notifyPermitReleased(baseTransceiverStationId, cellId);
}

return numberOfReleasedPermits == 1;
}

private boolean isPermitAvailable(
final int baseTransceiverStationId,
final int cellId,
final int priority,
final int maxConcurrency) {
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);

final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();

if (!this.highPrioPoolEnabled) {
return false;
}

if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) {
return false;
}

// Wait until permit is released
return this.waitUntilPermitIsAvailable(
baseTransceiverStationId, cellId, maxConcurrency, this.maxWaitForHighPrioInMs);
}
return true;
}

private boolean waitUntilPermitIsAvailable(
final int baseTransceiverStationId,
final int cellId,
final int maxConcurrency,
final int maxWaitForHighPrioInMs) {

final long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < maxWaitForHighPrioInMs) {
final boolean permitAvailable =
this.permitReleasedNotifier.waitForAvailablePermit(
baseTransceiverStationId, cellId, WAIT_TIME);
if (!permitAvailable) {
continue;
}
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);
final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();
} else {
return true;
}
}
return false;
}

private AtomicInteger getPermitCounter(final int baseTransceiverStationId, final int cellId) {
return this.permitsPerSegment
.computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>())
.computeIfAbsent(cellId, key -> new AtomicInteger(0));
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
//
// SPDX-License-Identifier: Apache-2.0

package org.opensmartgridplatform.throttling.config;

import com.zaxxer.hikari.util.DriverDataSource;
import java.util.Properties;
import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ThrottlingConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ThrottlingConfig.class);

@Value("${spring.datasource.url}")
private String jdbcUrl;

@Value("${spring.datasource.driver-class-name}")
private String databaseDriver;

@Value("${spring.datasource.username}")
private String databaseUsername;

@Value("${spring.datasource.password}")
private String databasePassword;

@Bean
public PermitReleasedNotifier permitReleasedNotifier() {
LOGGER.info("Created jdbcUrl {} for permitReleasedNotifier", this.jdbcUrl);
final DriverDataSource dataSource =
new DriverDataSource(
this.jdbcUrl,
this.databaseDriver,
new Properties(),
this.databaseUsername,
this.databasePassword);

return new PermitReleasedNotifier(dataSource);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
//
// SPDX-License-Identifier: Apache-2.0

package org.opensmartgridplatform.throttling.service;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

@Slf4j
public class PermitReleasedNotifier {
private static final String PERMIT_RELEASED_CHANNEL = "permit_released";
private final DataSource dataSource;

public PermitReleasedNotifier(final DataSource dataSource) {
this.dataSource = dataSource;
}

public boolean waitForAvailablePermit(
final int baseTransceiverStationId, final int cellId, final int maxWaitInMs) {
Connection connection = null;
try {
connection = this.dataSource.getConnection();
final PGConnection pgConnection =
this.startListening(connection, baseTransceiverStationId, cellId);

final PGNotification[] notifications = pgConnection.getNotifications(maxWaitInMs);
if (notifications == null) {
log.info(
"Received no notifications for btsId: {}, cellId: {} within {} ms",
baseTransceiverStationId,
cellId,
maxWaitInMs);
return false;
}

log.debug(
"Received notification for btsId: {}, cellId: {}", baseTransceiverStationId, cellId);
} catch (final SQLException sqle) {
log.error(
"SQLException occurred while listening for notification for btsId: {}, cellId: {}",
baseTransceiverStationId,
cellId,
sqle);
return false;
} finally {
this.closeConnection(connection, baseTransceiverStationId, cellId);
}

return true;
}

public void notifyPermitReleased(final int baseTransceiverStationId, final int cellId) {
Connection connection = null;
try {
connection = this.dataSource.getConnection();

final String sqlStatement =
String.format("NOTIFY %s", this.getChannelName(baseTransceiverStationId, cellId));
try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) {
preparedStatement.executeUpdate();
}
} catch (final SQLException sqle) {
log.error(
"SQLException occurred while notify for btsId: {}, cellId: {}",
baseTransceiverStationId,
cellId,
sqle);
} finally {
this.closeConnection(connection, baseTransceiverStationId, cellId);
}
}

private PGConnection startListening(
final Connection connection, final int baseTransceiverStationId, final int cellId)
throws SQLException {

final PGConnection pgConnection = connection.unwrap(PGConnection.class);
final String sqlStatement =
String.format("LISTEN %s", this.getChannelName(baseTransceiverStationId, cellId));
try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) {
preparedStatement.executeUpdate();
}

return pgConnection;
}

private void closeConnection(
final Connection connection, final int baseTransceiverStationId, final int cellId) {
try {
if (connection != null) {
connection.close();
}
} catch (final SQLException e) {
log.error(
"SQLException occurred while trying to close the connection that is listening for notification for btsId: {}, cellId: {}",
baseTransceiverStationId,
cellId,
e);
}
}

private String getChannelName(final int baseTransceiverStationId, final int cellId) {
return (PERMIT_RELEASED_CHANNEL + "_" + baseTransceiverStationId + "_" + cellId)
.replace("-", "minus");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ scheduling.task.cleanup.permits.cron.expression=0 0/30 * * * ?
# Releasing expired permits will happen in batches of the following size.
cleanup.permits.batch.size=100

#max.wait.for.high.prio.in.ms=5000
# For now we disable this high prio pool by default
max.wait.for.high.prio.in.ms=0
wait.for.high.prio.enabled=true
wait.for.high.prio.max.in.ms=10000

# The task to reset in memory counters with db state is executed by cron expression.
scheduling.task.reinitialize.state.cron.expression=30 0/30 * * * ?
Loading

0 comments on commit af89ec6

Please sign in to comment.