Skip to content

Commit

Permalink
Merge pull request #1175 from OSGP/feature/remove_high_prio_handling_…
Browse files Browse the repository at this point in the history
…throttling_service

Revert code base of handling high-prio requests
  • Loading branch information
Marcel-Jansen authored Feb 16, 2024
2 parents 9511057 + 2aac6ab commit e16ca5f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
import org.opensmartgridplatform.throttling.repositories.PermitRepository.PermitCountByNetworkSegment;
import org.slf4j.Logger;
Expand All @@ -19,10 +18,6 @@
public class PermitsPerNetworkSegment {
private static final Logger LOGGER = LoggerFactory.getLogger(PermitsPerNetworkSegment.class);

private static final ConcurrentMap<Integer, AtomicInteger> NO_PERMITS_FOR_STATION =
new ConcurrentHashMap<>();
private static final AtomicInteger NO_PERMITS_FOR_CELL = new AtomicInteger(0);

private final ConcurrentMap<Integer, ConcurrentMap<Integer, AtomicInteger>> permitsPerSegment =
new ConcurrentHashMap<>();

Expand Down Expand Up @@ -103,7 +98,14 @@ public boolean requestPermit(
final int priority,
final int maxConcurrency) {

if (!this.isPermitAvailable(baseTransceiverStationId, cellId, priority, maxConcurrency)) {
final AtomicInteger permitCounter =
this.permitsPerSegment
.computeIfAbsent(baseTransceiverStationId, key -> new ConcurrentHashMap<>())
.computeIfAbsent(cellId, key -> new AtomicInteger(0));

final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();
return false;
}

Expand All @@ -118,7 +120,10 @@ public boolean releasePermit(
final int cellId,
final int requestId) {

final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);
final AtomicInteger permitCounter =
this.permitsPerSegment
.getOrDefault(baseTransceiverStationId, new ConcurrentHashMap<>())
.getOrDefault(cellId, new AtomicInteger(0));

final int numberOfPermitsIfReleased = permitCounter.decrementAndGet();
if (numberOfPermitsIfReleased < 0) {
Expand All @@ -129,80 +134,9 @@ public boolean releasePermit(
this.permitRepository.releasePermit(
throttlingConfigId, clientId, baseTransceiverStationId, cellId, requestId);

if (this.useHighPrioPool()) {
// Notify that permit is released
synchronized (permitCounter) {
permitCounter.notifyAll();
}
}

return numberOfReleasedPermits == 1;
}

private boolean useHighPrioPool() {
return this.maxWaitForHighPrioInMs != 0;
}

private boolean isPermitAvailable(
final int baseTransceiverStationId,
final int cellId,
final int priority,
final int maxConcurrency) {
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);

final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();

if (!this.useHighPrioPool()) {
return false;
}

if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) {
return false;
}

// Wait until permit is released
return this.waitUntilPermitIsAvailable(
baseTransceiverStationId, cellId, maxConcurrency, this.maxWaitForHighPrioInMs);
}
return true;
}

private boolean waitUntilPermitIsAvailable(
final int baseTransceiverStationId,
final int cellId,
final int maxConcurrency,
final int maxWaitForHighPrioInMs) {
final AtomicInteger permitCounter = this.getPermitCounter(baseTransceiverStationId, cellId);

synchronized (permitCounter) {
try {
final long startTime = System.currentTimeMillis();
final int wait = 10;
while (System.currentTimeMillis() - startTime < maxWaitForHighPrioInMs) {
permitCounter.wait(wait);

final int numberOfPermitsIfGranted = permitCounter.incrementAndGet();
if (numberOfPermitsIfGranted > maxConcurrency) {
permitCounter.decrementAndGet();
} else {
return true;
}
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return false;
}

private AtomicInteger getPermitCounter(final int baseTransceiverStationId, final int cellId) {
return this.permitsPerSegment
.computeIfAbsent(baseTransceiverStationId, key -> NO_PERMITS_FOR_STATION)
.computeIfAbsent(cellId, key -> NO_PERMITS_FOR_CELL);
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,13 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensmartgridplatform.throttling.repositories.PermitRepository;
Expand Down Expand Up @@ -136,78 +130,6 @@ void testInitializeDelete() {
assertThat(this.permitsPerNetworkSegment.permitsPerNetworkSegment()).isEmpty();
}

@ParameterizedTest
@ValueSource(ints = {0, 2000})
void testHighPrioPoolTime(final int maxWaitForHighPrio) {
this.permitsPerNetworkSegment =
new PermitsPerNetworkSegment(this.permitRepository, maxWaitForHighPrio);

final int btsId = 1;
final int cellId = 2;
final int numberOfPermits = 3;
final short throttlingConfigId = Integer.valueOf(1).shortValue();
final int clientId = 4;
final int requestId = 5;
final int priority = 6;
final int maxConcurrency = numberOfPermits;

this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId);

this.permitsPerNetworkSegment.initialize(throttlingConfigId);

final long start = System.currentTimeMillis();
final boolean permitGranted =
this.permitsPerNetworkSegment.requestPermit(
throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency);
assertThat(permitGranted).isFalse();
assertThat(System.currentTimeMillis() - start).isGreaterThanOrEqualTo(maxWaitForHighPrio);

verify(this.permitRepository, never())
.grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId);
}

@Test
void testHighPrioPool() {
final int maxWaitForHighPrio = 10000;
final int waitBeforeRelease = 1000;
this.permitsPerNetworkSegment =
new PermitsPerNetworkSegment(this.permitRepository, maxWaitForHighPrio);

final int btsId = 1;
final int cellId = 2;
final int numberOfPermits = 3;
final short throttlingConfigId = Integer.valueOf(1).shortValue();
final int clientId = 4;
final int requestId = 5;
final int priority = 6;
final int maxConcurrency = numberOfPermits;

this.preparePermits(btsId, cellId, numberOfPermits, throttlingConfigId);

when(this.permitRepository.grantPermit(throttlingConfigId, clientId, btsId, cellId, requestId))
.thenReturn(true);

this.permitsPerNetworkSegment.initialize(throttlingConfigId);

final long start = System.currentTimeMillis();

final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
executor.schedule(
() -> {
PermitsPerNetworkSegmentTest.this.permitsPerNetworkSegment.releasePermit(
throttlingConfigId, clientId, btsId, cellId, requestId);
},
waitBeforeRelease,
TimeUnit.MILLISECONDS);

final boolean permitGranted =
this.permitsPerNetworkSegment.requestPermit(
throttlingConfigId, clientId, btsId, cellId, requestId, priority, maxConcurrency);
assertThat(permitGranted).isTrue();
assertThat((int) (System.currentTimeMillis() - start))
.isBetween(waitBeforeRelease, maxWaitForHighPrio);
}

private void preparePermits(
final int btsId,
final int cellId,
Expand Down

0 comments on commit e16ca5f

Please sign in to comment.