Skip to content

Commit

Permalink
fix(deps): fix race condition in reacting to state changes for depend…
Browse files Browse the repository at this point in the history
…encies (#1237)
  • Loading branch information
shaguptashaikh committed Jun 21, 2022
1 parent 642ce16 commit da82889
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
package com.aws.greengrass.integrationtests.deployment;

import com.aws.greengrass.config.PlatformResolver;
import com.aws.greengrass.config.Topic;
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.config.WhatHappened;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.deployment.DeploymentConfigMerger;
import com.aws.greengrass.deployment.model.ComponentUpdatePolicy;
Expand All @@ -31,7 +29,6 @@
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import com.aws.greengrass.testcommons.testutilities.NoOpPathOwnershipHandler;
import com.aws.greengrass.testcommons.testutilities.TestUtils;
import com.aws.greengrass.util.Coerce;
import org.apache.commons.lang3.SystemUtils;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -451,8 +448,8 @@ void GIVEN_kernel_running_services_WHEN_merge_removes_service_THEN_removed_servi
kernel.launch();

CountDownLatch mainRunningLatch = new CountDownLatch(1);
kernel.getMain().addStateSubscriber((WhatHappened what, Topic t) -> {
if (Coerce.toEnum(State.class, t).isRunning()) {
kernel.getContext().addGlobalStateChangeListener((service, oldState, newState) -> {
if (kernel.getMain().equals(service) && newState.isRunning()) {
mainRunningLatch.countDown();
}
});
Expand Down Expand Up @@ -515,8 +512,8 @@ void GIVEN_a_running_service_is_not_disruptable_WHEN_deployed_THEN_deployment_wa
kernel.launch();

CountDownLatch mainFinished = new CountDownLatch(1);
kernel.getMain().addStateSubscriber((WhatHappened what, Topic t) -> {
if (Coerce.toEnum(State.class, t).equals(State.FINISHED)) {
kernel.getContext().addGlobalStateChangeListener((service, oldState, newState) -> {
if (kernel.getMain().equals(service) && State.FINISHED.equals(newState)) {
mainFinished.countDown();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.aws.greengrass.logging.impl.GreengrassLogMessage;
import com.aws.greengrass.logging.impl.Slf4jLogAdapter;
import com.aws.greengrass.testcommons.testutilities.NoOpPathOwnershipHandler;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.platforms.unix.linux.Cgroup;
import com.aws.greengrass.util.platforms.unix.linux.LinuxSystemResourceController;
import org.apache.commons.lang3.SystemUtils;
Expand Down Expand Up @@ -596,8 +595,8 @@ void GIVEN_running_service_WHEN_pause_resume_requested_THEN_pause_resume_Service
kernel.launch();

CountDownLatch mainRunningLatch = new CountDownLatch(1);
kernel.getMain().addStateSubscriber((WhatHappened what, Topic t) -> {
if (Coerce.toEnum(State.class, t).isRunning()) {
kernel.getContext().addGlobalStateChangeListener((service, oldState, newState) -> {
if (kernel.getMain().equals(service) && newState.isRunning()) {
mainRunningLatch.countDown();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@

package com.aws.greengrass.integrationtests.lifecyclemanager;

import com.aws.greengrass.config.Topic;
import com.aws.greengrass.config.WhatHappened;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.integrationtests.BaseITCase;
import com.aws.greengrass.integrationtests.util.ConfigPlatformResolver;
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.util.Coerce;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -59,8 +55,8 @@ void WHEN_kernel_shutdown_THEN_services_are_shutdown_in_reverse_dependency_order
});

CountDownLatch mainRunningLatch = new CountDownLatch(1);
kernel.getMain().addStateSubscriber((WhatHappened what, Topic t) -> {
if (Coerce.toEnum(State.class, t).isRunning()) {
kernel.getContext().addGlobalStateChangeListener((service, oldState, newState) -> {
if (kernel.getMain().equals(service) && newState.isRunning()) {
mainRunningLatch.countDown();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package com.aws.greengrass.lifecyclemanager;

import com.amazon.aws.iot.greengrass.component.common.DependencyType;
import com.aws.greengrass.config.Subscriber;
import com.aws.greengrass.config.Topic;
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.config.WhatHappened;
Expand Down Expand Up @@ -346,7 +345,7 @@ protected CompletableFuture<Void> close(boolean waitForDependers) {
logger.atInfo("service-close").log("Service is now closing");
// removing listeners on dependencies
dependencies.forEach((service, dependencyInfo) ->
service.removeStateSubscriber(dependencyInfo.stateTopicSubscriber));
getContext().removeGlobalStateChangeListener(dependencyInfo.stateListener));
try {
Periodicity t = periodicityInformation;
if (t != null) {
Expand Down Expand Up @@ -377,42 +376,41 @@ protected CompletableFuture<Void> close(boolean waitForDependers) {
/**
* Add a dependency.
*
* @param dependentGreengrassService the service to add as a dependency.
* @param dependencyType type of the dependency.
* @param isDefault True if the dependency is added without explicit declaration in 'dependencies'
* Topic.
* @param dependencyService the service to add as a dependency.
* @param dependencyType type of the dependency.
* @param isDefault True if the dependency is added without explicit declaration in 'dependencies' Topic.
* @throws InputValidationException if the provided arguments are invalid.
*/
public void addOrUpdateDependency(GreengrassService dependentGreengrassService,
DependencyType dependencyType, boolean isDefault)
public void addOrUpdateDependency(GreengrassService dependencyService, DependencyType dependencyType,
boolean isDefault)
throws InputValidationException {
if (dependentGreengrassService == null || dependencyType == null) {
if (dependencyService == null || dependencyType == null) {
throw new InputValidationException("One or more parameters was null");
}

synchronized (dependencies) {
dependencies.compute(dependentGreengrassService, (dependentService, dependencyInfo) -> {
dependencies.compute(dependencyService, (dependentService, dependencyInfo) -> {
// If the dependency already exists, we should first remove the subscriber before creating the
// new subscriber with updated input.
if (dependencyInfo != null) {
dependentGreengrassService.removeStateSubscriber(dependencyInfo.stateTopicSubscriber);
getContext().removeGlobalStateChangeListener(dependencyInfo.stateListener);
}
Subscriber subscriber = createDependencySubscriber(dependentGreengrassService, dependencyType);
dependentGreengrassService.addStateSubscriber(subscriber);
GlobalStateChangeListener listener = createDependencyListener(dependencyService, dependencyType);
getContext().addGlobalStateChangeListener(listener);
context.get(Kernel.class).clearODcache();
return new DependencyInfo(dependencyType, isDefault, subscriber);
return new DependencyInfo(dependencyType, isDefault, listener);
});
}
}

private Subscriber createDependencySubscriber(GreengrassService dependentGreengrassService,
DependencyType dependencyType) {
return (WhatHappened what, Topic t) -> {
if ((State.STARTING.equals(getState()) || State.RUNNING.equals(getState())) && !dependencyReady(
dependentGreengrassService, dependencyType)) {
private GlobalStateChangeListener createDependencyListener(GreengrassService dependencyService,
DependencyType dependencyType) {
return (service, oldState, newState) -> {
if (service.equals(dependencyService) && (State.STARTING.equals(getState()) || State.RUNNING.equals(
getState())) && !dependencyReady(dependencyService, dependencyType)) {
requestRestart();
logger.atInfo("service-restart").log("Restarting service because dependency {} was in a bad state",
dependentGreengrassService.getName());
dependencyService.getName());
}
synchronized (dependencyReadyLock) {
if (dependencyReady()) {
Expand All @@ -435,44 +433,41 @@ private List<GreengrassService> getHardDependers() {
return dependers;
}

public void addStateSubscriber(Subscriber s) {
lifecycle.getStateTopic().subscribe(s);
}

public void removeStateSubscriber(Subscriber s) {
lifecycle.getStateTopic().remove(s);
}

private void waitForDependersToExit() throws InterruptedException {

List<GreengrassService> dependers = getHardDependers();
Subscriber dependerExitWatcher = (WhatHappened what, Topic t) -> {
synchronized (dependersExitedLock) {
if (dependersExited(dependers)) {
dependersExitedLock.notifyAll();
}
}
};
List<GlobalStateChangeListener> watchers = new ArrayList<>();

// subscribing to depender state changes
dependers.forEach(dependerGreengrassService ->
dependerGreengrassService.addStateSubscriber(dependerExitWatcher));
dependers.forEach(dependerGreengrassService -> {
GlobalStateChangeListener dependerExitWatcher = (service, oldState, newState) -> {
if (service.equals(dependerGreengrassService)) {
synchronized (dependersExitedLock) {
if (dependersExited(dependers)) {
dependersExitedLock.notifyAll();
}
}
}
};
getContext().addGlobalStateChangeListener(dependerExitWatcher);
watchers.add(dependerExitWatcher);
});

synchronized (dependersExitedLock) {
while (!dependersExited(dependers)) {
logger.atDebug("service-waiting-for-depender-to-finish").log();
logger.atDebug("service-waiting-for-dependent-to-finish").log();
dependersExitedLock.wait();
}
}
// removing state change watchers
dependers.forEach(
dependerGreengrassService -> dependerGreengrassService.removeStateSubscriber(dependerExitWatcher));
watchers.forEach(w -> getContext().removeGlobalStateChangeListener(w));
}

private boolean dependersExited(List<GreengrassService> dependers) {
Optional<GreengrassService> dependerService =
dependers.stream().filter(d -> !d.getState().isClosable()).findAny();
if (dependerService.isPresent()) {
logger.atDebug("continue-waiting-for-dependencies").kv("waitingFor", dependerService.get().getName()).log();
logger.atDebug("continue-waiting-for-dependents").kv("waitingFor", dependerService.get().getName()).log();
return false;
}
return true;
Expand Down Expand Up @@ -607,7 +602,7 @@ private void setupDependencies(Collection<String> dependencyList)

removedDependencies.forEach(dependency -> {
DependencyInfo dependencyInfo = dependencies.remove(dependency);
dependency.removeStateSubscriber(dependencyInfo.stateTopicSubscriber);
getContext().removeGlobalStateChangeListener(dependencyInfo.stateListener);
});
context.get(Kernel.class).clearODcache();
}
Expand Down Expand Up @@ -699,6 +694,6 @@ protected static class DependencyInfo {
DependencyType dependencyType;
// true if the dependency isn't explicitly declared in config
boolean isDefaultDependency;
Subscriber stateTopicSubscriber;
GlobalStateChangeListener stateListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package com.aws.greengrass.lifecyclemanager;

import com.amazon.aws.iot.greengrass.component.common.DependencyType;
import com.aws.greengrass.config.Subscriber;
import com.aws.greengrass.lifecyclemanager.exceptions.InputValidationException;
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
import com.aws.greengrass.testcommons.testutilities.GGServiceTestUtil;
Expand Down Expand Up @@ -57,7 +56,7 @@ void GIVEN_dependency_exist_WHEN_dependency_is_updated_THEN_update_successful()
GreengrassService dep1 = mock(GreengrassService.class);

greengrassService.addOrUpdateDependency(dep1, DependencyType.SOFT, false);
verify(dep1).addStateSubscriber(any(Subscriber.class));
verify(context).addGlobalStateChangeListener(any(GlobalStateChangeListener.class));

Map<GreengrassService, DependencyType> dependencies = greengrassService.getDependencies();
assertEquals(1, dependencies.size());
Expand All @@ -71,7 +70,7 @@ void GIVEN_dependency_exist_WHEN_dependency_is_updated_THEN_update_successful()
assertEquals(1, dependencies.size());
assertEquals(DependencyType.HARD, dependencies.get(dep1));
// Remove the previous subscriber.
verify(dep1).removeStateSubscriber(any(Subscriber.class));
verify(context).removeGlobalStateChangeListener(any(GlobalStateChangeListener.class));
}

@Test
Expand Down

0 comments on commit da82889

Please sign in to comment.