Skip to content

Commit

Permalink
feat: add nucleus connectivity validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianci Shen committed May 9, 2024
1 parent cdf1ce9 commit 6a50d3d
Show file tree
Hide file tree
Showing 17 changed files with 731 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.deployment.DeploymentConfigMerger;
import com.aws.greengrass.deployment.DeviceConfiguration;
import com.aws.greengrass.deployment.model.ComponentUpdatePolicy;
import com.aws.greengrass.deployment.model.Deployment;
import com.aws.greengrass.deployment.model.DeploymentDocument;
Expand All @@ -25,10 +26,15 @@
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.GreengrassLogMessage;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.mqttclient.MqttClient;
import com.aws.greengrass.mqttclient.PublishRequest;
import com.aws.greengrass.status.FleetStatusService;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import com.aws.greengrass.testcommons.testutilities.NoOpPathOwnershipHandler;
import com.aws.greengrass.testcommons.testutilities.TestUtils;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.GreengrassServiceClientFactory;
import com.aws.greengrass.util.Pair;
import org.apache.commons.lang3.SystemUtils;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -37,16 +43,26 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.mockito.Mock;
import org.slf4j.event.Level;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.model.ComponentUpdatePolicyEvents;
import software.amazon.awssdk.aws.greengrass.model.DeferComponentUpdateRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToComponentUpdatesRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToComponentUpdatesResponse;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder;
import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy;
import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient;
import software.amazon.awssdk.services.greengrassv2data.model.GreengrassV2DataException;
import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -76,6 +92,7 @@
import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICE_DEPENDENCIES_NAMESPACE_TOPIC;
import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICE_LIFECYCLE_NAMESPACE_TOPIC;
import static com.aws.greengrass.lifecyclemanager.GreengrassService.SETENV_CONFIG_NAMESPACE;
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
import static com.aws.greengrass.testcommons.testutilities.SudoUtil.assumeCanSudoShell;
import static com.aws.greengrass.testcommons.testutilities.TestUtils.createCloseableLogListener;
import static com.aws.greengrass.testcommons.testutilities.TestUtils.createServiceStateChangeWaiter;
Expand All @@ -92,11 +109,25 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import static software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction.NOTIFY_COMPONENTS;
import static software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction.SKIP_NOTIFY_COMPONENTS;

@ExtendWith(GGExtension.class)
class DeploymentConfigMergingTest extends BaseITCase {
@Mock
private MqttClient mqttClient;
@Mock
private AwsIotMqttConnectionBuilder awsIotMqttConnectionBuilder;
@Mock
private MqttClientConnection mqttClientConnection;
@Mock
private GreengrassServiceClientFactory gscFactory;
@Mock
private GreengrassV2DataClient greengrassV2DataClient;

private Kernel kernel;
private DeploymentConfigMerger deploymentConfigMerger;
private static SocketOptions socketOptions;
Expand All @@ -111,6 +142,8 @@ static void initialize() {
void before() {
kernel = new Kernel();
NoOpPathOwnershipHandler.register(kernel);
kernel.getContext().put(MqttClient.class, mqttClient);
kernel.getContext().put(GreengrassServiceClientFactory.class, gscFactory);
deploymentConfigMerger = kernel.getContext().get(DeploymentConfigMerger.class);
}

Expand Down Expand Up @@ -371,14 +404,14 @@ void GIVEN_kernel_running_single_service_WHEN_merge_same_doc_happens_twice_THEN_
put(SERVICES_NAMESPACE_TOPIC, new HashMap<String, Object>() {{
put("main", new HashMap<String, Object>() {{
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC,
Arrays.asList("new_service", DEFAULT_NUCLEUS_COMPONENT_NAME));
new ArrayList<>(Arrays.asList("new_service", DEFAULT_NUCLEUS_COMPONENT_NAME)));
}});

put("new_service", new HashMap<String, Object>() {{
put(SERVICE_LIFECYCLE_NAMESPACE_TOPIC, new HashMap<String, Object>() {{
put(LIFECYCLE_RUN_NAMESPACE_TOPIC, "echo done");
}});
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList("new_service2"));
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList("new_service2")));
}});

put("new_service2", new HashMap<String, Object>() {{
Expand Down Expand Up @@ -709,6 +742,144 @@ void GIVEN_kernel_running_service_WHEN_run_with_change_THEN_service_restarts() t
}
}
}

@Test
void GIVEN_kernel_running_with_some_config_WHEN_connectivity_validation_successful_THEN_config_is_updated()
throws Throwable {
// GIVEN
DeviceConfiguration deviceConfiguration = new DeviceConfiguration(kernel.getConfig(), kernel.getKernelCommandLine(), "ThingName", "xxxxxx-ats.iot.us-east-1.amazonaws.com",
"xxxxxx.credentials.iot.us-east-1.amazonaws.com", "privKeyFilePath", "certFilePath", "caFilePath",
"us-east-1", "roleAliasName");
kernel.getContext().put(DeviceConfiguration.class, deviceConfiguration);
ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel, getClass().getResource("config.yaml"));

// Mock Fss trigger at kernel launch
when(mqttClient.publish(any(PublishRequest.class))).thenReturn(CompletableFuture.completedFuture(0));
kernel.launch();

// Read deployment config from yaml file
Map<String, Object> deploymentConfig = ConfigPlatformResolver
.resolvePlatformMap(getClass().getResource("connectivityValidationConfig.yaml"));
int expectedMqttPort = 8080; // mqtt.port
int expectedDataPlanePort = 443; // greengrassDataPlanePort

// Mock Successful Mqtt Validation
when(mqttClient.createMqttConnectionBuilder(any(), any(), eq(null))).thenReturn(awsIotMqttConnectionBuilder);
when(awsIotMqttConnectionBuilder.withClientId(any())).thenReturn(awsIotMqttConnectionBuilder);
when(awsIotMqttConnectionBuilder.build()).thenReturn(mqttClientConnection);
CompletableFuture<Boolean> mqttConnection = CompletableFuture.completedFuture(true);
when(mqttClientConnection.connect()).thenReturn(mqttConnection);

// Mock Successful Http Validation
Pair<SdkHttpClient, GreengrassV2DataClient> pair = new Pair<>(null, greengrassV2DataClient);
when(gscFactory.createClientFromConfig(any())).thenReturn(pair);
when(greengrassV2DataClient.listThingGroupsForCoreDevice((ListThingGroupsForCoreDeviceRequest) any()))
.thenReturn(null);

// WHEN
Topics t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS);
assertNotNull(t, "FSS Topics should not be null before merging");
deploymentConfigMerger.mergeInNewConfig(testDeployment(), deploymentConfig).get(60, TimeUnit.SECONDS);
t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS);
assertNotNull(t, "FSS Topics should not be null after merging");

// THEN
deviceConfiguration = kernel.getContext().get(DeviceConfiguration.class);
int actualMqttPort = Coerce.toInt(deviceConfiguration.getMQTTNamespace().find(MqttClient.MQTT_PORT_KEY));
assertEquals(expectedMqttPort, actualMqttPort);
int actualDataPlanePort = Coerce.toInt(deviceConfiguration.getGreengrassDataPlanePort());
assertEquals(expectedDataPlanePort, actualDataPlanePort);
}

@Test
void GIVEN_kernel_running_with_some_config_WHEN_mqtt_validation_fails_THEN_config_is_not_updated(ExtensionContext context)
throws Throwable {
ignoreExceptionOfType(context, MqttException.class);
// GIVEN
DeviceConfiguration deviceConfiguration = new DeviceConfiguration(kernel.getConfig(), kernel.getKernelCommandLine(), "ThingName", "xxxxxx-ats.iot.us-east-1.amazonaws.com",
"xxxxxx.credentials.iot.us-east-1.amazonaws.com", "privKeyFilePath", "certFilePath", "caFilePath",
"us-east-1", "roleAliasName");
kernel.getContext().put(DeviceConfiguration.class, deviceConfiguration);
ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel, getClass().getResource("config.yaml"));

// Mock fss trigger at kernel launch
when(mqttClient.publish(any(PublishRequest.class))).thenReturn(CompletableFuture.completedFuture(0));
kernel.launch();

// Read deployment config from yaml file
Map<String, Object> deploymentConfig = ConfigPlatformResolver
.resolvePlatformMap(getClass().getResource("connectivityValidationConfig.yaml"));

// Mock Failed Mqtt Validation
when(mqttClient.createMqttConnectionBuilder(any(), any(), eq(null))).thenReturn(awsIotMqttConnectionBuilder);
when(awsIotMqttConnectionBuilder.withClientId(any())).thenReturn(awsIotMqttConnectionBuilder);
when(awsIotMqttConnectionBuilder.build()).thenReturn(mqttClientConnection);
when(mqttClientConnection.connect()).thenThrow(new MqttException("mocked failure"));

// WHEN
Topics t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS);
assertNotNull(t, "FSS Topics should not be null before merging");
deploymentConfigMerger.mergeInNewConfig(testDeployment(), deploymentConfig).get(60, TimeUnit.SECONDS);
t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS);
assertNotNull(t, "FSS Topics should not be null after merging");

// THEN
deviceConfiguration = kernel.getContext().get(DeviceConfiguration.class);
int actualMqttPort = Coerce.toInt(deviceConfiguration.getMQTTNamespace().find(MqttClient.MQTT_PORT_KEY));
assertEquals(0, actualMqttPort);
int actualDataPlanePort = Coerce.toInt(deviceConfiguration.getGreengrassDataPlanePort());
assertEquals(8443, actualDataPlanePort);
}

@Test
void GIVEN_kernel_running_with_some_config_WHEN_http_validation_fails_THEN_config_is_not_updated(ExtensionContext context)
throws Throwable {
ignoreExceptionOfType(context, GreengrassV2DataException.class);
// GIVEN
DeviceConfiguration deviceConfiguration = new DeviceConfiguration(kernel.getConfig(), kernel.getKernelCommandLine(), "ThingName", "xxxxxx-ats.iot.us-east-1.amazonaws.com",
"xxxxxx.credentials.iot.us-east-1.amazonaws.com", "privKeyFilePath", "certFilePath", "caFilePath",
"us-east-1", "roleAliasName");
kernel.getContext().put(DeviceConfiguration.class, deviceConfiguration);
ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel, getClass().getResource("config.yaml"));

// Mock fss trigger at kernel launch
when(mqttClient.publish(any(PublishRequest.class))).thenReturn(CompletableFuture.completedFuture(0));
kernel.launch();

// Read deployment config from yaml file
Map<String, Object> deploymentConfig = ConfigPlatformResolver
.resolvePlatformMap(getClass().getResource("connectivityValidationConfig.yaml"));

// Mock Successful Mqtt Validation
when(mqttClient.createMqttConnectionBuilder(any(), any(), eq(null))).thenReturn(awsIotMqttConnectionBuilder);
when(awsIotMqttConnectionBuilder.withClientId(any())).thenReturn(awsIotMqttConnectionBuilder);
when(awsIotMqttConnectionBuilder.build()).thenReturn(mqttClientConnection);
CompletableFuture<Boolean> mqttConnection = new CompletableFuture<>();
mqttConnection.complete(true);
when(mqttClientConnection.connect()).thenReturn(mqttConnection);

// Mock Failed Http Validation
Pair<SdkHttpClient, GreengrassV2DataClient> pair = new Pair<>(null, greengrassV2DataClient);
when(gscFactory.createClientFromConfig(any())).thenReturn(pair);
AwsServiceException exception = GreengrassV2DataException.builder().message("mocked failure").build();
when(greengrassV2DataClient.listThingGroupsForCoreDevice((ListThingGroupsForCoreDeviceRequest) any()))
.thenThrow(exception);

// WHEN
Topics t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS);
assertNotNull(t, "FSS Topics should not be null before merging");
deploymentConfigMerger.mergeInNewConfig(testDeployment(), deploymentConfig).get(60, TimeUnit.SECONDS);
t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS);
assertNotNull(t, "FSS Topics should not be null after merging");

// THEN
deviceConfiguration = kernel.getContext().get(DeviceConfiguration.class);
int actualMqttPort = Coerce.toInt(deviceConfiguration.getMQTTNamespace().find(MqttClient.MQTT_PORT_KEY));
assertEquals(0, actualMqttPort);
int actualDataPlanePort = Coerce.toInt(deviceConfiguration.getGreengrassDataPlanePort());
assertEquals(8443, actualDataPlanePort);
}

private Deployment testDeployment() {
DeploymentDocument doc = DeploymentDocument.builder().timestamp(System.currentTimeMillis()).deploymentId("id")
.failureHandlingPolicy(FailureHandlingPolicy.DO_NOTHING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -181,8 +182,8 @@ void GIVEN_hard_dependency_WHEN_dependency_goes_through_lifecycle_events_THEN_cu
Map<String, Object> configRemoveDep = new HashMap<String, Object>() {{
put(SERVICES_NAMESPACE_TOPIC, new HashMap<String, Object>() {{
put("main", new HashMap<String, Object>() {{
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList(CustomerApp,
DEFAULT_NUCLEUS_COMPONENT_NAME));
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList(CustomerApp,
DEFAULT_NUCLEUS_COMPONENT_NAME)));
}});
put(CustomerApp, new HashMap<String, Object>() {{
putAll(kernel.findServiceTopic(CustomerApp).toPOJO());
Expand Down Expand Up @@ -294,8 +295,8 @@ void GIVEN_soft_dependency_WHEN_dependency_goes_through_lifecycle_events_THEN_cu
Map<String, Object> configRemoveDep = new HashMap<String, Object>() {{
put(SERVICES_NAMESPACE_TOPIC, new HashMap<String, Object>() {{
put("main", new HashMap<String, Object>() {{
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList(CustomerApp,
DEFAULT_NUCLEUS_COMPONENT_NAME));
put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList(CustomerApp,
DEFAULT_NUCLEUS_COMPONENT_NAME)));
}});
put(CustomerApp, new HashMap<String, Object>() {{
putAll(kernel.findServiceTopic(CustomerApp).toPOJO());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.aws.greengrass.componentmanager.exceptions.PackageLoadingException;
import com.aws.greengrass.dependency.ComponentStatusCode;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.deployment.ConnectivityValidator;
import com.aws.greengrass.deployment.DeploymentQueue;
import com.aws.greengrass.deployment.DeploymentService;
import com.aws.greengrass.deployment.DeviceConfiguration;
Expand Down Expand Up @@ -126,6 +127,8 @@ class EventFleetStatusServiceTest extends BaseITCase {
private IotJobsClientWrapper mockIotJobsClientWrapper;
@Mock
private ThingGroupHelper thingGroupHelper;
@Mock
private ConnectivityValidator connectivityValidator;

private AtomicReference<List<FleetStatusDetails>> fleetStatusDetailsList;
private final CountDownLatch mainFinished = new CountDownLatch(1);
Expand Down Expand Up @@ -177,6 +180,7 @@ void setupKernel(ExtensionContext context) throws Exception {
EventFleetStatusServiceTest.class.getResource("onlyMain.yaml"));
kernel.getContext().put(MqttClient.class, mqttClient);
kernel.getContext().put(ThingGroupHelper.class, thingGroupHelper);
kernel.getContext().put(ConnectivityValidator.class, connectivityValidator);

// Mock out cloud communication
GreengrassServiceClientFactory mgscf = mock(GreengrassServiceClientFactory.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
services:
main:
dependencies:
- aws.greengrass.Nucleus
aws.greengrass.Nucleus:
componentType: NUCLEUS
configuration:
awsRegion: us-east-1
greengrassDataPlanePort: 443
iotCredEndpoint: xxxxxx.credentials.iot.us-east-1.amazonaws.com
iotDataEndpoint: xxxxxx-ats.iot.us-east-1.amazonaws.com
iotRoleAlias: roleAliasName
mqtt:
port: 8080
runWithDefault:
posixUser: nobody
windowsUser: integ-tester
Loading

0 comments on commit 6a50d3d

Please sign in to comment.