Skip to content

Commit

Permalink
Merge pull request #1156 from OSGP/feature/throttling_bug_fix_develop
Browse files Browse the repository at this point in the history
Max wait for default
  • Loading branch information
kroesctrl authored Dec 20, 2023
2 parents b641891 + b904fe3 commit 1ac0a0b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.opensmartgridplatform.adapter.protocol.dlms.application.config.annotation.LocalThrottlingServiceCondition;
import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum;
import org.opensmartgridplatform.throttling.ThrottlingPermitDeniedException;
import org.opensmartgridplatform.throttling.api.Permit;
import org.slf4j.Logger;
Expand All @@ -31,15 +30,18 @@
@Component
@Conditional(LocalThrottlingServiceCondition.class)
public class LocalThrottlingServiceImpl implements ThrottlingService {

private static final long WAIT_FOR_LOCK = 10;

private final AtomicInteger requestIdCounter = new AtomicInteger(0);

private static final Logger LOGGER = LoggerFactory.getLogger(LocalThrottlingServiceImpl.class);

private final int maxOpenConnections;
private final int maxNewConnectionRequests;

@Value("${throttling.max.wait.high.prio}")
private int maxWaitForHighPrioInMs;
@Value("${throttling.max.wait.for.permit}")
private int maxWaitForPermitInMs;

@Value("${throttling.max.new.connection.reset.time}")
private int maxNewConnectionResetTime;
Expand Down Expand Up @@ -126,18 +128,7 @@ private void requestPermit(
LOGGER.debug("{}. available = {} ", permitDescription, semaphore.availablePermits());

try {
if (semaphore.availablePermits() == 0) {
if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) {
throw new ThrottlingPermitDeniedException(
permitDescription + ": no available permits", priority);
} else {
LOGGER.debug(
"{} wait for available permit for request with priority {}",
permitDescription,
priority);
}
}
if (!semaphore.tryAcquire(this.maxWaitForHighPrioInMs, TimeUnit.MILLISECONDS)) {
if (!semaphore.tryAcquire(this.maxWaitForPermitInMs, TimeUnit.MILLISECONDS)) {
throw new ThrottlingPermitDeniedException(
permitDescription + ": could not acquire permit for request with priority " + priority,
priority);
Expand All @@ -149,15 +140,17 @@ private void requestPermit(
}
}

private synchronized void awaitReset() {
private void awaitReset() {
LOGGER.debug(
"Await reset for newConnection. available = {} ",
this.newConnectionRequestsSemaphore.availablePermits());

while (this.resetTimerLock.isLocked()) {
try {
LOGGER.info("Wait {}ms while reset timer is locked", this.maxNewConnectionResetTime);
this.resetTimerLock.wait(this.maxNewConnectionResetTime);
LOGGER.info("Wait {}ms while reset timer is locked", WAIT_FOR_LOCK);
synchronized (this.requestIdCounter) {
this.requestIdCounter.wait(WAIT_FOR_LOCK);
}
} catch (final InterruptedException e) {
LOGGER.warn("Unable to acquire New Connection Request Lock", e);
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ throttling.rejected.max.delay=PT70S
throttling.rejected.high.prio.delay=PT2S
# Configuration for the throttling service on the GPRS network
throttling.max.open.connections=1000
throttling.max.wait.high.prio=10000
throttling.max.wait.for.permit=60000
throttling.max.new.connection.reset.time=1000
throttling.max.new.connection.requests=30

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensmartgridplatform.shared.wsheaderattribute.priority.MessagePriorityEnum;
import org.opensmartgridplatform.throttling.ThrottlingPermitDeniedException;
import org.opensmartgridplatform.throttling.api.Permit;
import org.slf4j.Logger;
Expand All @@ -37,7 +36,7 @@ class LocalThrottlingServiceImplTest {
private static final Integer MAX_NEW_CONNECTION_REQUESTS = 10;
private static final Integer MAX_OPEN_CONNECTIONS = MAX_NEW_CONNECTION_REQUESTS * 2;
private static final Integer MAX_NEW_CONNECTION_RESET_TIME = 200;
private static final Integer MAX_WAIT_FOR_HIGH_PRIO = 500;
private static final Integer MAX_WAIT_FOR_PERMIT = 500;
private static final Integer CLEANUP_PERMITS_INTERVAL = 200;
private static final Duration PERMIT_TTL = Duration.of(2, ChronoUnit.SECONDS);

Expand All @@ -50,7 +49,7 @@ void setUp() {
ReflectionTestUtils.setField(
this.throttlingService, "maxNewConnectionResetTime", MAX_NEW_CONNECTION_RESET_TIME);
ReflectionTestUtils.setField(
this.throttlingService, "maxWaitForHighPrioInMs", MAX_WAIT_FOR_HIGH_PRIO);
this.throttlingService, "maxWaitForPermitInMs", MAX_WAIT_FOR_PERMIT);
ReflectionTestUtils.setField(
this.throttlingService, "cleanupExpiredPermitsInterval", CLEANUP_PERMITS_INTERVAL);
ReflectionTestUtils.setField(this.throttlingService, "timeToLive", PERMIT_TTL);
Expand All @@ -60,6 +59,8 @@ void setUp() {
@ParameterizedTest
@ValueSource(ints = {4, 5})
void testThrottlingOpenConnections(final int priority) throws InterruptedException {
ReflectionTestUtils.setField(this.throttlingService, "maxWaitForPermitInMs", 0);

// Claim 10
final List<Permit> firstBatch = this.requestPermitLowPrio(MAX_NEW_CONNECTION_REQUESTS);
// Sleep longer than reset time
Expand Down Expand Up @@ -93,19 +94,10 @@ void testThrottlingMaxNewConnections(final int priority) {
final List<Permit> permits = this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority);
this.assertAvailableNewConnections(0);

this.releasePermitWithDelay(permits, MAX_WAIT_FOR_HIGH_PRIO / 2);
this.releasePermitWithDelay(permits, MAX_WAIT_FOR_PERMIT / 2);

final int nrOfOpenConnections;
if (priority <= MessagePriorityEnum.DEFAULT.getPriority()) {
assertThrows(
ThrottlingPermitDeniedException.class,
() -> this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority));
nrOfOpenConnections = MAX_NEW_CONNECTION_REQUESTS;
} else {
// high prio will wait for connection
this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority);
nrOfOpenConnections = MAX_NEW_CONNECTION_REQUESTS * 2;
}
this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority);
final int nrOfOpenConnections = MAX_NEW_CONNECTION_REQUESTS * 2;

this.assertPermitsInMemory(nrOfOpenConnections);
this.assertAvailablePermits(MAX_OPEN_CONNECTIONS - nrOfOpenConnections);
Expand Down Expand Up @@ -154,6 +146,43 @@ void testCleanupPermit() throws InterruptedException {
this.assertAvailablePermits(MAX_OPEN_CONNECTIONS - 1);
}

@ParameterizedTest
@ValueSource(ints = {0, 600, 1200})
void testPermitMaxWait(final int releaseDelay) throws InterruptedException {
final int priority = 4;
// Claim 10
final List<Permit> firstBatch = this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority);
// Sleep longer than reset time
Thread.sleep(MAX_NEW_CONNECTION_RESET_TIME + 100);
// Next 10
this.requestPermit(MAX_NEW_CONNECTION_REQUESTS, priority);
// Sleep longer than reset time
Thread.sleep(MAX_NEW_CONNECTION_RESET_TIME + 100);

final long startTime = System.currentTimeMillis();
if (releaseDelay == 0) {
this.releasePermit(List.of(firstBatch.get(0)));
this.assertPermitsInMemory(MAX_OPEN_CONNECTIONS - 1);
this.assertAvailablePermits(1);
} else {
this.releasePermitWithDelay(List.of(firstBatch.get(0)), releaseDelay);
this.assertPermitsInMemory(MAX_OPEN_CONNECTIONS);
this.assertAvailablePermits(0);
}

if (releaseDelay > MAX_WAIT_FOR_PERMIT) {
assertThrows(ThrottlingPermitDeniedException.class, () -> this.requestPermit(1, priority));
assertThat(System.currentTimeMillis() - startTime)
.isGreaterThanOrEqualTo(MAX_WAIT_FOR_PERMIT);
} else {
this.requestPermit(1, priority);
assertThat(System.currentTimeMillis() - startTime).isGreaterThanOrEqualTo(releaseDelay);
}

this.assertPermitsInMemory(MAX_OPEN_CONNECTIONS);
this.assertAvailablePermits(0);
}

private List<Permit> requestPermitLowPrio(final int requests) {
return this.requestPermit(requests, 3);
}
Expand Down

0 comments on commit 1ac0a0b

Please sign in to comment.