Skip to content

Commit

Permalink
feat: add nucleus connectivity validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tianci Shen committed Mar 19, 2024
1 parent 2f671e3 commit 8f9f232
Show file tree
Hide file tree
Showing 6 changed files with 431 additions and 21 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ public class DeviceConfiguration {
public static final long COMPONENT_STORE_MAX_SIZE_DEFAULT_BYTES = 10_000_000_000L;
public static final long DEPLOYMENT_POLLING_FREQUENCY_DEFAULT_SECONDS = 15L;
public static final String DEVICE_PARAM_GG_DATA_PLANE_PORT = "greengrassDataPlanePort";
private static final int GG_DATA_PLANE_PORT_DEFAULT = 8443;
public static final int GG_DATA_PLANE_PORT_DEFAULT = 8443;

private static final String DEVICE_PARAM_ENV_STAGE = "envStage";
private static final String DEFAULT_ENV_STAGE = "prod";
public static final String DEVICE_PARAM_ENV_STAGE = "envStage";
public static final String DEFAULT_ENV_STAGE = "prod";
private static final String CANNOT_BE_EMPTY = " cannot be empty";
private static final Logger logger = LogManager.getLogger(DeviceConfiguration.class);
public static final String AWS_IOT_THING_NAME_ENV = "AWS_IOT_THING_NAME";
Expand Down
39 changes: 39 additions & 0 deletions src/main/java/com/aws/greengrass/deployment/ThingGroupHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.aws.greengrass.deployment;

import com.aws.greengrass.deployment.errorcode.DeploymentErrorCode;
import com.aws.greengrass.deployment.exceptions.ComponentConfigurationValidationException;
import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException;
import com.aws.greengrass.deployment.exceptions.RetryableServerErrorException;
import com.aws.greengrass.logging.api.Logger;
Expand All @@ -16,6 +18,7 @@
import lombok.Getter;
import lombok.Setter;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.greengrassv2data.GreengrassV2DataClient;
import software.amazon.awssdk.services.greengrassv2data.model.GreengrassV2DataException;
import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceRequest;
import software.amazon.awssdk.services.greengrassv2data.model.ListThingGroupsForCoreDeviceResponse;
Expand Down Expand Up @@ -102,4 +105,40 @@ public Optional<Set<String>> listThingGroupsForDevice(int maxAttemptCount) throw
return Optional.of(thingGroupNames);
}, "get-thing-group-hierarchy", logger);
}

public void waitForReconnect(long timeoutMillis, GreengrassV2DataClient greengrassV2DataClient) throws ComponentConfigurationValidationException {
if (!deviceConfiguration.isDeviceConfiguredToTalkToCloud()) {
return;
}
Duration initialInterval = Duration.ofMillis(timeoutMillis / 8);
Duration maxRetryInterval = Duration.ofMillis(timeoutMillis / 4);

try {
RetryUtils.runWithRetry(clientExceptionRetryConfig.toBuilder()
.maxAttempt(3)
.initialRetryInterval(initialInterval)
.maxRetryInterval(maxRetryInterval)
.build(),
() -> {
ListThingGroupsForCoreDeviceRequest request = ListThingGroupsForCoreDeviceRequest.builder()
.coreDeviceThingName(Coerce.toString(deviceConfiguration.getThingName()))
.build();

ListThingGroupsForCoreDeviceResponse response;
try {
response = greengrassV2DataClient.listThingGroupsForCoreDevice(request);
} catch (GreengrassV2DataException e) {
if (RetryUtils.retryErrorCodes(e.statusCode())) {
throw new RetryableServerErrorException("Failed with retryable error " + e.statusCode()
+ " when calling listThingGroupsForCoreDevice", e);
}
throw e;
}
return response;
}, "get-thing-group-hierarchy", logger);
} catch (Exception e) {
throw new ComponentConfigurationValidationException("HTTP client failed to reconnect with new configuration: " + e,
DeploymentErrorCode.FAILED_TO_RECONNECT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public enum DeploymentErrorCode {
UNSUPPORTED_REGION(DeploymentErrorType.REQUEST_ERROR),
IOT_CRED_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
IOT_DATA_ENDPOINT_FORMAT_NOT_VALID(DeploymentErrorType.REQUEST_ERROR),
FAILED_TO_RECONNECT(DeploymentErrorType.REQUEST_ERROR),

/* Docker issues */
DOCKER_ERROR(DeploymentErrorType.DEPENDENCY_ERROR),
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/com/aws/greengrass/mqttclient/MqttClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,18 @@
@SuppressWarnings({"PMD.AvoidDuplicateLiterals"})
public class MqttClient implements Closeable {
private static final Logger logger = LogManager.getLogger(MqttClient.class);
static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs";
static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis();
static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs";
private static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
public static final String MQTT_KEEP_ALIVE_TIMEOUT_KEY = "keepAliveTimeoutMs";
public static final int DEFAULT_MQTT_KEEP_ALIVE_TIMEOUT = (int) Duration.ofSeconds(60).toMillis();
public static final String MQTT_PING_TIMEOUT_KEY = "pingTimeoutMs";
public static final int DEFAULT_MQTT_PING_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
private static final String MQTT_THREAD_POOL_SIZE_KEY = "threadPoolSize";
public static final int DEFAULT_MQTT_PORT = 8883;
public static final String MQTT_PORT_KEY = "port";
private static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs";
public static final String MQTT_SOCKET_TIMEOUT_KEY = "socketTimeoutMs";
// Default taken from AWS SDK
private static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis();
static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs";
static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
public static final int DEFAULT_MQTT_SOCKET_TIMEOUT = (int) Duration.ofSeconds(3).toMillis();
public static final String MQTT_OPERATION_TIMEOUT_KEY = "operationTimeoutMs";
public static final int DEFAULT_MQTT_OPERATION_TIMEOUT = (int) Duration.ofSeconds(30).toMillis();
static final int DEFAULT_MQTT_CLOSE_TIMEOUT = (int) Duration.ofSeconds(2).toMillis();
static final String MQTT_MAX_IN_FLIGHT_PUBLISHES_KEY = "maxInFlightPublishes";
static final int DEFAULT_MAX_IN_FLIGHT_PUBLISHES = 5;
Expand Down Expand Up @@ -390,7 +390,7 @@ public MqttClient(DeviceConfiguration deviceConfiguration, Spool spool, boolean
validateAndSetMqttPublishConfiguration();
}

private TlsContextOptions getTlsContextOptions(String rootCaPath) {
public static TlsContextOptions getTlsContextOptions(String rootCaPath) {
return Utils.isNotEmpty(rootCaPath)
? TlsContextOptions.createDefaultClient().withCertificateAuthorityFromPath(null, rootCaPath)
: TlsContextOptions.createDefaultClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.mqttclient.MqttClient;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -90,6 +91,10 @@ class DeploymentConfigMergerTest {
@Mock
private DynamicComponentConfigurationValidator validator;
@Mock
private MqttClient mqttClient;
@Mock
private ThingGroupHelper thingGroupHelper;
@Mock
private Context context;

@BeforeEach
Expand Down Expand Up @@ -307,7 +312,7 @@ void GIVEN_deployment_WHEN_check_safety_selected_THEN_check_safety_before_update
when(deploymentActivatorFactory.getDeploymentActivator(any())).thenReturn(deploymentActivator);
when(context.get(DeploymentActivatorFactory.class)).thenReturn(deploymentActivatorFactory);

DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);

DeploymentDocument doc = new DeploymentDocument();
doc.setConfigurationArn("NoSafetyCheckDeploy");
Expand Down Expand Up @@ -345,7 +350,7 @@ void GIVEN_deployment_WHEN_task_cancelled_THEN_update_is_cancelled() throws Thro
});

// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down Expand Up @@ -381,7 +386,7 @@ void GIVEN_deployment_WHEN_task_not_cancelled_THEN_update_is_continued() throws
when(context.get(DefaultActivator.class)).thenReturn(defaultActivator);

// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down Expand Up @@ -437,7 +442,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_new_config_THEN_new_config_is
newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3);
newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2);
// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down Expand Up @@ -498,7 +503,7 @@ void GIVEN_deployment_activate_WHEN_deployment_has_some_new_config_THEN_old_conf
newConfig2.put(DEFAULT_NUCLEUS_COMPONENT_NAME, newConfig3);
newConfig.put(SERVICES_NAMESPACE_TOPIC, newConfig2);
// GIVEN
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator);
DeploymentConfigMerger merger = new DeploymentConfigMerger(kernel, deviceConfiguration, validator, mqttClient, thingGroupHelper);
DeploymentDocument doc = mock(DeploymentDocument.class);
when(doc.getDeploymentId()).thenReturn("DeploymentId");
when(doc.getComponentUpdatePolicy()).thenReturn(
Expand Down

0 comments on commit 8f9f232

Please sign in to comment.