diff --git a/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java b/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java index 961591dece..9c06a8192e 100644 --- a/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java +++ b/src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java @@ -27,15 +27,55 @@ import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException; import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; +import com.aws.greengrass.mqttclient.MqttClient; +import com.aws.greengrass.security.SecurityService; +import com.aws.greengrass.security.exceptions.MqttConnectionProviderException; import com.aws.greengrass.util.Coerce; +import com.aws.greengrass.util.EncryptionUtils; +import com.aws.greengrass.util.IotSdkClientFactory; +import com.aws.greengrass.util.ProxyUtils; +import com.aws.greengrass.util.RegionUtils; +import com.aws.greengrass.util.Utils; +import com.aws.greengrass.util.exceptions.InvalidEnvironmentStageException; +import com.aws.greengrass.util.exceptions.TLSAuthException; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; +import org.apache.http.client.utils.URIBuilder; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.retry.RetryMode; +import software.amazon.awssdk.crt.http.HttpProxyOptions; +import software.amazon.awssdk.crt.io.ClientTlsContext; +import software.amazon.awssdk.crt.io.SocketOptions; +import software.amazon.awssdk.crt.io.TlsContextOptions; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt.MqttException; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.greengrassv2.model.DeploymentComponentUpdatePolicyAction; - +import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient; +import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClientBuilder; +import software.amazon.awssdk.services.greengrassv2data.model.GreengrassV2DataException; +import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -45,13 +85,40 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.inject.Inject; +import javax.net.ssl.KeyManager; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.security.auth.x500.X500Principal; import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEFAULT_ENV_STAGE; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_MQTT_NAMESPACE; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_NETWORK_PROXY_NAMESPACE; import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_AWS_REGION; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_ENV_STAGE; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_GG_DATA_ENDPOINT; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_GG_DATA_PLANE_PORT; import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_IOT_CRED_ENDPOINT; import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_IOT_DATA_ENDPOINT; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_NO_PROXY_ADDRESSES; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_PROXY_URL; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_PROXY_USERNAME; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_PROXY_PASSWORD; +import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PROXY_NAMESPACE; +import static com.aws.greengrass.deployment.DeviceConfiguration.GG_DATA_PLANE_PORT_DEFAULT; +import static com.aws.greengrass.deployment.DynamicComponentConfigurationValidator.DEFAULT_TIMEOUT_SECOND; import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICES_NAMESPACE_TOPIC; import static com.aws.greengrass.lifecyclemanager.GreengrassService.SERVICE_NAME_KEY; +import static com.aws.greengrass.mqttclient.MqttClient.DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT; +import static com.aws.greengrass.mqttclient.MqttClient.DEFAULT_MQTT_OPERATION_TIMEOUT; +import static com.aws.greengrass.mqttclient.MqttClient.DEFAULT_MQTT_PING_TIMEOUT; +import static com.aws.greengrass.mqttclient.MqttClient.DEFAULT_MQTT_PORT; +import static com.aws.greengrass.mqttclient.MqttClient.DEFAULT_MQTT_SOCKET_TIMEOUT; +import static com.aws.greengrass.mqttclient.MqttClient.MQTT_KEEP_ALIVE_TIMEOUT_KEY; +import static com.aws.greengrass.mqttclient.MqttClient.MQTT_OPERATION_TIMEOUT_KEY; +import static com.aws.greengrass.mqttclient.MqttClient.MQTT_PING_TIMEOUT_KEY; +import static com.aws.greengrass.mqttclient.MqttClient.MQTT_PORT_KEY; +import static com.aws.greengrass.mqttclient.MqttClient.MQTT_SOCKET_TIMEOUT_KEY; @AllArgsConstructor(onConstructor = @__(@Inject)) public class DeploymentConfigMerger { @@ -66,6 +133,7 @@ public class DeploymentConfigMerger { private Kernel kernel; private DeviceConfiguration deviceConfiguration; private DynamicComponentConfigurationValidator validator; + private SecurityService securityService; /** * Merge in new configuration values and new services. @@ -143,7 +211,8 @@ private void updateActionForDeployment(Map newConfig, Deployment } // Validate the AWS region, IoT credentials endpoint as well as the IoT data endpoint. - if (!validateNucleusConfig(totallyCompleteFuture, nucleusConfig)) { + if (!validateNucleusConfig(totallyCompleteFuture, nucleusConfig, + deployment.getDeploymentDocumentObj().getConfigurationValidationPolicy().timeoutInSeconds())) { return; } @@ -153,7 +222,7 @@ private void updateActionForDeployment(Map newConfig, Deployment } private boolean validateNucleusConfig(CompletableFuture totallyCompleteFuture, - Map nucleusConfig) { + Map nucleusConfig, Integer timeoutSec) { if (nucleusConfig != null) { String awsRegion = tryGetAwsRegionFromNewConfig(nucleusConfig); String iotCredEndpoint = tryGetIoTCredEndpointFromNewConfig(nucleusConfig); @@ -166,10 +235,229 @@ private boolean validateNucleusConfig(CompletableFuture totall .complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e)); return false; } + + long configTimeout = Duration.ofSeconds(DEFAULT_TIMEOUT_SECOND).toMillis(); + if (timeoutSec != null) { + configTimeout = Duration.ofSeconds(timeoutSec).toMillis(); + } + + if (configTimeout == 0 || !deviceConfiguration.isDeviceConfiguredToTalkToCloud()) { + logger.atDebug().log("Skipping connectivity validation"); + return true; + } + + MqttClientConnection connection = null; + try { + logger.atDebug().log("Checking MQTT client can connect"); + connection = createMqttConnection(nucleusConfig); + checkMqttConnection(connection, configTimeout); + + logger.atDebug().log("Checking HTTP client can connect"); + GreengrassV2DataClient greengrassV2DataClient = createGGv2DataClient(nucleusConfig); + checkHttpConnection(greengrassV2DataClient, configTimeout); + } catch (ComponentConfigurationValidationException e) { + logger.atError().cause(e).log("Nucleus connectivity validation failed"); + connection.close(); + totallyCompleteFuture + .complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e)); + return false; + } catch (CertificateException | IOException | + KeyStoreException | NoSuchAlgorithmException + | TLSAuthException | InvalidEnvironmentStageException e) { + logger.atError().cause(e).log("Unable to create GGv2 data client"); + totallyCompleteFuture + .complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e)); + return false; + } catch (InterruptedException e) { + logger.atError().cause(e).log("Validation interrupted unexpectedly"); + totallyCompleteFuture + .complete(new DeploymentResult(DeploymentResult.DeploymentStatus.FAILED_NO_STATE_CHANGE, e)); + return false; + } + connection.close(); } return true; } + private void checkHttpConnection(GreengrassV2DataClient greengrassV2DataClient, long timeout) throws InterruptedException, ComponentConfigurationValidationException { + ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder() + .coreDeviceThingName(Coerce.toString(deviceConfiguration.getThingName())) + .build(); + long tryUntil = System.currentTimeMillis() + timeout; + while(System.currentTimeMillis() < tryUntil) { + try { + greengrassV2DataClient.listThingGroupsForCoreDevice(request); + return; + } catch (GreengrassV2DataException | SdkClientException e) { + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } + } + throw new ComponentConfigurationValidationException("HTTP client failed to connect to IoT Core using new configurations", DeploymentErrorCode.FAILED_TO_RECONNECT); + } + + private void checkMqttConnection(MqttClientConnection connection, long timeout) throws InterruptedException, ComponentConfigurationValidationException { + long tryUntil = System.currentTimeMillis() + timeout; + while(System.currentTimeMillis() < tryUntil) { + try { + connection.connect().get(); + return; + } catch (MqttException | ExecutionException e) { + Thread.sleep(TimeUnit.SECONDS.toMillis(2)); + } + } + throw new ComponentConfigurationValidationException("MQTT client failed to connect to IoT Core using new configurations", DeploymentErrorCode.FAILED_TO_RECONNECT); + } + + private MqttClientConnection createMqttConnection(Map nucleusConfig) { + AwsIotMqttConnectionBuilder builder; + try { + builder = securityService.getDeviceIdentityMqttConnectionBuilder(); + } catch (MqttConnectionProviderException e) { + throw new MqttException(e.getMessage()); + } + + // get mqtt values from nucleus config to construct a MQTT client + int pingTimeoutMs = tryGetMqttPingTimeoutFromNewConfig(nucleusConfig); + int keepAliveMs = tryGetMqttKeepAliveMsFromNewConfig(nucleusConfig); + if (keepAliveMs != 0 && keepAliveMs <= pingTimeoutMs) { + throw new MqttException(String.format("%s must be greater than %s", + MQTT_KEEP_ALIVE_TIMEOUT_KEY, MQTT_PING_TIMEOUT_KEY)); + } + String rootCaPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); + String clientId = Coerce.toString(deviceConfiguration.getThingName()); + String endpoint = tryGetIoTDataEndpointFromNewConfig(nucleusConfig); + short port = tryGetMqttPortFromNewConfig(nucleusConfig); + int operationTimeout = tryGetMqttOperationTimeoutFromNewConfig(nucleusConfig); + int socketTimeout = tryGetMqttSocketTimeoutFromNewConfig(nucleusConfig); + + // start building mqtt connection + builder.withCertificateAuthorityFromPath(null, rootCaPath) + .withClientId(clientId) + .withEndpoint(endpoint) + .withPort(port) + .withCleanSession(true) + .withKeepAliveMs(keepAliveMs) + .withProtocolOperationTimeoutMs(operationTimeout) + .withPingTimeoutMs(pingTimeoutMs) + .withSocketOptions(new SocketOptions()).withTimeoutMs(socketTimeout); + + // add proxy settings if configured + String proxyUrl = tryGetProxyUrlFromNewConfig(nucleusConfig); + if (!Utils.isEmpty(proxyUrl)) { + HttpProxyOptions httpProxyOptions = new HttpProxyOptions(); + httpProxyOptions.setHost(ProxyUtils.getHostFromProxyUrl(proxyUrl)); + httpProxyOptions.setPort(ProxyUtils.getPortFromProxyUrl(proxyUrl)); + httpProxyOptions.setConnectionType(HttpProxyOptions.HttpProxyConnectionType.Tunneling); + + if ("https".equalsIgnoreCase(ProxyUtils.getSchemeFromProxyUrl(proxyUrl))) { + TlsContextOptions proxyTlsOptions = MqttClient.getTlsContextOptions(rootCaPath); + ClientTlsContext tlsContext = new ClientTlsContext(proxyTlsOptions); + httpProxyOptions.setTlsContext(tlsContext); + } + + String username = tryGetProxyUsernameFromNewConfig(nucleusConfig); + String password = tryGetProxyPasswordFromNewConfig(nucleusConfig); + String proxyUsername = ProxyUtils.getProxyUsername(proxyUrl, username); + if (Utils.isNotEmpty(proxyUsername)) { + httpProxyOptions.setAuthorizationType(HttpProxyOptions.HttpProxyAuthorizationType.Basic); + httpProxyOptions.setAuthorizationUsername(proxyUsername); + httpProxyOptions + .setAuthorizationPassword(ProxyUtils.getProxyPassword(proxyUrl, password)); + } + + String noProxy = tryGetNoProxyAddressFromNewConfig(nucleusConfig); + boolean useProxy = true; + // Only use the proxy when the endpoint we're connecting to is not in the NoProxyAddress list + if (Utils.isNotEmpty(noProxy) && Utils.isNotEmpty(endpoint)) { + useProxy = Arrays.stream(noProxy.split(",")).noneMatch(endpoint::matches); + } + if (useProxy) { + builder.withHttpProxyOptions(httpProxyOptions); + } + } + + return builder.build(); + } + + private GreengrassV2DataClient createGGv2DataClient(Map nucleusConfig) throws CertificateException, IOException, KeyStoreException, NoSuchAlgorithmException, TLSAuthException, InvalidEnvironmentStageException { + ApacheHttpClient.Builder httpClient = ProxyUtils.getSdkHttpClientBuilder(); + String rootCAPath = Coerce.toString(deviceConfiguration.getRootCAFilePath()); + if (Utils.isEmpty(rootCAPath)) { + throw new RuntimeException("Missing root CA"); + } + + List trustCertificates = EncryptionUtils.loadX509Certificates(Paths.get(rootCAPath)); + KeyStore tmKeyStore = KeyStore.getInstance("JKS"); + tmKeyStore.load(null, null); + for (X509Certificate certificate : trustCertificates) { + X500Principal principal = certificate.getSubjectX500Principal(); + String name = principal.getName("RFC2253"); + tmKeyStore.setCertificateEntry(name, certificate); + } + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509"); + trustManagerFactory.init(tmKeyStore); + TrustManager[] trustManagers = trustManagerFactory.getTrustManagers(); + KeyManager[] keyManagers = deviceConfiguration.getDeviceIdentityKeyManagers(); + httpClient.tlsKeyManagersProvider(() -> keyManagers).tlsTrustManagersProvider(() -> trustManagers); + + GreengrassV2DataClientBuilder clientBuilder = GreengrassV2DataClient.builder() + // Use an empty credential provider because our requests don't need SigV4 + // signing, as they are going through IoT Core instead + .credentialsProvider(AnonymousCredentialsProvider.create()) + .httpClient(httpClient.build()) + .overrideConfiguration(ClientOverrideConfiguration.builder().retryPolicy(RetryMode.STANDARD).build()); + + String region = tryGetAwsRegionFromNewConfig(nucleusConfig); + if (!Utils.isEmpty(region)) { + String greengrassServiceEndpoint = getGreengrassServiceEndpoint(nucleusConfig); + if (!Utils.isEmpty(greengrassServiceEndpoint)) { + clientBuilder.endpointOverride(URI.create(greengrassServiceEndpoint)); + clientBuilder.region(Region.of(region)); + } else { + clientBuilder.region(Region.of(region)); + } + } + return clientBuilder.build(); + } + + private String getGreengrassServiceEndpoint(Map nucleusConfig) throws InvalidEnvironmentStageException { + IotSdkClientFactory.EnvironmentStage stage; + stage = IotSdkClientFactory.EnvironmentStage.fromString(tryGetEnvironmentStageFromNewConfig(nucleusConfig)); + + // Use customer configured GG endpoint if it is set + String endpoint = tryGetGreengrassEndpointFromNewConfig(nucleusConfig); + int port = tryGetGreengrassPortFromNewConfig(nucleusConfig); + if (Utils.isEmpty(endpoint)) { + // Fallback to global endpoint if no GG endpoint was specified + return RegionUtils.getGreengrassDataPlaneEndpoint(tryGetAwsRegionFromNewConfig(nucleusConfig), stage, port); + } + + // If customer specifies "iotdata" then use the iotdata endpoint, rather than needing them to specify the same + // endpoint twice in the config. + String iotData = tryGetIoTDataEndpointFromNewConfig(nucleusConfig); + if ("iotdata".equalsIgnoreCase(endpoint) && Utils.isNotEmpty(iotData)) { + // Use customer configured IoT data endpoint if it is set + endpoint = iotData; + } + + // This method returns a URI, not just an endpoint + if (!endpoint.startsWith("https://")) { + endpoint = "https://" + endpoint; + } + try { + URI endpointUri = new URI(endpoint); + // If the port is defined in the URI, then return it as-is + if (endpointUri.getPort() != -1) { + return endpoint; + } + // Modify the URI with the user's chosen port + return new URIBuilder(endpointUri).setPort(port).toString(); + } catch (URISyntaxException e) { + logger.atError().log("Invalid endpoint {}", endpoint, e); + return RegionUtils.getGreengrassDataPlaneEndpoint(tryGetAwsRegionFromNewConfig(nucleusConfig), stage, port); + } + } + /** * Completes the provided future when all of the listed services are running. * @@ -240,6 +528,84 @@ private String tryGetIoTDataEndpointFromNewConfig(Map kernelConf return iotDataEndpoint; } + private Map tryGetMqttFromNewConfig(Map kernelConfig) { + Map mqtt = deviceConfiguration.getMQTTNamespace().toPOJO(); + if (kernelConfig.containsKey(DEVICE_MQTT_NAMESPACE)) { + mqtt = (Map) kernelConfig.get(DEVICE_MQTT_NAMESPACE); + } + return mqtt; + } + + private int tryGetMqttPingTimeoutFromNewConfig(Map kernelConfig) { + Map mqtt = tryGetMqttFromNewConfig(kernelConfig); + return Coerce.toInt(mqtt.getOrDefault(MQTT_PING_TIMEOUT_KEY, DEFAULT_MQTT_PING_TIMEOUT)); + } + + private int tryGetMqttKeepAliveMsFromNewConfig(Map kernelConfig) { + Map mqtt = tryGetMqttFromNewConfig(kernelConfig); + return Coerce.toInt(mqtt.getOrDefault(MQTT_KEEP_ALIVE_TIMEOUT_KEY, DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT)); + } + + private short tryGetMqttPortFromNewConfig(Map kernelConfig) { + Map mqtt = tryGetMqttFromNewConfig(kernelConfig); + return (short) Coerce.toInt(mqtt.getOrDefault(MQTT_PORT_KEY, DEFAULT_MQTT_PORT)); + } + + private int tryGetMqttOperationTimeoutFromNewConfig(Map kernelConfig) { + Map mqtt = tryGetMqttFromNewConfig(kernelConfig); + return Coerce.toInt(mqtt.getOrDefault(MQTT_OPERATION_TIMEOUT_KEY, DEFAULT_MQTT_OPERATION_TIMEOUT)); + } + + private int tryGetMqttSocketTimeoutFromNewConfig(Map kernelConfig) { + Map mqtt = tryGetMqttFromNewConfig(kernelConfig); + return Coerce.toInt(mqtt.getOrDefault(MQTT_SOCKET_TIMEOUT_KEY, DEFAULT_MQTT_SOCKET_TIMEOUT)); + } + + private Map tryGetNetworkProxyFromNewConfig(Map kernelConfig) { + Map networkProxy = deviceConfiguration.getNetworkProxyNamespace().toPOJO(); + if (kernelConfig.containsKey(DEVICE_NETWORK_PROXY_NAMESPACE)) { + networkProxy = (Map) kernelConfig.get(DEVICE_NETWORK_PROXY_NAMESPACE); + } + return networkProxy; + } + + private Map tryGetProxyFromNewConfig(Map kernelConfig) { + Map networkProxy = tryGetNetworkProxyFromNewConfig(kernelConfig); + return (Map) networkProxy.get(DEVICE_PROXY_NAMESPACE); + } + + private String tryGetProxyUrlFromNewConfig(Map kernelConfig) { + Map proxy = tryGetProxyFromNewConfig(kernelConfig); + return Coerce.toString(proxy.get(DEVICE_PARAM_PROXY_URL)); + } + + private String tryGetProxyUsernameFromNewConfig(Map kernelConfig) { + Map proxy = tryGetProxyFromNewConfig(kernelConfig); + return Coerce.toString(proxy.getOrDefault(DEVICE_PARAM_PROXY_USERNAME, "")); + } + + private String tryGetProxyPasswordFromNewConfig(Map kernelConfig) { + Map proxy = tryGetProxyFromNewConfig(kernelConfig); + return Coerce.toString(proxy.getOrDefault(DEVICE_PARAM_PROXY_PASSWORD, "")); + } + + private String tryGetNoProxyAddressFromNewConfig(Map kernelConfig) { + Map proxy = tryGetProxyFromNewConfig(kernelConfig); + return Coerce.toString(proxy.getOrDefault(DEVICE_PARAM_NO_PROXY_ADDRESSES, "")); + } + + private String tryGetEnvironmentStageFromNewConfig(Map kernelConfig) { + return Coerce.toString(kernelConfig.getOrDefault(DEVICE_PARAM_ENV_STAGE, DEFAULT_ENV_STAGE)); + } + + private String tryGetGreengrassEndpointFromNewConfig(Map kernelConfig) { + return Coerce.toString(kernelConfig.getOrDefault(DEVICE_PARAM_GG_DATA_ENDPOINT, "")); + } + + private int tryGetGreengrassPortFromNewConfig(Map kernelConfig) { + return Coerce.toInt(kernelConfig.getOrDefault(DEVICE_PARAM_GG_DATA_PLANE_PORT, GG_DATA_PLANE_PORT_DEFAULT)); + } + @Getter @AllArgsConstructor(access = AccessLevel.PRIVATE) @@ -307,7 +673,6 @@ public AggregateServicesChangeManager createRollbackManager() { /** * Start the new services the merge intends to add. - * */ public void startNewServices() { for (String serviceName : servicesToAdd) { diff --git a/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java b/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java index 72c9c1a047..5de1522679 100644 --- a/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java +++ b/src/main/java/com/aws/greengrass/deployment/DeviceConfiguration.java @@ -136,10 +136,10 @@ public class DeviceConfiguration { public static final long COMPONENT_STORE_MAX_SIZE_DEFAULT_BYTES = 10_000_000_000L; public static final long DEPLOYMENT_POLLING_FREQUENCY_DEFAULT_SECONDS = 15L; public static final String DEVICE_PARAM_GG_DATA_PLANE_PORT = "greengrassDataPlanePort"; - private static final int GG_DATA_PLANE_PORT_DEFAULT = 8443; + public static final int GG_DATA_PLANE_PORT_DEFAULT = 8443; - private static final String DEVICE_PARAM_ENV_STAGE = "envStage"; - private static final String DEFAULT_ENV_STAGE = "prod"; + public static final String DEVICE_PARAM_ENV_STAGE = "envStage"; + public static final String DEFAULT_ENV_STAGE = "prod"; private static final String CANNOT_BE_EMPTY = " cannot be empty"; private static final Logger logger = LogManager.getLogger(DeviceConfiguration.class); public static final String AWS_IOT_THING_NAME_ENV = "AWS_IOT_THING_NAME"; diff --git a/src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java b/src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java index caa6953758..b170f4d933 100644 --- a/src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java +++ b/src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java @@ -5,6 +5,8 @@ package com.aws.greengrass.deployment; +import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode; +import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException; import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException; import com.aws.greengrass.deployment.exceptions.RetryableServerErrorException; import com.aws.greengrass.logging.api.Logger; @@ -16,6 +18,7 @@ import lombok.Getter; import lombok.Setter; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient; import software.amazon.awssdk.services.greengrassv2data.model.GreengrassV2DataException; import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest; import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceResponse; @@ -102,4 +105,40 @@ public Optional> listThingGroupsForDevice(int maxAttemptCount) throw return Optional.of(thingGroupNames); }, "get-thing-group-hierarchy", logger); } + + public void waitForReconnect(long timeoutMillis, GreengrassV2DataClient greengrassV2DataClient) throws ComponentConfigurationValidationException { + if (!deviceConfiguration.isDeviceConfiguredToTalkToCloud()) { + return; + } + Duration initialInterval = Duration.ofMillis(timeoutMillis / 8); + Duration maxRetryInterval = Duration.ofMillis(timeoutMillis / 4); + + try { + RetryUtils.runWithRetry(clientExceptionRetryConfig.toBuilder() + .maxAttempt(3) + .initialRetryInterval(initialInterval) + .maxRetryInterval(maxRetryInterval) + .build(), + () -> { + ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder() + .coreDeviceThingName(Coerce.toString(deviceConfiguration.getThingName())) + .build(); + + ListThingGroupsForCoreDeviceResponse response; + try { + response = greengrassV2DataClient.listThingGroupsForCoreDevice(request); + } catch (GreengrassV2DataException e) { + if (RetryUtils.retryErrorCodes(e.statusCode())) { + throw new RetryableServerErrorException("Failed with retryable error " + e.statusCode() + + " when calling listThingGroupsForCoreDevice", e); + } + throw e; + } + return response; + }, "get-thing-group-hierarchy", logger); + } catch (Exception e) { + throw new ComponentConfigurationValidationException("HTTP client failed to reconnect with new configuration: " + e, + DeploymentErrorCode.FAILED_TO_RECONNECT); + } + } } diff --git a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java index cbedeb0211..7452e0fd4b 100644 --- a/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java +++ b/src/main/java/com/aws/greengrass/deployment/errorcode/DeploymentErrorCode.java @@ -87,6 +87,7 @@ public enum DeploymentErrorCode { UNSUPPORTED_REGION(DeploymentErrorType.REQUEST_ERROR), IOT_CRED_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR), IOT_DATA_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR), + FAILED_TO_RECONNECT(DeploymentErrorType.REQUEST_ERROR), /* Docker issues */ DOCKER_ERROR(DeploymentErrorType.DEPENDENCY_ERROR), diff --git a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java index c86dd87f70..e54ea5eafb 100644 --- a/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java +++ b/src/main/java/com/aws/greengrass/mqttclient/MqttClient.java @@ -94,18 +94,18 @@ @SuppressWarnings({"PMD.AvoidDuplicateLiterals"}) public class MqttClient implements Closeable { private static final Logger logger = LogManager.getLogger(MqttClient.class); - static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs"; - static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis(); - static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs"; - private static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis(); + public static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs"; + public static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis(); + public static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs"; + public static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis(); private static final String MQTT_THREAD_POOL_SIZE_KEY = "threadPoolSize"; public static final int DEFAULT_MQTT_PORT = 8883; public static final String MQTT_PORT_KEY = "port"; - private static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs"; + public static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs"; // Default taken from AWS SDK - private static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis(); - static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs"; - static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis(); + public static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis(); + public static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs"; + public static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis(); static final int DEFAULT_MQTT_CLOSE_TIMEOUT = (int) Duration.ofSeconds(2).toMillis(); static final String MQTT_MAX_IN_FLIGHT_PUBLISHES_KEY = "maxInFlightPublishes"; static final int DEFAULT_MAX_IN_FLIGHT_PUBLISHES = 5; @@ -390,7 +390,7 @@ public MqttClient(DeviceConfiguration deviceConfiguration, Spool spool, boolean validateAndSetMqttPublishConfiguration(); } - private TlsContextOptions getTlsContextOptions(String rootCaPath) { + public static TlsContextOptions getTlsContextOptions(String rootCaPath) { return Utils.isNotEmpty(rootCaPath) ? TlsContextOptions.createDefaultClient().withCertificateAuthorityFromPath(null, rootCaPath) : TlsContextOptions.createDefaultClient(); diff --git a/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java b/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java index a6a78438fb..c31b16c306 100644 --- a/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java +++ b/src/test/java/com/aws/greengrass/deployment/DeploymentConfigMergerTest.java @@ -26,6 +26,7 @@ import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException; import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; +import com.aws.greengrass.mqttclient.MqttClient; import com.aws.greengrass.testcommons.testutilities.GGExtension; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -90,6 +91,10 @@ class DeploymentConfigMergerTest { @Mock private DynamicComponentConfigurationValidator validator; @Mock + private MqttClient mqttClient; + @Mock + private ThingGroupHelper thingGroupHelper; + @Mock private Context context; @BeforeEach @@ -307,7 +312,7 @@ void GIVEN_deployment_WHEN_check_safety_selected_THEN_check_safety_before_update when(deploymentActivatorFactory.getDeploymentActivator(any())).thenReturn(deploymentActivator); when(context.get(DeploymentActivatorFactory.class)).thenReturn(deploymentActivatorFactory); - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper); DeploymentDocument doc = new DeploymentDocument(); doc.setConfigurationArn("NoSafetyCheckDeploy"); @@ -345,7 +350,7 @@ void GIVEN_deployment_WHEN_task_cancelled_THEN_update_is_cancelled() throws Thro }); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn( @@ -381,7 +386,7 @@ void GIVEN_deployment_WHEN_task_not_cancelled_THEN_update_is_continued() throws when(context.get(DefaultActivator.class)).thenReturn(defaultActivator); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn( @@ -437,7 +442,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_new_config_THEN_new_config_is newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3); newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn( @@ -498,7 +503,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_some_new_config_THEN_old_conf newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3); newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2); // GIVEN - DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator); + DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper); DeploymentDocument doc = mock(DeploymentDocument.class); when(doc.getDeploymentId()).thenReturn("DeploymentId"); when(doc.getComponentUpdatePolicy()).thenReturn(