From 6a50d3d35d5e05510132bd9033839e770c942dbf Mon Sep 17 00:00:00 2001 From: Tianci Shen Date: Mon, 11 Mar 2024 14:39:50 -0700 Subject: [PATCH] feat: add nucleus connectivity validation --- .../DeploymentConfigMergingTest.java | 175 ++++++++++++- .../ServiceDependencyLifecycleTest.java | 9 +- .../status/EventFleetStatusServiceTest.java | 4 + .../connectivityValidationConfig.yaml | 17 ++ .../deployment/ConnectivityValidator.java | 240 ++++++++++++++++++ .../deployment/DeploymentConfigMerger.java | 24 ++ .../deployment/DeviceConfiguration.java | 10 +- .../errorcode/DeploymentErrorCode.java | 1 + .../greengrass/lifecyclemanager/Kernel.java | 4 + .../aws/greengrass/mqttclient/MqttClient.java | 117 +++++---- .../util/GreengrassServiceClientFactory.java | 32 ++- .../deployment/ConnectivityValidatorTest.java | 123 +++++++++ .../DeploymentConfigMergerTest.java | 35 ++- .../deployment/DeviceConfigurationTest.java | 2 +- .../LogManagerHelperTest.java | 10 +- .../greengrass/mqttclient/MqttClientTest.java | 2 +- .../tes/TokenExchangeServiceTest.java | 2 +- 17 files changed, 731 insertions(+), 76 deletions(-) create mode 100644 src/integrationtests/resources/com/aws/greengrass/integrationtests/deployment/connectivityValidationConfig.yaml create mode 100644 src/main/java/com/aws/greengrass/deployment/ConnectivityValidator.java create mode 100644 src/test/java/com/aws/greengrass/deployment/ConnectivityValidatorTest.java diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/DeploymentConfigMergingTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/DeploymentConfigMergingTest.java index b9f3a2522e..16e0112fb1 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/DeploymentConfigMergingTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/deployment/DeploymentConfigMergingTest.java @@ -9,6 +9,7 @@ import com.aws.greengrass.config.Topics; import com.aws.greengrass.dependency.State; import com.aws.greengrass.deployment.DeploymentConfigMerger; +import com.aws.greengrass.deployment.DeviceConfiguration; import com.aws.greengrass.deployment.model.ComponentUpdatePolicy; import com.aws.greengrass.deployment.model.Deployment; import com.aws.greengrass.deployment.model.DeploymentDocument; @@ -25,10 +26,15 @@ import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.GreengrassLogMessage; import com.aws.greengrass.logging.impl.LogManager; +import com.aws.greengrass.mqttclient.MqttClient; +import com.aws.greengrass.mqttclient.PublishRequest; import com.aws.greengrass.status.FleetStatusService; import com.aws.greengrass.testcommons.testutilities.GGExtension; import com.aws.greengrass.testcommons.testutilities.NoOpPathOwnershipHandler; import com.aws.greengrass.testcommons.testutilities.TestUtils; +import com.aws.greengrass.util.Coerce; +import com.aws.greengrass.util.GreengrassServiceClientFactory; +import com.aws.greengrass.util.Pair; import org.apache.commons.lang3.SystemUtils; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterAll; @@ -37,16 +43,26 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.mockito.Mock; import org.slf4j.event.Level; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.ComponentUpdatePolicyEvents; import software.amazon.awssdk.aws.greengrass.model.DeferComponentUpdateRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToComponentUpdatesRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToComponentUpdatesResponse; +import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt.MqttException; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder; import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy; +import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient; +import software.amazon.awssdk.services.greengrassv2data.model.GreengrassV2DataException; +import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest; import java.io.IOException; import java.time.Duration; @@ -76,6 +92,7 @@ import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICE_DEPENDENCIES_NAMESPACE_TOPIC; import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICE_LIFECYCLE_NAMESPACE_TOPIC; import static com.aws.greengrass.lifecyclemanager.GreengrassService.SETENV_CONFIG_NAMESPACE; +import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType; import static com.aws.greengrass.testcommons.testutilities.SudoUtil.assumeCanSudoShell; import static com.aws.greengrass.testcommons.testutilities.TestUtils.createCloseableLogListener; import static com.aws.greengrass.testcommons.testutilities.TestUtils.createServiceStateChangeWaiter; @@ -92,11 +109,25 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; import static software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction.NOTIFY_COMPONENTS; import static software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction.SKIP_NOTIFY_COMPONENTS; @ExtendWith(GGExtension.class) class DeploymentConfigMergingTest extends BaseITCase { + @Mock + private MqttClient mqttClient; + @Mock + private AwsIotMqttConnectionBuilder awsIotMqttConnectionBuilder; + @Mock + private MqttClientConnection mqttClientConnection; + @Mock + private GreengrassServiceClientFactory gscFactory; + @Mock + private GreengrassV2DataClient greengrassV2DataClient; + private Kernel kernel; private DeploymentConfigMerger deploymentConfigMerger; private static SocketOptions socketOptions; @@ -111,6 +142,8 @@ static void initialize() { void before() { kernel = new Kernel(); NoOpPathOwnershipHandler.register(kernel); + kernel.getContext().put(MqttClient.class, mqttClient); + kernel.getContext().put(GreengrassServiceClientFactory.class, gscFactory); deploymentConfigMerger = kernel.getContext().get(DeploymentConfigMerger.class); } @@ -371,14 +404,14 @@ void GIVEN_kernel_running_single_service_WHEN_merge_same_doc_happens_twice_THEN_ put(SERVICES_NAMESPACE_TOPIC, new HashMap() {{ put("main", new HashMap() {{ put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, - Arrays.asList("new_service", DEFAULT_NUCLEUS_COMPONENT_NAME)); + new ArrayList<>(Arrays.asList("new_service", DEFAULT_NUCLEUS_COMPONENT_NAME))); }}); put("new_service", new HashMap() {{ put(SERVICE_LIFECYCLE_NAMESPACE_TOPIC, new HashMap() {{ put(LIFECYCLE_RUN_NAMESPACE_TOPIC, "echo done"); }}); - put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList("new_service2")); + put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList("new_service2"))); }}); put("new_service2", new HashMap() {{ @@ -709,6 +742,144 @@ void GIVEN_kernel_running_service_WHEN_run_with_change_THEN_service_restarts() t } } } + + @Test + void GIVEN_kernel_running_with_some_config_WHEN_connectivity_validation_successful_THEN_config_is_updated() + throws Throwable { + // GIVEN + DeviceConfiguration deviceConfiguration = new DeviceConfiguration(kernel.getConfig(), kernel.getKernelCommandLine(), "ThingName", "xxxxxx-ats.iot.us-east-1.amazonaws.com", + "xxxxxx.credentials.iot.us-east-1.amazonaws.com", "privKeyFilePath", "certFilePath", "caFilePath", + "us-east-1", "roleAliasName"); + kernel.getContext().put(DeviceConfiguration.class, deviceConfiguration); + ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel, getClass().getResource("config.yaml")); + + // Mock Fss trigger at kernel launch + when(mqttClient.publish(any(PublishRequest.class))).thenReturn(CompletableFuture.completedFuture(0)); + kernel.launch(); + + // Read deployment config from yaml file + Map deploymentConfig = ConfigPlatformResolver + .resolvePlatformMap(getClass().getResource("connectivityValidationConfig.yaml")); + int expectedMqttPort = 8080; // mqtt.port + int expectedDataPlanePort = 443; // greengrassDataPlanePort + + // Mock Successful Mqtt Validation + when(mqttClient.createMqttConnectionBuilder(any(), any(), eq(null))).thenReturn(awsIotMqttConnectionBuilder); + when(awsIotMqttConnectionBuilder.withClientId(any())).thenReturn(awsIotMqttConnectionBuilder); + when(awsIotMqttConnectionBuilder.build()).thenReturn(mqttClientConnection); + CompletableFuture mqttConnection = CompletableFuture.completedFuture(true); + when(mqttClientConnection.connect()).thenReturn(mqttConnection); + + // Mock Successful Http Validation + Pair pair = new Pair<>(null, greengrassV2DataClient); + when(gscFactory.createClientFromConfig(any())).thenReturn(pair); + when(greengrassV2DataClient.listThingGroupsForCoreDevice((ListThingGroupsForCoreDeviceRequest) any())) + .thenReturn(null); + + // WHEN + Topics t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS); + assertNotNull(t, "FSS Topics should not be null before merging"); + deploymentConfigMerger.mergeInNewConfig(testDeployment(), deploymentConfig).get(60, TimeUnit.SECONDS); + t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS); + assertNotNull(t, "FSS Topics should not be null after merging"); + + // THEN + deviceConfiguration = kernel.getContext().get(DeviceConfiguration.class); + int actualMqttPort = Coerce.toInt(deviceConfiguration.getMQTTNamespace().find(MqttClient.MQTT_PORT_KEY)); + assertEquals(expectedMqttPort, actualMqttPort); + int actualDataPlanePort = Coerce.toInt(deviceConfiguration.getGreengrassDataPlanePort()); + assertEquals(expectedDataPlanePort, actualDataPlanePort); + } + + @Test + void GIVEN_kernel_running_with_some_config_WHEN_mqtt_validation_fails_THEN_config_is_not_updated(ExtensionContext context) + throws Throwable { + ignoreExceptionOfType(context, MqttException.class); + // GIVEN + DeviceConfiguration deviceConfiguration = new DeviceConfiguration(kernel.getConfig(), kernel.getKernelCommandLine(), "ThingName", "xxxxxx-ats.iot.us-east-1.amazonaws.com", + "xxxxxx.credentials.iot.us-east-1.amazonaws.com", "privKeyFilePath", "certFilePath", "caFilePath", + "us-east-1", "roleAliasName"); + kernel.getContext().put(DeviceConfiguration.class, deviceConfiguration); + ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel, getClass().getResource("config.yaml")); + + // Mock fss trigger at kernel launch + when(mqttClient.publish(any(PublishRequest.class))).thenReturn(CompletableFuture.completedFuture(0)); + kernel.launch(); + + // Read deployment config from yaml file + Map deploymentConfig = ConfigPlatformResolver + .resolvePlatformMap(getClass().getResource("connectivityValidationConfig.yaml")); + + // Mock Failed Mqtt Validation + when(mqttClient.createMqttConnectionBuilder(any(), any(), eq(null))).thenReturn(awsIotMqttConnectionBuilder); + when(awsIotMqttConnectionBuilder.withClientId(any())).thenReturn(awsIotMqttConnectionBuilder); + when(awsIotMqttConnectionBuilder.build()).thenReturn(mqttClientConnection); + when(mqttClientConnection.connect()).thenThrow(new MqttException("mocked failure")); + + // WHEN + Topics t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS); + assertNotNull(t, "FSS Topics should not be null before merging"); + deploymentConfigMerger.mergeInNewConfig(testDeployment(), deploymentConfig).get(60, TimeUnit.SECONDS); + t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS); + assertNotNull(t, "FSS Topics should not be null after merging"); + + // THEN + deviceConfiguration = kernel.getContext().get(DeviceConfiguration.class); + int actualMqttPort = Coerce.toInt(deviceConfiguration.getMQTTNamespace().find(MqttClient.MQTT_PORT_KEY)); + assertEquals(0, actualMqttPort); + int actualDataPlanePort = Coerce.toInt(deviceConfiguration.getGreengrassDataPlanePort()); + assertEquals(8443, actualDataPlanePort); + } + + @Test + void GIVEN_kernel_running_with_some_config_WHEN_http_validation_fails_THEN_config_is_not_updated(ExtensionContext context) + throws Throwable { + ignoreExceptionOfType(context, GreengrassV2DataException.class); + // GIVEN + DeviceConfiguration deviceConfiguration = new DeviceConfiguration(kernel.getConfig(), kernel.getKernelCommandLine(), "ThingName", "xxxxxx-ats.iot.us-east-1.amazonaws.com", + "xxxxxx.credentials.iot.us-east-1.amazonaws.com", "privKeyFilePath", "certFilePath", "caFilePath", + "us-east-1", "roleAliasName"); + kernel.getContext().put(DeviceConfiguration.class, deviceConfiguration); + ConfigPlatformResolver.initKernelWithMultiPlatformConfig(kernel, getClass().getResource("config.yaml")); + + // Mock fss trigger at kernel launch + when(mqttClient.publish(any(PublishRequest.class))).thenReturn(CompletableFuture.completedFuture(0)); + kernel.launch(); + + // Read deployment config from yaml file + Map deploymentConfig = ConfigPlatformResolver + .resolvePlatformMap(getClass().getResource("connectivityValidationConfig.yaml")); + + // Mock Successful Mqtt Validation + when(mqttClient.createMqttConnectionBuilder(any(), any(), eq(null))).thenReturn(awsIotMqttConnectionBuilder); + when(awsIotMqttConnectionBuilder.withClientId(any())).thenReturn(awsIotMqttConnectionBuilder); + when(awsIotMqttConnectionBuilder.build()).thenReturn(mqttClientConnection); + CompletableFuture mqttConnection = new CompletableFuture<>(); + mqttConnection.complete(true); + when(mqttClientConnection.connect()).thenReturn(mqttConnection); + + // Mock Failed Http Validation + Pair pair = new Pair<>(null, greengrassV2DataClient); + when(gscFactory.createClientFromConfig(any())).thenReturn(pair); + AwsServiceException exception = GreengrassV2DataException.builder().message("mocked failure").build(); + when(greengrassV2DataClient.listThingGroupsForCoreDevice((ListThingGroupsForCoreDeviceRequest) any())) + .thenThrow(exception); + + // WHEN + Topics t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS); + assertNotNull(t, "FSS Topics should not be null before merging"); + deploymentConfigMerger.mergeInNewConfig(testDeployment(), deploymentConfig).get(60, TimeUnit.SECONDS); + t = kernel.findServiceTopic(FleetStatusService.FLEET_STATUS_SERVICE_TOPICS); + assertNotNull(t, "FSS Topics should not be null after merging"); + + // THEN + deviceConfiguration = kernel.getContext().get(DeviceConfiguration.class); + int actualMqttPort = Coerce.toInt(deviceConfiguration.getMQTTNamespace().find(MqttClient.MQTT_PORT_KEY)); + assertEquals(0, actualMqttPort); + int actualDataPlanePort = Coerce.toInt(deviceConfiguration.getGreengrassDataPlanePort()); + assertEquals(8443, actualDataPlanePort); + } + private Deployment testDeployment() { DeploymentDocument doc = DeploymentDocument.builder().timestamp(System.currentTimeMillis()).deploymentId("id") .failureHandlingPolicy(FailureHandlingPolicy.DO_NOTHING) diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/lifecyclemanager/ServiceDependencyLifecycleTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/lifecyclemanager/ServiceDependencyLifecycleTest.java index 23159914e3..22f0d96099 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/lifecyclemanager/ServiceDependencyLifecycleTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/lifecyclemanager/ServiceDependencyLifecycleTest.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -181,8 +182,8 @@ void GIVEN_hard_dependency_WHEN_dependency_goes_through_lifecycle_events_THEN_cu Map configRemoveDep = new HashMap() {{ put(SERVICES_NAMESPACE_TOPIC, new HashMap() {{ put("main", new HashMap() {{ - put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList(CustomerApp, - DEFAULT_NUCLEUS_COMPONENT_NAME)); + put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList(CustomerApp, + DEFAULT_NUCLEUS_COMPONENT_NAME))); }}); put(CustomerApp, new HashMap() {{ putAll(kernel.findServiceTopic(CustomerApp).toPOJO()); @@ -294,8 +295,8 @@ void GIVEN_soft_dependency_WHEN_dependency_goes_through_lifecycle_events_THEN_cu Map configRemoveDep = new HashMap() {{ put(SERVICES_NAMESPACE_TOPIC, new HashMap() {{ put("main", new HashMap() {{ - put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, Arrays.asList(CustomerApp, - DEFAULT_NUCLEUS_COMPONENT_NAME)); + put(SERVICE_DEPENDENCIES_NAMESPACE_TOPIC, new ArrayList<>(Arrays.asList(CustomerApp, + DEFAULT_NUCLEUS_COMPONENT_NAME))); }}); put(CustomerApp, new HashMap() {{ putAll(kernel.findServiceTopic(CustomerApp).toPOJO()); diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/status/EventFleetStatusServiceTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/status/EventFleetStatusServiceTest.java index f1c547f0a6..eb8e87b8b4 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/status/EventFleetStatusServiceTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/status/EventFleetStatusServiceTest.java @@ -10,6 +10,7 @@ import com.aws.greengrass.componentmanager.exceptions.PackageLoadingException; import com.aws.greengrass.dependency.ComponentStatusCode; import com.aws.greengrass.dependency.State; +import com.aws.greengrass.deployment.ConnectivityValidator; import com.aws.greengrass.deployment.DeploymentQueue; import com.aws.greengrass.deployment.DeploymentService; import com.aws.greengrass.deployment.DeviceConfiguration; @@ -126,6 +127,8 @@ class EventFleetStatusServiceTest extends BaseITCase { private IotJobsClientWrapper mockIotJobsClientWrapper; @Mock private ThingGroupHelper thingGroupHelper; + @Mock + private ConnectivityValidator connectivityValidator; private AtomicReference> fleetStatusDetailsList; private final CountDownLatch mainFinished = new CountDownLatch(1); @@ -177,6 +180,7 @@ void setupKernel(ExtensionContext context) throws Exception { EventFleetStatusServiceTest.class.getResource("onlyMain.yaml")); kernel.getContext().put(MqttClient.class, mqttClient); kernel.getContext().put(ThingGroupHelper.class, thingGroupHelper); + kernel.getContext().put(ConnectivityValidator.class, connectivityValidator); // Mock out cloud communication GreengrassServiceClientFactory mgscf = mock(GreengrassServiceClientFactory.class); diff --git a/src/integrationtests/resources/com/aws/greengrass/integrationtests/deployment/connectivityValidationConfig.yaml b/src/integrationtests/resources/com/aws/greengrass/integrationtests/deployment/connectivityValidationConfig.yaml new file mode 100644 index 0000000000..9659b02b5c --- /dev/null +++ b/src/integrationtests/resources/com/aws/greengrass/integrationtests/deployment/connectivityValidationConfig.yaml @@ -0,0 +1,17 @@ +services: + main: + dependencies: + - aws.greengrass.Nucleus + aws.greengrass.Nucleus: + componentType: NUCLEUS + configuration: + awsRegion: us-east-1 + greengrassDataPlanePort: 443 + iotCredEndpoint: xxxxxx.credentials.iot.us-east-1.amazonaws.com + iotDataEndpoint: xxxxxx-ats.iot.us-east-1.amazonaws.com + iotRoleAlias: roleAliasName + mqtt: + port: 8080 + runWithDefault: + posixUser: nobody + windowsUser: integ-tester diff --git a/src/main/java/com/aws/greengrass/deployment/ConnectivityValidator.java b/src/main/java/com/aws/greengrass/deployment/ConnectivityValidator.java new file mode 100644 index 0000000000..f12feb6c34 --- /dev/null +++ b/src/main/java/com/aws/greengrass/deployment/ConnectivityValidator.java @@ -0,0 +1,240 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.deployment; + +import com.aws.greengrass.config.Configuration; +import com.aws.greengrass.config.Topic; +import com.aws.greengrass.config.Topics; +import com.aws.greengrass.dependency.Context; +import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode; +import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException; +import com.aws.greengrass.deployment.model.DeploymentResult; +import com.aws.greengrass.lifecyclemanager.Kernel; +import com.aws.greengrass.logging.api.Logger; +import com.aws.greengrass.logging.impl.LogManager; +import com.aws.greengrass.mqttclient.MqttClient; +import com.aws.greengrass.security.SecurityService; +import com.aws.greengrass.util.Coerce; +import com.aws.greengrass.util.GreengrassServiceClientFactory; +import com.aws.greengrass.util.Pair; +import lombok.AccessLevel; +import lombok.Setter; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt.MqttException; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder; +import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient; +import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class ConnectivityValidator { + private static final Logger logger = LogManager.getLogger(ConnectivityValidator.class); + public static final String clientIdSuffix = "#validation"; + private static final String withNewConfigMessageSuffix = " with the new configuration"; + private final MqttClient mqttClient; + private final GreengrassServiceClientFactory factory; + private final SecurityService securityService; + @Setter(AccessLevel.PACKAGE) // for unit tests + private DeviceConfiguration deploymentConfiguration; + + /** + * Constructor. + * + * @param kernel kernel to get current config and context from + * @param context temporary context used for deployment configuration + * @param mqttClient Used to create AWS Mqtt Connection Client + * @param factory Used to create Greengrass data client + * @param securityService Used for Mqtt connection builder + * @param deploymentConfig Map of the configs to be merged in and validated against + * @param timestamp time stamp of the deployment + */ + public ConnectivityValidator(Kernel kernel, Context context, MqttClient mqttClient, + GreengrassServiceClientFactory factory, SecurityService securityService, + Map deploymentConfig, long timestamp) { + this.mqttClient = mqttClient; + this.factory = factory; + this.securityService = securityService; + + Configuration config = new Configuration(context); + // Copy the current device configuration Map to preserve time stamps + config.copyFrom(kernel.getConfig()); + // Attempt to merge the deployment configs using the deployment time stamp + config.mergeMap(timestamp, deploymentConfig); + try { + config.waitConfigUpdateComplete(); + } catch (InterruptedException e) { + logger.atInfo().log("Interrupted while waiting for deployment config update to complete"); + Thread.currentThread().interrupt(); + } + deploymentConfiguration = new DeviceConfiguration(config, kernel.getKernelCommandLine()); + /* + * Need to set security service due to plugin dependency workaround + * after removing Kernel from DeviceConfiguration + */ + deploymentConfiguration.setSecurityService(securityService); + } + + private boolean mqttClientNeedsValidation(DeviceConfiguration currentDeviceConfiguration) { + return networkPoxyHasChanged(currentDeviceConfiguration) || awsRegionHasChanged(currentDeviceConfiguration) + || mqttHasChanged(currentDeviceConfiguration) || iotDataEndpointHasChanged(currentDeviceConfiguration); + } + + private boolean httpClientNeedsValidation(DeviceConfiguration currentDeviceConfiguration) { + return networkPoxyHasChanged(currentDeviceConfiguration) || awsRegionHasChanged(currentDeviceConfiguration) + || greengrassDataPlaneEndpointHasChanged(currentDeviceConfiguration) + || greengrassDataPlanePortHasChanged(currentDeviceConfiguration); + } + + private boolean networkPoxyHasChanged(DeviceConfiguration currentdeviceConfiguration) { + Topics currentNetworkProxy = currentdeviceConfiguration.getNetworkProxyNamespace(); + Topics newNetworkProxy = deploymentConfiguration.getNetworkProxyNamespace(); + return !Topics.compareChildren(currentNetworkProxy, newNetworkProxy); + } + + private boolean awsRegionHasChanged(DeviceConfiguration currentdeviceConfiguration) { + // Defaults to empty string topic + Topic currentAwsRegion = currentdeviceConfiguration.getAWSRegion(); + Topic newAwsRegion = deploymentConfiguration.getAWSRegion(); + return !Topic.compareValue(currentAwsRegion, newAwsRegion); + } + + private boolean mqttHasChanged(DeviceConfiguration currentdeviceConfiguration) { + Topics currentMqtt = currentdeviceConfiguration.getMQTTNamespace(); + Topics newMqtt = deploymentConfiguration.getMQTTNamespace(); + return !Topics.compareChildren(currentMqtt, newMqtt); + } + + private boolean iotDataEndpointHasChanged(DeviceConfiguration currentdeviceConfiguration) { + // Defaults to empty string topic + Topic currentIotDataEndpoint = currentdeviceConfiguration.getIotDataEndpoint(); + Topic newIotDataEndpoint = deploymentConfiguration.getIotDataEndpoint(); + return !Topic.compareValue(currentIotDataEndpoint, newIotDataEndpoint); + } + + private boolean greengrassDataPlaneEndpointHasChanged(DeviceConfiguration currentdeviceConfiguration) { + // Defaults to empty string topic + Topic currentGreengrassDataPlaneEndpoint = currentdeviceConfiguration.getGGDataEndpoint(); + Topic newGreengrassDataPlaneEndpoint = deploymentConfiguration.getGGDataEndpoint(); + return !Topic.compareValue(currentGreengrassDataPlaneEndpoint, newGreengrassDataPlaneEndpoint); + } + + private boolean greengrassDataPlanePortHasChanged(DeviceConfiguration currentdeviceConfiguration) { + Topic currentGreengrassDataPlanePort = currentdeviceConfiguration.getGreengrassDataPlanePort(); + Topic newGreengrassDataPlanePort = deploymentConfiguration.getGreengrassDataPlanePort(); + return !Topic.compareValue(currentGreengrassDataPlanePort, newGreengrassDataPlanePort); + } + + /** + * Creates an MQTT client and checks if the device can connect to AWS. + * + * @param totallyCompleteFuture Future will be updated if validation fails + */ + public boolean mqttClientCanConnect(CompletableFuture totallyCompleteFuture) { + logger.atDebug().log("Checking MQTT client can connect"); + + MqttClientConnection connection = null; + String message; + try (AwsIotMqttConnectionBuilder builder = mqttClient.createMqttConnectionBuilder(deploymentConfiguration, + securityService, null)) { + String clientId = Coerce.toString(deploymentConfiguration.getThingName()) + clientIdSuffix; + connection = builder.withClientId(clientId).build(); + int operationTimeoutMillis = MqttClient.getMqttOperationTimeoutMillis(deploymentConfiguration); + connection.connect().get(operationTimeoutMillis, TimeUnit.MILLISECONDS); + return true; + } catch (MqttException e) { + message = "Mqtt client failed to connect: " + e.getMessage(); + logger.atError().cause(e).log(message + withNewConfigMessageSuffix); + } catch (TimeoutException e) { + message = "Mqtt client validation timed out"; // exception has no message + logger.atError().cause(e).log(message + withNewConfigMessageSuffix); + } catch (ExecutionException e) { + message = "Mqtt client validation completed exceptionally: " + e.getMessage(); + logger.atError().cause(e).log(message + withNewConfigMessageSuffix); + } catch (InterruptedException e) { + message = "Mqtt client connection was interrupted: " + e.getMessage(); + logger.atInfo().cause(e).log(message); + Thread.currentThread().interrupt(); + } finally { + if (connection != null) { + connection.disconnect(); + connection.close(); + } + } + + totallyCompleteFuture + .complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, + new ComponentConfigurationValidationException(message, + DeploymentErrorCode.NUCLEUS_CONNECTIVITY_CONFIG_NOT_VALID))); + return false; + } + + /** + * Creates an HTTP client and checks if the device can connect to AWS. + * + * @param totallyCompleteFuture Future will be updated if validation fails + */ + @SuppressWarnings("PMD.AvoidCatchingGenericException") + public boolean httpClientCanConnect(CompletableFuture totallyCompleteFuture) { + logger.atDebug().log("Checking HTTP client can connect"); + + String message; + Pair pair = factory.createClientFromConfig(deploymentConfiguration); + try (SdkHttpClient httpClient = pair.getLeft(); + GreengrassV2DataClient greengrassV2DataClient = pair.getRight()) { + ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder() + .coreDeviceThingName(Coerce.toString(deploymentConfiguration.getThingName())).build(); + greengrassV2DataClient.listThingGroupsForCoreDevice(request); + return true; + } catch (Exception e) { + StringBuilder sb = new StringBuilder("HTTP client validation failed due to: "); + if (e.getMessage() == null || e.getMessage().isEmpty()) { + sb.append(e.getClass().getSimpleName()); + } else { + sb.append(e.getMessage()); + } + message = sb.toString(); + logger.atError().cause(e).log(message + withNewConfigMessageSuffix); + } + + totallyCompleteFuture + .complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, + new ComponentConfigurationValidationException(message, + DeploymentErrorCode.NUCLEUS_CONNECTIVITY_CONFIG_NOT_VALID))); + return false; + } + + /** + * Perform connectivity validation if configs have changed meaningfully. + * + * @param totallyCompleteFuture Future will be updated if validation fails + * @param currentDeviceConfiguration current configs the device is using + */ + @SuppressWarnings("PMD.SimplifyBooleanReturns") + public boolean validate(CompletableFuture totallyCompleteFuture, + DeviceConfiguration currentDeviceConfiguration) { + boolean validationEnabled = deploymentConfiguration.isConnectivityValidationEnabled(); + boolean configuredToTalkToCloud = deploymentConfiguration.isDeviceConfiguredToTalkToCloud(); + if (!validationEnabled || !configuredToTalkToCloud) { + logger.atDebug().log("Skipping connectivity validation"); + return true; + } + boolean needsValidation; + needsValidation = mqttClientNeedsValidation(currentDeviceConfiguration); + if (needsValidation && !mqttClientCanConnect(totallyCompleteFuture)) { + return false; + } + needsValidation = httpClientNeedsValidation(currentDeviceConfiguration); + if (needsValidation && !httpClientCanConnect(totallyCompleteFuture)) { + return false; + } + return true; + } +} diff --git a/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java b/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java index 961591dece..52ad957842 100644 --- a/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java +++ b/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java @@ -7,6 +7,7 @@ import com.aws.greengrass.config.Topics; +import com.aws.greengrass.dependency.Context; import com.aws.greengrass.dependency.Context.Value; import com.aws.greengrass.dependency.State; import com.aws.greengrass.deployment.activator.DeploymentActivator; @@ -27,11 +28,15 @@ import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException; import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; +import com.aws.greengrass.mqttclient.MqttClient; +import com.aws.greengrass.security.SecurityService; import com.aws.greengrass.util.Coerce; +import com.aws.greengrass.util.GreengrassServiceClientFactory; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction; +import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy; import java.util.Collection; import java.util.HashMap; @@ -66,6 +71,9 @@ public class DeploymentConfigMerger { private Kernel kernel; private DeviceConfiguration deviceConfiguration; private DynamicComponentConfigurationValidator validator; + private MqttClient mqttClient; + private GreengrassServiceClientFactory factory; + private SecurityService securityService; /** * Merge in new configuration values and new services. @@ -112,6 +120,7 @@ public Future mergeInNewConfig(Deployment deployment, return totallyCompleteFuture; } + @SuppressWarnings({"PMD.AvoidCatchingGenericException", "PMD.AvoidCatchingThrowable"}) private void updateActionForDeployment(Map newConfig, Deployment deployment, DeploymentActivator activator, CompletableFuture totallyCompleteFuture) { @@ -147,6 +156,21 @@ private void updateActionForDeployment(Map newConfig, Deployment return; } + // As long as the timeout is not 0, we will try to run the connectivity validation + DeploymentConfigurationValidationPolicy policy = deployment.getDeploymentDocumentObj() + .getConfigurationValidationPolicy(); + if (policy != null && Coerce.toInt(policy.timeoutInSeconds()) != 0) { + try (Context context = new Context()) { + ConnectivityValidator connectivityValidator = new ConnectivityValidator(kernel, context, mqttClient, + factory, securityService, newConfig, deployment.getDeploymentDocumentObj().getTimestamp()); + if (!connectivityValidator.validate(totallyCompleteFuture, deviceConfiguration)) { + return; + } + } catch (Throwable e) { + logger.atInfo().cause(e).log("Unexpected exception during connectivity validation"); + } + } + logger.atInfo(MERGE_CONFIG_EVENT_KEY).kv("deployment", deploymentId) .log("Applying deployment changes, deployment cannot be cancelled now"); activator.activate(newConfig, deployment, totallyCompleteFuture); diff --git a/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java b/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java index 74a42b6748..10e09458ab 100644 --- a/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java +++ b/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java @@ -114,6 +114,7 @@ public class DeviceConfiguration { public static final long DEPLOYMENT_POLLING_FREQUENCY_DEFAULT_SECONDS = 15L; public static final String DEVICE_PARAM_GG_DATA_PLANE_PORT = "greengrassDataPlanePort"; private static final int GG_DATA_PLANE_PORT_DEFAULT = 8443; + private static final String CONNECTIVITY_VALIDATION = "validateConnectivityDuringDeployment"; private static final String DEVICE_PARAM_ENV_STAGE = "envStage"; private static final String DEFAULT_ENV_STAGE = "prod"; @@ -153,7 +154,6 @@ public DeviceConfiguration(Configuration config, KernelCommandLine kernelCommand this.kernelCommandLine = kernelCommandLine; deTildeValidator = getDeTildeValidator(); regionValidator = getRegionValidator(); - handleLoggingConfig(); getComponentStoreMaxSizeBytes().dflt(COMPONENT_STORE_MAX_SIZE_DEFAULT_BYTES); getDeploymentPollingFrequencySeconds().dflt(DEPLOYMENT_POLLING_FREQUENCY_DEFAULT_SECONDS); if (System.getProperty(S3_ENDPOINT_PROP_NAME) != null @@ -276,7 +276,7 @@ private void initializeNucleusComponentConfig(String nucleusComponentName) { /** * Handles subscribing and reconfiguring logger based on the correct topic. */ - private void handleLoggingConfig() { + public void handleLoggingConfig() { loggingTopics = getLoggingConfigurationTopics(); loggingTopics.subscribe(this::handleLoggingConfigurationChanges); } @@ -601,6 +601,10 @@ public boolean isDeviceConfiguredToTalkToCloud() { } } + public boolean isConnectivityValidationEnabled() { + return Coerce.toBoolean(getTopic(CONNECTIVITY_VALIDATION).dflt(true)); + } + /** * Reports if device provisioning values have changed. * @@ -663,7 +667,7 @@ public static String getVersionFromBuildRecipeFile() { .resolve(NUCLEUS_RECIPE_FILENAME).toFile(), com.amazon.aws.iot.greengrass.component.common.ComponentRecipe.class); if (recipe != null) { - return recipe.getComponentVersion().toString(); + return recipe.getComponentVersion().toString(); } } catch (IOException | URISyntaxException e) { logger.atError().log("Unable to determine Greengrass version", e); diff --git a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java index cbedeb0211..a5e9b009e5 100644 --- a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java +++ b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java @@ -87,6 +87,7 @@ public enum DeploymentErrorCode { UNSUPPORTED_REGION(DeploymentErrorType.REQUEST_ERROR), IOT_CRED_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR), IOT_DATA_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR), + NUCLEUS_CONNECTIVITY_CONFIG_NOT_VALID(DeploymentErrorType.REQUEST_ERROR), /* Docker issues */ DOCKER_ERROR(DeploymentErrorType.DEPENDENCY_ERROR), diff --git a/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java b/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java index bbf6f2bfec..4d851dde40 100644 --- a/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java +++ b/src/main/java/com/aws/greengrass/lifecyclemanager/Kernel.java @@ -736,6 +736,10 @@ public Kernel parseArgs(String... args) { // Create DeviceConfiguration DeviceConfiguration deviceConfiguration = getContext().get(DeviceConfiguration.class); + // Moved to from DeviceConfiguration constructor + // ConnectivityValidator creates a new DeviceConfiguration from deployment config + // We do not want that config to modify logging + deviceConfiguration.handleLoggingConfig(); SecurityService securityService = getContext().get(SecurityService.class); // Needs to be set due to ShadowManager plugin dependency deviceConfiguration.setSecurityService(securityService); diff --git a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java index 389d61ddef..6b2739ca50 100644 --- a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java +++ b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java @@ -211,55 +211,11 @@ public void onConnectionResumed(boolean sessionPresent) { * @param kernel kernel instance */ @Inject - @SuppressWarnings("PMD.PreserveStackTrace") public MqttClient(DeviceConfiguration deviceConfiguration, ScheduledExecutorService ses, ExecutorService executorService, SecurityService securityService, Kernel kernel) { this(deviceConfiguration, null, ses, executorService, kernel); - - this.builderProvider = (clientBootstrap) -> { - AwsIotMqttConnectionBuilder builder; - try { - builder = securityService.getDeviceIdentityMqttConnectionBuilder(); - } catch (MqttConnectionProviderException e) { - throw new MqttException(e.getMessage()); - } - - int pingTimeoutMs = Coerce.toInt( - mqttTopics.findOrDefault(DEFAULT_MQTT_PING_TIMEOUT, MQTT_PING_TIMEOUT_KEY)); - int keepAliveMs = Coerce.toInt( - mqttTopics.findOrDefault(DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT, MQTT_KEEP_ALIVE_TIMEOUT_KEY)); - if (keepAliveMs != 0 && keepAliveMs <= pingTimeoutMs) { - throw new MqttException(String.format("%s must be greater than %s", - MQTT_KEEP_ALIVE_TIMEOUT_KEY, MQTT_PING_TIMEOUT_KEY)); - } - - String endpoint = Coerce.toString(deviceConfiguration.getIotDataEndpoint()); - builder.withCertificateAuthorityFromPath(null, Coerce.toString(deviceConfiguration.getRootCAFilePath())) - .withEndpoint(endpoint) - .withPort((short) Coerce.toInt(mqttTopics.findOrDefault(DEFAULT_MQTT_PORT, MQTT_PORT_KEY))) - .withCleanSession(false).withBootstrap(clientBootstrap) - .withKeepAliveMs(keepAliveMs) - .withProtocolOperationTimeoutMs(getMqttOperationTimeoutMillis()) - .withPingTimeoutMs(pingTimeoutMs) - .withSocketOptions(new SocketOptions()).withTimeoutMs(Coerce.toInt( - mqttTopics.findOrDefault(DEFAULT_MQTT_SOCKET_TIMEOUT, MQTT_SOCKET_TIMEOUT_KEY))); - try (LockScope ls = LockScope.lock(httpProxyLock)) { - HttpProxyOptions httpProxyOptions = - ProxyUtils.getHttpProxyOptions(deviceConfiguration, proxyTlsContext); - if (httpProxyOptions != null) { - String noProxy = Coerce.toString(deviceConfiguration.getNoProxyAddresses()); - boolean useProxy = true; - // Only use the proxy when the endpoint we're connecting to is not in the NoProxyAddress list - if (Utils.isNotEmpty(noProxy) && Utils.isNotEmpty(endpoint)) { - useProxy = Arrays.stream(noProxy.split(",")).noneMatch(endpoint::matches); - } - if (useProxy) { - builder.withHttpProxyOptions(httpProxyOptions); - } - } - } - return builder; - }; + this.builderProvider = (clientBootstrap) -> createMqttConnectionBuilder(deviceConfiguration, securityService, + clientBootstrap); } protected MqttClient(DeviceConfiguration deviceConfiguration, @@ -427,7 +383,7 @@ private void validateAndSetMqttPublishConfiguration() { } // if maxPublishRetryCount = -1, publish request would be retried with unlimited times. - maxPublishRetryCount = Coerce.toInt(mqttTopics.findOrDefault(DEFAULT_MQTT_MAX_OF_PUBLISH_RETRY_COUNT, + maxPublishRetryCount = Coerce.toInt(mqttTopics.findOrDefault(DEFAULT_MQTT_MAX_OF_PUBLISH_RETRY_COUNT, MQTT_MAX_OF_PUBLISH_RETRY_COUNT_KEY)); } @@ -979,6 +935,68 @@ protected int getNextClientIdNumber() { return 0; } + /** + * Create AwsIotMqttConnectionBuilder to generate a mqtt connection client. + * + * @param deviceConfiguration used to get the values needed for mqtt connection builder + * @param securityService used to get mqtt connection builder + * @param clientBootstrap client bootstrap values for mqtt connection builder + */ + @SuppressWarnings("PMD.PreserveStackTrace") + public AwsIotMqttConnectionBuilder createMqttConnectionBuilder(DeviceConfiguration deviceConfiguration, + SecurityService securityService, + ClientBootstrap clientBootstrap) { + AwsIotMqttConnectionBuilder builder; + try { + // this will get the security service for the existing configuration + builder = securityService.getDeviceIdentityMqttConnectionBuilder(); + } catch (MqttConnectionProviderException e) { + throw new MqttException(e.getMessage()); + } + + Topics mqttTopics = deviceConfiguration.getMQTTNamespace(); + int pingTimeoutMs = Coerce.toInt( + mqttTopics.findOrDefault(DEFAULT_MQTT_PING_TIMEOUT, MQTT_PING_TIMEOUT_KEY)); + int keepAliveMs = Coerce.toInt( + mqttTopics.findOrDefault(DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT, MQTT_KEEP_ALIVE_TIMEOUT_KEY)); + if (keepAliveMs != 0 && keepAliveMs <= pingTimeoutMs) { + throw new MqttException(String.format("%s must be greater than %s", + MQTT_KEEP_ALIVE_TIMEOUT_KEY, MQTT_PING_TIMEOUT_KEY)); + } + + String endpoint = Coerce.toString(deviceConfiguration.getIotDataEndpoint()); + builder.withCertificateAuthorityFromPath(null, Coerce.toString(deviceConfiguration.getRootCAFilePath())) + .withEndpoint(endpoint) + .withPort((short) Coerce.toInt(mqttTopics.findOrDefault(DEFAULT_MQTT_PORT, MQTT_PORT_KEY))) + .withCleanSession(false) + .withKeepAliveMs(keepAliveMs) + .withProtocolOperationTimeoutMs(getMqttOperationTimeoutMillis()) + .withPingTimeoutMs(pingTimeoutMs) + .withSocketOptions(new SocketOptions()).withTimeoutMs(Coerce.toInt( + mqttTopics.findOrDefault(DEFAULT_MQTT_SOCKET_TIMEOUT, MQTT_SOCKET_TIMEOUT_KEY))); + + if (clientBootstrap != null) { + builder.withBootstrap(clientBootstrap); + } + + try (LockScope ls = LockScope.lock(httpProxyLock)) { + HttpProxyOptions httpProxyOptions = + ProxyUtils.getHttpProxyOptions(deviceConfiguration, proxyTlsContext); + if (httpProxyOptions != null) { + String noProxy = Coerce.toString(deviceConfiguration.getNoProxyAddresses()); + boolean useProxy = true; + // Only use the proxy when the endpoint we're connecting to is not in the NoProxyAddress list + if (Utils.isNotEmpty(noProxy) && Utils.isNotEmpty(endpoint)) { + useProxy = Arrays.stream(noProxy.split(",")).noneMatch(endpoint::matches); + } + if (useProxy) { + builder.withHttpProxyOptions(httpProxyOptions); + } + } + } + return builder; + } + @SuppressWarnings({"PMD.AvoidCatchingGenericException", "PMD.AvoidRethrowingException"}) protected IndividualMqttClient getNewMqttClient() { int clientIdNum = getNextClientIdNumber(); @@ -1047,6 +1065,11 @@ public int getMqttOperationTimeoutMillis() { return Coerce.toInt(mqttTopics.findOrDefault(DEFAULT_MQTT_OPERATION_TIMEOUT, MQTT_OPERATION_TIMEOUT_KEY)); } + public static int getMqttOperationTimeoutMillis(DeviceConfiguration deviceConfiguration) { + Topics mqttTopics = deviceConfiguration.getMQTTNamespace(); + return Coerce.toInt(mqttTopics.findOrDefault(DEFAULT_MQTT_OPERATION_TIMEOUT, MQTT_OPERATION_TIMEOUT_KEY)); + } + private String getMqttVersion() { return Coerce.toString(mqttTopics.findOrDefault(DEFAULT_MQTT_VERSION, MQTT_VERSION_KEY)); } diff --git a/src/main/java/com/aws/greengrass/util/GreengrassServiceClientFactory.java b/src/main/java/com/aws/greengrass/util/GreengrassServiceClientFactory.java index c309fbfc0b..288969c63c 100644 --- a/src/main/java/com/aws/greengrass/util/GreengrassServiceClientFactory.java +++ b/src/main/java/com/aws/greengrass/util/GreengrassServiceClientFactory.java @@ -171,23 +171,20 @@ public GreengrassV2DataClient fetchGreengrassV2DataClient() throws DeviceConfigu } // Caching a http client since it only needs to be recreated if the cert/keys change - private void configureHttpClient(DeviceConfiguration deviceConfiguration) { + private SdkHttpClient configureHttpClient(DeviceConfiguration deviceConfiguration) { logger.atDebug().log("Configuring http client for greengrass v2 data client"); ApacheHttpClient.Builder httpClientBuilder = ClientConfigurationUtils.getConfiguredClientBuilder(deviceConfiguration); - cachedHttpClient = httpClientBuilder.build(); + return httpClientBuilder.build(); } - private void configureClient(DeviceConfiguration deviceConfiguration) { - if (cachedHttpClient == null) { - configureHttpClient(deviceConfiguration); - } + private GreengrassV2DataClient createClient(DeviceConfiguration deviceConfiguration, SdkHttpClient httpClient) { logger.atDebug().log(CONFIGURING_GGV2_INFO_MESSAGE); GreengrassV2DataClientBuilder clientBuilder = GreengrassV2DataClient.builder() // Use an empty credential provider because our requests don't need SigV4 // signing, as they are going through IoT Core instead .credentialsProvider(AnonymousCredentialsProvider.create()) - .httpClient(cachedHttpClient) + .httpClient(httpClient) .overrideConfiguration(ClientOverrideConfiguration.builder().retryPolicy(RetryMode.STANDARD).build()); String region = Coerce.toString(deviceConfiguration.getAWSRegion()); @@ -210,6 +207,25 @@ private void configureClient(DeviceConfiguration deviceConfiguration) { clientBuilder.region(Region.of(region)); } } - this.greengrassV2DataClient = clientBuilder.build(); + return clientBuilder.build(); + } + + private void configureClient(DeviceConfiguration deviceConfiguration) { + if (cachedHttpClient == null) { + cachedHttpClient = configureHttpClient(deviceConfiguration); + } + this.greengrassV2DataClient = createClient(deviceConfiguration, cachedHttpClient); + } + + /** + * Creates new GreengrassV2DataClient. + * + * @param config Device configuration + */ + @SuppressWarnings("PMD.CloseResource") + public Pair createClientFromConfig(DeviceConfiguration config) { + SdkHttpClient httpClient = configureHttpClient(config); + GreengrassV2DataClient ggdc = createClient(config, httpClient); + return new Pair<>(httpClient, ggdc); } } diff --git a/src/test/java/com/aws/greengrass/deployment/ConnectivityValidatorTest.java b/src/test/java/com/aws/greengrass/deployment/ConnectivityValidatorTest.java new file mode 100644 index 0000000000..0f82a121c2 --- /dev/null +++ b/src/test/java/com/aws/greengrass/deployment/ConnectivityValidatorTest.java @@ -0,0 +1,123 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.deployment; + +import com.aws.greengrass.config.Topic; +import com.aws.greengrass.config.Topics; +import com.aws.greengrass.dependency.Context; +import com.aws.greengrass.deployment.model.DeploymentResult; +import com.aws.greengrass.lifecyclemanager.Kernel; +import com.aws.greengrass.mqttclient.MqttClient; +import com.aws.greengrass.security.SecurityService; +import com.aws.greengrass.testcommons.testutilities.GGExtension; +import com.aws.greengrass.util.GreengrassServiceClientFactory; +import com.aws.greengrass.util.Pair; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder; +import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient; +import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest; + +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith({GGExtension.class, MockitoExtension.class}) +public class ConnectivityValidatorTest { + @Mock + private MqttClient mqttClient; + @Mock + private AwsIotMqttConnectionBuilder awsIotMqttConnectionBuilder; + @Mock + private MqttClientConnection mqttClientConnection; + @Mock + private GreengrassServiceClientFactory factory; + @Mock + private SecurityService securityService; + @Mock + private GreengrassV2DataClient greengrassV2DataClient; + @Mock + private DeviceConfiguration deviceConfiguration; + @Mock + private DeviceConfiguration deploymentConfiguration; + + private Kernel kernel; + private Context context; + private Topics topics; + private Topic topic; + private Topic differentTopic; + private ConnectivityValidator validator; + + + @BeforeEach + void beforeEach() throws InterruptedException { + kernel = new Kernel(); + context = kernel.getContext(); + topics = Topics.of(context, "topics-name", null); + topic = Topic.of(context, "topic-name", "value"); + differentTopic = Topic.of(context, "topic-name", "different-value"); + validator = new ConnectivityValidator(kernel, kernel.getContext(), mqttClient, factory, securityService, new HashMap<>(), 0); + validator.setDeploymentConfiguration(deploymentConfiguration); + lenient().when(deploymentConfiguration.isDeviceConfiguredToTalkToCloud()).thenReturn(true); + } + + @AfterEach + void afterEach() { + kernel.shutdown(); + } + + @Test + void GIVEN_connectivity_validator_When_new_configuration_is_valid_THEN_return_true() { + when(deviceConfiguration.getNetworkProxyNamespace()).thenReturn(topics); + when(deploymentConfiguration.getNetworkProxyNamespace()).thenReturn(topics); + when(deviceConfiguration.getAWSRegion()).thenReturn(topic); + when(deploymentConfiguration.getAWSRegion()).thenReturn(differentTopic); + when(deploymentConfiguration.isConnectivityValidationEnabled()).thenReturn(true); + + // Mock Mqtt Validation + String thingName = "thingName"; + String expectedClientId = thingName + ConnectivityValidator.clientIdSuffix; + int mqttTimeout = 1000; + CompletableFuture mqttConnection = new CompletableFuture<>(); + mqttConnection.complete(true); + + when(mqttClient.createMqttConnectionBuilder(deploymentConfiguration, securityService, null)) + .thenReturn(awsIotMqttConnectionBuilder); + when(deploymentConfiguration.getThingName()).thenReturn(Topic.of(context, "name", thingName)); + when(awsIotMqttConnectionBuilder.withClientId(expectedClientId)).thenReturn(awsIotMqttConnectionBuilder); + when(awsIotMqttConnectionBuilder.build()).thenReturn(mqttClientConnection); + Topics mockMqttTopics = mock(Topics.class); + when(deploymentConfiguration.getMQTTNamespace()).thenReturn(mockMqttTopics); + when(mockMqttTopics.findOrDefault(anyInt(), anyString())).thenReturn(mqttTimeout); + when(mqttClientConnection.connect()).thenReturn(mqttConnection); + + // Mock Http Validation + when(factory.createClientFromConfig(any())).thenReturn(new Pair<>(null, greengrassV2DataClient)); + when(greengrassV2DataClient.listThingGroupsForCoreDevice((ListThingGroupsForCoreDeviceRequest) any())) + .thenReturn(null); + + CompletableFuture totallyCompleteFuture = new CompletableFuture<>(); + assertTrue(validator.validate(totallyCompleteFuture, deviceConfiguration)); + } + + @Test + void GIVEN_bad_config_When_connectivity_validation_is_disabled_THEN_return_true() { + when(deploymentConfiguration.isConnectivityValidationEnabled()).thenReturn(false); + assertTrue(validator.validate(null, null)); + } +} diff --git a/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java b/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java index a6a78438fb..26f63c0266 100644 --- a/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java +++ b/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java @@ -26,7 +26,10 @@ import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException; import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; +import com.aws.greengrass.mqttclient.MqttClient; +import com.aws.greengrass.security.SecurityService; import com.aws.greengrass.testcommons.testutilities.GGExtension; +import com.aws.greengrass.util.GreengrassServiceClientFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,6 +39,7 @@ import org.mockito.Mock; import org.mockito.exceptions.misusing.InvalidUseOfMatchersException; import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.services.greengrassv2.model.DeploymentConfigurationValidationPolicy; import java.io.IOException; import java.util.Arrays; @@ -90,6 +94,12 @@ class DeploymentConfigMergerTest { @Mock private DynamicComponentConfigurationValidator validator; @Mock + private MqttClient mqttClient; + @Mock + private GreengrassServiceClientFactory ggscFactory; + @Mock + private SecurityService securityService; + @Mock private Context context; @BeforeEach @@ -307,13 +317,16 @@ void GIVEN_deployment_WHEN_check_safety_selected_THEN_check_safety_before_update when(deploymentActivatorFactory.getDeploymentActivator(any())).thenReturn(deploymentActivator); when(context.get(DeploymentActivatorFactory.class)).thenReturn(deploymentActivatorFactory); - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, + ggscFactory, securityService); DeploymentDocument doc = new DeploymentDocument(); doc.setConfigurationArn("NoSafetyCheckDeploy"); doc.setComponentUpdatePolicy( new ComponentUpdatePolicy(0, SKIP_NOTIFY_COMPONENTS)); - + doc.setConfigurationValidationPolicy( + DeploymentConfigurationValidationPolicy.builder().timeoutInSeconds(0).build()); + doc.setTimestamp((long) 0); merger.mergeInNewConfig(createMockDeployment(doc), new HashMap<>()); verify(updateSystemPolicyService, times(0)).addUpdateAction(any(), any()); @@ -345,7 +358,8 @@ void GIVEN_deployment_WHEN_task_cancelled_THEN_update_is_cancelled() throws Thro }); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, + ggscFactory, securityService); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn( @@ -381,11 +395,14 @@ void GIVEN_deployment_WHEN_task_not_cancelled_THEN_update_is_continued() throws when(context.get(DefaultActivator.class)).thenReturn(defaultActivator); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, + ggscFactory, securityService); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn( new ComponentUpdatePolicy(0, NOTIFY_COMPONENTS)); + when(doc.getConfigurationValidationPolicy()).thenReturn( + DeploymentConfigurationValidationPolicy.builder().timeoutInSeconds(0).build()); merger.mergeInNewConfig(createMockDeployment(doc), new HashMap<>()); @@ -437,11 +454,14 @@ void GIVEN_deployment_activate_WHEN_deployment_has_new_config_THEN_new_config_is newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3); newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, + ggscFactory, securityService); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn( new ComponentUpdatePolicy(0, NOTIFY_COMPONENTS)); + when(doc.getConfigurationValidationPolicy()).thenReturn( + DeploymentConfigurationValidationPolicy.builder().timeoutInSeconds(0).build()); merger.mergeInNewConfig(createMockDeployment(doc), newConfig); @@ -498,11 +518,14 @@ void GIVEN_deployment_activate_WHEN_deployment_has_some_new_config_THEN_old_conf newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3); newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, + ggscFactory, securityService); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn( new ComponentUpdatePolicy(0, NOTIFY_COMPONENTS)); + when(doc.getConfigurationValidationPolicy()).thenReturn( + DeploymentConfigurationValidationPolicy.builder().timeoutInSeconds(0).build()); merger.mergeInNewConfig(createMockDeployment(doc), newConfig); diff --git a/src/test/java/com/aws/greengrass/deployment/DeviceConfigurationTest.java b/src/test/java/com/aws/greengrass/deployment/DeviceConfigurationTest.java index 3e13de0405..77d7cae9e4 100644 --- a/src/test/java/com/aws/greengrass/deployment/DeviceConfigurationTest.java +++ b/src/test/java/com/aws/greengrass/deployment/DeviceConfigurationTest.java @@ -67,7 +67,7 @@ void beforeEach() { Topics topics = Topics.of(mock(Context.class), SERVICES_NAMESPACE_TOPIC, mock(Topics.class)); when(mockTopics.subscribe(any())).thenReturn(mockTopics); when(configuration.lookupTopics(anyString(), anyString(), anyString())).thenReturn(mockTopics); - when(configuration.lookupTopics(anyString(), anyString(), anyString(), anyString())).thenReturn(mockTopics); + lenient().when(configuration.lookupTopics(anyString(), anyString(), anyString(), anyString())).thenReturn(mockTopics); when(configuration.lookupTopics(anyString())).thenReturn(topics); lenient().when(configuration.lookupTopics(anyString())).thenReturn(topics); } diff --git a/src/test/java/com/aws/greengrass/lifecyclemanager/LogManagerHelperTest.java b/src/test/java/com/aws/greengrass/lifecyclemanager/LogManagerHelperTest.java index 7afaeca9ee..f0dab6d36d 100644 --- a/src/test/java/com/aws/greengrass/lifecyclemanager/LogManagerHelperTest.java +++ b/src/test/java/com/aws/greengrass/lifecyclemanager/LogManagerHelperTest.java @@ -241,6 +241,7 @@ void GIVEN_mock_service_logger_WHEN_reset_THEN_applied_correctly() { when(configuration.lookupTopics(SERVICES_NAMESPACE_TOPIC, DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY)).thenReturn(topics); when(configuration.lookupTopics(SYSTEM_NAMESPACE_KEY)).thenReturn(topics); DeviceConfiguration deviceConfiguration = new DeviceConfiguration(configuration, kernelCommandLine); + deviceConfiguration.handleLoggingConfig(); LogManagerHelper.getComponentLogger(mockGreengrassService); LogConfig testLogConfig = LogManager.getLogConfigurations().get(mockServiceName); PersistenceConfig defaultConfig = new PersistenceConfig(LOG_FILE_EXTENSION, LOGS_DIRECTORY); @@ -307,7 +308,8 @@ void GIVEN_all_fields_logger_config_WHEN_subscribe_THEN_correctly_reconfigures_a when(configuration.lookupTopics(anyString())).thenReturn(topics); when(configuration.lookupTopics(SERVICES_NAMESPACE_TOPIC, DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY)).thenReturn(topics); when(configuration.lookupTopics(SYSTEM_NAMESPACE_KEY)).thenReturn(topics); - new DeviceConfiguration(configuration, kernelCommandLine); + DeviceConfiguration deviceConfiguration = new DeviceConfiguration(configuration, kernelCommandLine); + deviceConfiguration.handleLoggingConfig(); assertEquals(Level.TRACE, LogManager.getRootLogConfiguration().getLevel()); assertEquals(LogStore.FILE, LogManager.getRootLogConfiguration().getStore()); @@ -338,7 +340,8 @@ void GIVEN_null_logger_config_WHEN_subscribe_THEN_correctly_reconfigures_all_log when(configuration.lookupTopics(anyString())).thenReturn(topics); when(configuration.lookupTopics(SERVICES_NAMESPACE_TOPIC, DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY)).thenReturn(topics); when(configuration.lookupTopics(SYSTEM_NAMESPACE_KEY)).thenReturn(topics); - new DeviceConfiguration(configuration, kernelCommandLine); + DeviceConfiguration deviceConfiguration = new DeviceConfiguration(configuration, kernelCommandLine); + deviceConfiguration.handleLoggingConfig(); assertEquals(Level.INFO, LogManager.getRootLogConfiguration().getLevel()); assertEquals("greengrass", LogManager.getRootLogConfiguration().getFileName()); @@ -377,7 +380,8 @@ void loggers_created_before_or_after_log_level_change_get_the_correct_level() th Logger logger1 = LogManagerHelper.getComponentLogger(mockGreengrassService); assertFalse(logger1.isDebugEnabled()); - new DeviceConfiguration(config, kernelCommandLine); + DeviceConfiguration deviceConfiguration = new DeviceConfiguration(config, kernelCommandLine); + deviceConfiguration.handleLoggingConfig(); context.runOnPublishQueueAndWait(() -> logTopics.updateFromMap(Utils.immutableMap("level", "DEBUG"), new UpdateBehaviorTree(UpdateBehaviorTree.UpdateBehavior.REPLACE, System.currentTimeMillis()))); diff --git a/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java b/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java index 21c8bf5bb7..3953017fe2 100644 --- a/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java +++ b/src/test/java/com/aws/greengrass/mqttclient/MqttClientTest.java @@ -208,7 +208,7 @@ void afterEach() throws IOException { @ParameterizedTest @CsvSource({"10000,10000", "10000,10001"}) void GIVEN_ping_timeout_gte_keep_alive_WHEN_mqtt_client_connects_THEN_throws_exception(int keepAlive, - int pingTimeout) { + int pingTimeout) { mqttNamespace.lookup(MqttClient.MQTT_KEEP_ALIVE_TIMEOUT_KEY).withValue(keepAlive); mqttNamespace.lookup(MqttClient.MQTT_PING_TIMEOUT_KEY).withValue(pingTimeout); MqttClient mqttClient = new MqttClient(deviceConfiguration, ses, executorService, diff --git a/src/test/java/com/aws/greengrass/tes/TokenExchangeServiceTest.java b/src/test/java/com/aws/greengrass/tes/TokenExchangeServiceTest.java index ae1ef40cce..815cc67dbb 100644 --- a/src/test/java/com/aws/greengrass/tes/TokenExchangeServiceTest.java +++ b/src/test/java/com/aws/greengrass/tes/TokenExchangeServiceTest.java @@ -111,7 +111,7 @@ void setup() { .thenReturn(componentTypeTopic); when(configuration.lookup(SERVICES_NAMESPACE_TOPIC, DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY, COMPONENT_STORE_MAX_SIZE_BYTES)).thenReturn(componentStoreSizeLimitTopic); - when(configuration.lookupTopics(SERVICES_NAMESPACE_TOPIC, DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY, + lenient().when(configuration.lookupTopics(SERVICES_NAMESPACE_TOPIC, DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY, NUCLEUS_CONFIG_LOGGING_TOPICS)).thenReturn(mock(Topics.class)); when(configuration.lookup(SERVICES_NAMESPACE_TOPIC, DEFAULT_NUCLEUS_COMPONENT_NAME, CONFIGURATION_CONFIG_KEY, DEPLOYMENT_POLLING_FREQUENCY_SECONDS)).thenReturn(deploymentPollingFrequency);