Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pool for high prio requests based on db events #1176

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion osgp/platform/osgp-throttling-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ SPDX-License-Identifier: Apache-2.0
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensmartgridplatform.throttling.entities.ThrottlingConfig;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.ThrottlingConfigRepository;
import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -28,15 +29,18 @@ public class PermitsByThrottlingConfig {

private final ThrottlingConfigRepository throttlingConfigRepository;
private final PermitRepository permitRepository;
private final PermitReleasedNotifier permitReleasedNotifier;
private final int maxWaitForHighPrioInMs;

public PermitsByThrottlingConfig(
final ThrottlingConfigRepository throttlingConfigRepository,
final PermitRepository permitRepository,
final PermitReleasedNotifier permitReleasedNotifier,
@Value("${max.wait.for.high.prio.in.ms:5000}") final int maxWaitForHighPrioInMs) {

this.throttlingConfigRepository = throttlingConfigRepository;
this.permitRepository = permitRepository;
this.permitReleasedNotifier = permitReleasedNotifier;
this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs;
}

Expand All @@ -54,7 +58,10 @@ public void initialize() {
throttlingConfigId ->
this.permitsPerSegmentByConfig.putIfAbsent(
throttlingConfigId,
new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs)));
new PermitsPerNetworkSegment(
this.permitRepository,
this.permitReleasedNotifier,
this.maxWaitForHighPrioInMs)));

/* Update config */
this.permitsPerSegmentByConfig.entrySet().parallelStream()
Expand Down Expand Up @@ -100,7 +107,8 @@ public boolean requestPermit(

private PermitsPerNetworkSegment createAndInitialize(final short throttlingConfigId) {
final PermitsPerNetworkSegment permitsPerNetworkSegment =
new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs);
new PermitsPerNetworkSegment(
this.permitRepository, this.permitReleasedNotifier, this.maxWaitForHighPrioInMs);
permitsPerNetworkSegment.initialize(throttlingConfigId);
return permitsPerNetworkSegment;
}
Expand All @@ -113,7 +121,8 @@ public void newThrottlingConfigCreated(final short throttlingConfigId) {
*/
this.permitsPerSegmentByConfig.putIfAbsent(
throttlingConfigId,
new PermitsPerNetworkSegment(this.permitRepository, this.maxWaitForHighPrioInMs));
new PermitsPerNetworkSegment(
this.permitRepository, this.permitReleasedNotifier, this.maxWaitForHighPrioInMs));
}

public boolean releasePermit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,31 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment;
import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PermitsPerNetworkSegment {
private static final Logger LOGGER = LoggerFactory.getLogger(PermitsPerNetworkSegment.class);

private static final int WAIT_TIME = 1000;

private final ConcurrentMap<Integer, ConcurrentMap<Integer, AtomicInteger>> permitsPerSegment =
new ConcurrentHashMap<>();

private final PermitRepository permitRepository;
private final PermitReleasedNotifier permitReleasedNotifier;
private final int maxWaitForHighPrioInMs;

public PermitsPerNetworkSegment(
final PermitRepository permitRepository, final int maxWaitForHighPrioInMs) {
final PermitRepository permitRepository,
final PermitReleasedNotifier permitReleasedNotifier,
final int maxWaitForHighPrioInMs) {
this.permitRepository = permitRepository;
this.permitReleasedNotifier = permitReleasedNotifier;
this.maxWaitForHighPrioInMs = maxWaitForHighPrioInMs;
}

Expand Down Expand Up @@ -97,15 +105,7 @@ public boolean requestPermit(
final int requestId,
final int priority,
final int maxConcurrency) {

final AtomicInteger permitCounter =
this.permitsPerSegment
.computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>())
.computeIfAbsent(cellId, key -> new AtomicInteger(0));

final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();
if (!this.isPermitAvailable(baseTransceiverStationId, cellId, priority, maxConcurrency)) {
return false;
}

Expand All @@ -120,10 +120,7 @@ public boolean releasePermit(
final int cellId,
final int requestId) {

final AtomicInteger permitCounter =
this.permitsPerSegment
.getOrDefault(baseTransceiverStationId, new ConcurrentHashMap<>())
.getOrDefault(cellId, new AtomicInteger(0));
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);

final int numberOfPermitsIfReleased = permitCounter.decrementAndGet();
if (numberOfPermitsIfReleased < 0) {
Expand All @@ -134,9 +131,74 @@ public boolean releasePermit(
this.permitRepository.releasePermit(
throttlingConfigId, clientId, baseTransceiverStationId, cellId, requestId);

if (this.useHighPrioPool()) {
this.permitReleasedNotifier.notifyPermitReleased(baseTransceiverStationId, cellId);
}

return numberOfReleasedPermits == 1;
}

private boolean useHighPrioPool() {
return this.maxWaitForHighPrioInMs != 0;
}

private boolean isPermitAvailable(
final int baseTransceiverStationId,
final int cellId,
final int priority,
final int maxConcurrency) {
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);

final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();

if (!this.useHighPrioPool()) {
return false;
}

if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) {
return false;
}

// Wait until permit is released
return this.waitUntilPermitIsAvailable(
baseTransceiverStationId, cellId, maxConcurrency, this.maxWaitForHighPrioInMs);
}
return true;
}

private boolean waitUntilPermitIsAvailable(
final int baseTransceiverStationId,
final int cellId,
final int maxConcurrency,
final int maxWaitForHighPrioInMs) {

final long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < maxWaitForHighPrioInMs) {
final boolean permitAvailable =
this.permitReleasedNotifier.waitForAvailablePermit(
baseTransceiverStationId, cellId, WAIT_TIME);
if (!permitAvailable) {
continue;
}
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);
final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();
} else {
return true;
}
}
return false;
}

private AtomicInteger getPermitCounter(final int baseTransceiverStationId, final int cellId) {
return this.permitsPerSegment
.computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>())
.computeIfAbsent(cellId, key -> new AtomicInteger(0));
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
//
// SPDX-License-Identifier: Apache-2.0

package org.opensmartgridplatform.throttling.config;

import com.zaxxer.hikari.util.DriverDataSource;
import java.util.Properties;
import org.opensmartgridplatform.throttling.service.PermitReleasedNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ThrottlingConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ThrottlingConfig.class);

@Value("${spring.datasource.url}")
private String jdbcUrl;

@Value("${spring.datasource.driver-class-name}")
private String databaseDriver;

@Value("${spring.datasource.username}")
private String databaseUsername;

@Value("${spring.datasource.password}")
private String databasePassword;

@Bean
public PermitReleasedNotifier permitReleasedNotifier() {
LOGGER.info("Created jdbcUrl {} for permitReleasedNotifier", this.jdbcUrl);
final DriverDataSource dataSource =
new DriverDataSource(
this.jdbcUrl,
this.databaseDriver,
new Properties(),
this.databaseUsername,
this.databasePassword);

return new PermitReleasedNotifier(dataSource);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// SPDX-FileCopyrightText: Copyright Contributors to the GXF project
//
// SPDX-License-Identifier: Apache-2.0

package org.opensmartgridplatform.throttling.service;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

@Slf4j
public class PermitReleasedNotifier {
private static final String PERMIT_RELEASED_CHANNEL = "permit_released";
private final DataSource dataSource;

public PermitReleasedNotifier(final DataSource dataSource) {
this.dataSource = dataSource;
}

public boolean waitForAvailablePermit(
final int baseTransceiverStationId, final int cellId, final int maxWaitInMs) {
Connection connection = null;
try {
connection = this.dataSource.getConnection();
final PGConnection pgConnection =
this.startListening(connection, baseTransceiverStationId, cellId);

final PGNotification[] notifications = pgConnection.getNotifications(maxWaitInMs);
if (notifications == null) {
log.info(
"Received no notifications for btsId: {}, cellId: {} within {} ms",
baseTransceiverStationId,
cellId,
maxWaitInMs);
return false;
}

log.debug(
"Received notification for btsId: {}, cellId: {}", baseTransceiverStationId, cellId);
} catch (final SQLException sqle) {
log.error(
"SQLException occurred while listening for notification for btsId: {}, cellId: {}",
baseTransceiverStationId,
cellId,
sqle);
return false;
} finally {
this.closeConnection(connection, baseTransceiverStationId, cellId);
}

return true;
}

public void notifyPermitReleased(final int baseTransceiverStationId, final int cellId) {
Connection connection = null;
try {
connection = this.dataSource.getConnection();

final String sqlStatement =
String.format("NOTIFY %s", this.getChannelName(baseTransceiverStationId, cellId));
try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) {
preparedStatement.executeUpdate();
}
} catch (final SQLException sqle) {
log.error(
"SQLException occurred while notify for btsId: {}, cellId: {}",
baseTransceiverStationId,
cellId,
sqle);
} finally {
this.closeConnection(connection, baseTransceiverStationId, cellId);
}
}

private PGConnection startListening(
final Connection connection, final int baseTransceiverStationId, final int cellId)
throws SQLException {

final PGConnection pgConnection = connection.unwrap(PGConnection.class);
final String sqlStatement =
String.format("LISTEN %s", this.getChannelName(baseTransceiverStationId, cellId));
try (final PreparedStatement preparedStatement = connection.prepareStatement(sqlStatement)) {
preparedStatement.executeUpdate();
}

return pgConnection;
}

private void closeConnection(
final Connection connection, final int baseTransceiverStationId, final int cellId) {
try {
if (connection != null) {
connection.close();
}
} catch (final SQLException e) {
log.error(
"SQLException occurred while listening for notification for btsId: {}, cellId: {}",
baseTransceiverStationId,
cellId,
e);
}
}

private String getChannelName(final int baseTransceiverStationId, final int cellId) {
return (PERMIT_RELEASED_CHANNEL + "_" + baseTransceiverStationId + "_" + cellId)
.replace("-", "minus");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ scheduling.task.cleanup.permits.cron.expression=0 0/30 * * * ?
# Releasing expired permits will happen in batches of the following size.
cleanup.permits.batch.size=100

#max.wait.for.high.prio.in.ms=5000
# For now we disable this high prio pool by default
max.wait.for.high.prio.in.ms=0
max.wait.for.high.prio.in.ms=10000

# The task to reset in memory counters with db state is executed by cron expression.
scheduling.task.reinitialize.state.cron.expression=30 0/30 * * * ?
Loading
Loading