diff --git a/.github/workflows/uat.yaml b/.github/workflows/uat.yaml
new file mode 100644
index 0000000..5f944e8
--- /dev/null
+++ b/.github/workflows/uat.yaml
@@ -0,0 +1,32 @@
+name: OTF UATS
+
+on:
+ pull_request:
+ branches: '*'
+
+env:
+ AWS_REGION : "us-west-2"
+ CODE_BUILD_PROJECT_LINUX: "DiskSpoolerUatCodeBuildLinux"
+ AWS_ROLE_TO_ASSUME: "arn:aws:iam::686385081908:role/aws-greengrass-disk-spooler-codebuild-uat-role-amazonlinux"
+
+jobs:
+ uat-linux:
+ permissions:
+ id-token: write
+ contents: read
+ runs-on: ${{ matrix.os }}
+ strategy:
+ matrix:
+ os: [ ubuntu-latest ]
+ steps:
+ - name: configure aws credentials
+ uses: aws-actions/configure-aws-credentials@v1
+ with:
+ role-to-assume: ${{ env.AWS_ROLE_TO_ASSUME }}
+ role-session-name: nucleusCI
+ aws-region: ${{ env.AWS_REGION }}
+ - name: Run UAT on linux
+ uses: aws-actions/aws-codebuild-run-build@v1
+ with:
+ project-name: ${{ env.CODE_BUILD_PROJECT_LINUX }}
+ buildspec-override: uat/codebuild/uat_linux_buildspec.yaml
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5cc9a89
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+# UATs
+uat-results
+greengrass-nucleus-latest.zip
+
+# GDK
+greengrass-build
diff --git a/pom.xml b/pom.xml
index 8cb064f..f1c289a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,7 @@
*
codestyle/**
+ uat-results/**
src/*/resources/**
diff --git a/run-uats.sh b/run-uats.sh
new file mode 100755
index 0000000..c2c43ec
--- /dev/null
+++ b/run-uats.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+#
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+set -e
+curl -s https://d2s8p88vqu9w66.cloudfront.net/releases/greengrass-nucleus-latest.zip > greengrass-nucleus-latest.zip
+mvn -U -ntp clean package -DskipTests
+mvn -U -ntp clean verify -f uat/pom.xml
+sudo java -Dggc.archive=greengrass-nucleus-latest.zip -Dtest.log.path=uat-results -Dtags=DiskSpooler -jar uat/testing-features/target/greengrass-disk-spooler-testing-features.jar
diff --git a/uat/README.md b/uat/README.md
new file mode 100644
index 0000000..286eec0
--- /dev/null
+++ b/uat/README.md
@@ -0,0 +1,49 @@
+## Disk Spooler User Acceptance Tests
+
+User Acceptance Tests for Disk Spooler run using `aws-greengrass-testing-standalone` as a library. They execute E2E
+tests which will spin up an instance of Greengrass on your device and execute different sets of tests, by installing
+the `aws.greengrass.DiskSpooler` component.
+
+## Running UATs locally
+
+Ensure credentials are available by setting them in environment variables. In unix based systems:
+
+```bash
+export AWS_ACCESS_KEY_ID=
+export AWS_SECRET_ACCESS_KEY=
+```
+
+on Windows Powershell
+
+```bash
+$Env:AWS_ACCESS_KEY_ID=
+$Env:AWS_SECRET_ACCESS_KEY=
+```
+
+For UATs to run you will need to package your entire application along with `aws-greengrass-testing-standalone` into
+an uber jar. To do run (from the root of the project)
+
+```
+mvn -U -ntp clean verify -f uat/pom.xml
+```
+
+Note: Everytime you make changes to the codebase you will have to rebuild the uber jar for those changes to be present
+on the final artifact.
+
+Finally, download the zip containing the latest version of the Nucleus, which will be used to provision Greengrass for
+the UATs.
+
+```bash
+curl -s https://d2s8p88vqu9w66.cloudfront.net/releases/greengrass-nucleus-latest.zip > greengrass-nucleus-latest.zip
+```
+
+Execute the UATs by running the following command from the root of the project.
+
+```
+sudo java -Dggc.archive= -Dtest.log.path= -Dtags=DiskSpooler -jar uat/testing-features/target/greengrass-disk-spooler-testing-features.jar
+```
+
+Command arguments:
+
+Dggc.archive - path to the nucleus zip that was downloaded
+Dtest.log.path - path where you would like the test results to be stored
diff --git a/uat/codebuild/uat_linux_buildspec.yaml b/uat/codebuild/uat_linux_buildspec.yaml
new file mode 100644
index 0000000..d3bbc82
--- /dev/null
+++ b/uat/codebuild/uat_linux_buildspec.yaml
@@ -0,0 +1,29 @@
+#
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+
+version: 0.2
+phases:
+ install:
+ runtime-versions:
+ java: corretto11
+ build:
+ commands:
+ - curl -s https://d2s8p88vqu9w66.cloudfront.net/releases/greengrass-nucleus-latest.zip > /tmp/greengrass-nucleus-latest.zip
+ - mvn -U -ntp verify -DskipTests=true
+ - mvn -U -ntp clean verify -f uat/pom.xml
+ - java -Dggc.archive=/tmp/greengrass-nucleus-latest.zip
+ -Dtags='DiskSpooler' -Dggc.install.root=$CODEBUILD_SRC_DIR -Dggc.log.level=INFO -Daws.region=$AWS_REGION
+ -jar uat/testing-features/target/greengrass-disk-spooler-testing-features.jar
+
+artifacts:
+ files:
+ - 'testResults/**/*'
+ name: 'DiskSpoolerUatLinuxLogs.zip'
+
+reports:
+ uat-reports:
+ files:
+ - "TEST-greengrass-results.xml"
+ file-format: "JUNITXML"
diff --git a/uat/custom-components/pom.xml b/uat/custom-components/pom.xml
new file mode 100644
index 0000000..042c7dc
--- /dev/null
+++ b/uat/custom-components/pom.xml
@@ -0,0 +1,74 @@
+
+
+
+
+ 4.0.0
+ custom-components
+ com.aws.greengrass
+ 1.0-SNAPSHOT
+
+ 8
+ 8
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+ custom-components
+
+
+ com.aws.greengrass.Main
+
+
+
+
+
+
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.12.7.1
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.30
+ compile
+
+
+ software.amazon.awssdk.iotdevicesdk
+ aws-iot-device-sdk
+ 1.17.5
+
+
+ org.slf4j
+ slf4j-simple
+ 1.7.30
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.13
+ compile
+
+
+
diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/Main.java b/uat/custom-components/src/main/java/com/aws/greengrass/Main.java
new file mode 100644
index 0000000..e2cf405
--- /dev/null
+++ b/uat/custom-components/src/main/java/com/aws/greengrass/Main.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass;
+
+import java.util.function.Consumer;
+
+public final class Main {
+ private Main(){}
+
+ @SuppressWarnings({"unchecked", "deprecation"})
+ public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ ((Consumer) Class.forName("com.aws.greengrass.artifacts." + System.getProperty("componentName"))
+ .newInstance()).accept(args);
+ }
+}
diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/artifacts/IotMqttPublisher.java b/uat/custom-components/src/main/java/com/aws/greengrass/artifacts/IotMqttPublisher.java
new file mode 100644
index 0000000..fba8730
--- /dev/null
+++ b/uat/custom-components/src/main/java/com/aws/greengrass/artifacts/IotMqttPublisher.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.artifacts;
+
+import com.aws.greengrass.utils.Client;
+import com.aws.greengrass.utils.IPCTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPC;
+import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
+import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
+import software.amazon.awssdk.aws.greengrass.model.QOS;
+import software.amazon.awssdk.aws.greengrass.model.ReportedLifecycleState;
+import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+public class IotMqttPublisher implements Consumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IotMqttPublisher.class);
+ private static final String SPOOL_SIZE_ERROR = "Message is larger than the size of message spool.";
+ private static final String SPOOL_FULL_ERROR = "Message spool is full. Message could not be added.";
+
+ @Override
+ public void accept(String[] args) {
+ try (Client assertionClient = new Client();
+ GreengrassCoreIPCClientV2 eventStreamRpcConnection = IPCTestUtils.getGreengrassClient()) {
+ GreengrassCoreIPC greengrassCoreIPCClient = eventStreamRpcConnection.getClient();
+
+ String topic = System.getenv("topic");
+ String payload = System.getenv("payload");
+ QOS qos = IPCTestUtils.getQOSFromValue(Integer.parseInt(System.getenv("qos")));
+
+ PublishToIoTCoreRequest request = new PublishToIoTCoreRequest();
+ request.setTopicName(topic);
+ request.setPayload(payload.getBytes(StandardCharsets.UTF_8));
+ request.setQos(qos);
+
+ try {
+ greengrassCoreIPCClient.publishToIoTCore(request, Optional.empty()).getResponse().get();
+ assertionClient.sendAssertion(true, "Successfully published to IoT topic " + topic, "");
+ } catch (ExecutionException e) {
+ LOGGER.error("Error occurred while publishing to IoT topic", e);
+ try {
+ String errorMessage = e.getCause().getMessage();
+ if (errorMessage.contains(SPOOL_SIZE_ERROR) || errorMessage.contains(SPOOL_FULL_ERROR)) {
+ assertionClient.sendAssertion(true, "SPOOL_FULL_ERROR", "Spooler is full");
+ return;
+ }
+ if (e.getCause() instanceof UnauthorizedError) {
+ assertionClient.sendAssertion(true, e.getCause().getMessage(), "Unauthorized error while publishing to IoT topic " + topic);
+ return;
+ }
+ } catch (IOException e2) {
+ LOGGER.error("Failed to send assertion", e2);
+ }
+ IPCTestUtils.reportState(greengrassCoreIPCClient, ReportedLifecycleState.ERRORED);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ System.exit(1);
+ } catch (Exception e) {
+ LOGGER.error("Service errored", e);
+ System.exit(1);
+ }
+ }
+}
diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/utils/Client.java b/uat/custom-components/src/main/java/com/aws/greengrass/utils/Client.java
new file mode 100644
index 0000000..5658190
--- /dev/null
+++ b/uat/custom-components/src/main/java/com/aws/greengrass/utils/Client.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.utils;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class Client implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
+
+ private final CloseableHttpClient httpClient;
+
+ public Client() {
+ httpClient = HttpClients.custom()
+ .setRetryHandler(new DefaultHttpRequestRetryHandler(5, false)).build();
+ }
+
+ @Override
+ public void close() {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ LOGGER.warn("unable to close http client", e);
+ }
+ }
+ }
+
+ public void sendAssertion(boolean success, String context, String message) throws IOException {
+ int defaultPort = (int) Double.parseDouble(System.getProperty("serverPort"));
+ sendAssertionWithCustomizedPort(success, context, message, defaultPort);
+ }
+
+ public void sendAssertionWithCustomizedPort(boolean success, String context, String message, int port)
+ throws IOException {
+ HttpPost httpPost = new HttpPost(
+ "http://localhost:" + port + "/assert");
+ httpPost.setEntity(new ByteArrayEntity(
+ (String.format("{\"success\": %s, \"context\": \"%s\", \"message\": \"%s\"}", success, context,
+ message)).getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON));
+ try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+ response.getStatusLine().getStatusCode();
+ }
+ }
+}
diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/utils/IPCTestUtils.java b/uat/custom-components/src/main/java/com/aws/greengrass/utils/IPCTestUtils.java
new file mode 100644
index 0000000..0a76da0
--- /dev/null
+++ b/uat/custom-components/src/main/java/com/aws/greengrass/utils/IPCTestUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.utils;
+
+import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPC;
+import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
+import software.amazon.awssdk.aws.greengrass.model.QOS;
+import software.amazon.awssdk.aws.greengrass.model.ReportedLifecycleState;
+import software.amazon.awssdk.aws.greengrass.model.UpdateStateRequest;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class IPCTestUtils {
+ private IPCTestUtils() {
+ }
+
+ public static GreengrassCoreIPCClientV2 getGreengrassClient() throws InterruptedException, IOException {
+ return GreengrassCoreIPCClientV2.builder().build();
+ }
+
+ public static QOS getQOSFromValue(int qos) {
+ if (qos == 1) {
+ return QOS.AT_LEAST_ONCE;
+ } else if (qos == 0) {
+ return QOS.AT_MOST_ONCE;
+ }
+ return QOS.AT_LEAST_ONCE; //default value
+ }
+
+ public static void reportState(GreengrassCoreIPC greengrassCoreIPCClient, ReportedLifecycleState state)
+ throws ExecutionException, InterruptedException {
+ UpdateStateRequest updateStateRequest = new UpdateStateRequest();
+ updateStateRequest.setState(state);
+ greengrassCoreIPCClient.updateState(updateStateRequest, Optional.empty()).getResponse().get();
+ }
+}
+
diff --git a/uat/custom-components/src/main/resources/recipes/IotMqttPublisher.yaml b/uat/custom-components/src/main/resources/recipes/IotMqttPublisher.yaml
new file mode 100644
index 0000000..b0ddbfb
--- /dev/null
+++ b/uat/custom-components/src/main/resources/recipes/IotMqttPublisher.yaml
@@ -0,0 +1,21 @@
+#
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+
+---
+RecipeFormatVersion: '2020-01-25'
+ComponentName: IotMqttPublisher
+ComponentVersion: '0.0.0'
+ComponentDescription: Publishes MQTT messages to IoT Core via Nucleus' spooler
+ComponentPublisher: Amazon
+Manifests:
+ - Artifacts:
+ - URI: classpath:/local-store/artifacts/custom-components.jar
+ Lifecycle:
+ Setenv:
+ topic: "{configuration:/topic}"
+ payload: "{configuration:/payload}"
+ qos: "{configuration:/qos}"
+ Run: >-
+ java -Dlog.level=INFO -DserverPort={configuration:/assertionServerPort} -DcomponentName="IotMqttPublisher" -jar {artifacts:path}/custom-components.jar
diff --git a/uat/pom.xml b/uat/pom.xml
new file mode 100644
index 0000000..e3dbe19
--- /dev/null
+++ b/uat/pom.xml
@@ -0,0 +1,27 @@
+
+
+
+ 4.0.0
+ com.aws.greengrass
+ disk-spooler-uat
+ 1.0-SNAPSHOT
+ pom
+
+
+ custom-components
+
+
+ testing-features
+
+
+
+ 8
+ 8
+
+
diff --git a/uat/testing-features/pom.xml b/uat/testing-features/pom.xml
new file mode 100644
index 0000000..ff7e35c
--- /dev/null
+++ b/uat/testing-features/pom.xml
@@ -0,0 +1,219 @@
+
+
+
+
+ 4.0.0
+ com.aws.greengrass
+ disk-spooler-testing-features
+ 1.0-SNAPSHOT
+
+ 1.8
+ 1.8
+ 2.20.157
+ false
+
+
+
+ greengrass-common
+ greengrass common
+
+ https://d2jrmugq4soldf.cloudfront.net/snapshots
+
+
+
+
+
+ software.amazon.awssdk
+ bom
+ ${aws.sdk.version}
+ pom
+ import
+
+
+
+
+
+ com.aws.greengrass
+ aws-greengrass-testing-standalone
+ 1.2.0-SNAPSHOT
+ compile
+
+
+ org.projectlombok
+ lombok
+ 1.18.26
+ compile
+
+
+ com.google.auto.service
+ auto-service
+ 1.0.1
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.8.1
+ compile
+
+
+ software.amazon.awssdk.iotdevicesdk
+ aws-iot-device-sdk
+ 1.17.5
+
+
+ software.amazon.awssdk
+ iotdataplane
+ compile
+
+
+
+ org.immutables
+ value
+ 2.9.3
+ provided
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+ 3.0.0
+
+
+ copy-artifact-to-classpath
+ process-classes
+
+ run
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-pmd-plugin
+ 3.13.0
+
+
+ 0
+ true
+
+ ../../codestyle/pmd-eg-ruleset.xml
+ ../../codestyle/pmd-eg-tests-ruleset.xml
+
+ true
+ ${skipTests}
+
+
+
+ test
+
+ check
+
+
+
+
+
+ maven-checkstyle-plugin
+ 3.1.0
+
+
+ com.puppycrawl.tools
+ checkstyle
+ 8.29
+
+
+
+ true
+ ../../codestyle/checkstyle.xml
+ warning
+ 0
+ ${skipTests}
+
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+
+ maven-resources-plugin
+ 2.6
+
+
+ copy-local-store-recipe-resources
+ process-classes
+
+ copy-resources
+
+
+ ${basedir}/target/classes/local-store/recipes
+
+
+ ${basedir}/../../uat/custom-components/target/classes/recipes
+
+
+
+
+
+ copy-local-store-artifact-resources
+ process-classes
+
+ copy-resources
+
+
+ ${basedir}/target/classes/local-store/artifacts/
+
+
+ ${basedir}/../../uat/custom-components/target/
+
+ custom-components.jar
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.2
+
+
+ package
+
+ shade
+
+
+ greengrass-disk-spooler-testing-features
+
+
+
+
+ com.aws.greengrass.testing.launcher.TestLauncher
+
+
+
+
+
+
+
+
+
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/IotCoreClient.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/IotCoreClient.java
new file mode 100644
index 0000000..c577f50
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/IotCoreClient.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.iot;
+
+import com.aws.greengrass.util.CoreDevice;
+import com.google.inject.Inject;
+import lombok.extern.log4j.Log4j2;
+import software.amazon.awssdk.crt.CRT;
+import software.amazon.awssdk.crt.CrtRuntimeException;
+import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
+import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;
+import software.amazon.awssdk.crt.mqtt5.OnAttemptingConnectReturn;
+import software.amazon.awssdk.crt.mqtt5.OnConnectionFailureReturn;
+import software.amazon.awssdk.crt.mqtt5.OnConnectionSuccessReturn;
+import software.amazon.awssdk.crt.mqtt5.OnDisconnectionReturn;
+import software.amazon.awssdk.crt.mqtt5.OnStoppedReturn;
+import software.amazon.awssdk.crt.mqtt5.PublishResult;
+import software.amazon.awssdk.crt.mqtt5.PublishReturn;
+import software.amazon.awssdk.crt.mqtt5.packets.ConnAckPacket;
+import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;
+import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket;
+import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket;
+import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket;
+import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket;
+import software.amazon.awssdk.iot.AwsIotMqtt5ClientBuilder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+@Log4j2
+@SuppressWarnings("MissingJavadocMethod")
+public class IotCoreClient {
+ private final Set subscriptions = Collections.synchronizedSet(new HashSet<>());
+ private final PublishHandler publishHandler = new PublishHandler();
+ private final LifecycleHandler lifecycleHandler = new LifecycleHandler();
+ private final CountDownLatch started = new CountDownLatch(1);
+ private Mqtt5Client client;
+ private Future> resubscribeTask;
+
+ private final CoreDevice coreDevice;
+ private final ExecutorService executorService = Executors.newFixedThreadPool(1);
+
+ @Inject
+ public IotCoreClient(CoreDevice coreDevice) {
+ this.coreDevice = coreDevice;
+ }
+
+ public void publish(PublishRequest request)
+ throws ExecutionException, InterruptedException, TimeoutException, IOException {
+ start();
+ PublishResult result = client.publish(asPublishPacket(request)).get();
+ log.trace("publish result: topic={}; reason={}; reasonCode={}; message={};",
+ request.getTopic(),
+ result.getResultPubAck().getReasonString(),
+ result.getResultPubAck().getReasonCode(),
+ request.getMessage().getMessage());
+ }
+
+ public void resubscribeAsync() {
+ if (resubscribeTask != null) {
+ resubscribeTask.cancel(true);
+ }
+ resubscribeTask = executorService.submit(() -> {
+ Set subscribeRequests = getSubscriptions().stream()
+ .map(Subscription::getRequest)
+ .collect(Collectors.toSet());
+ for (SubscribeRequest request : subscribeRequests) {
+ try {
+ subscribe(request);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (ExecutionException | TimeoutException | IOException e) {
+ log.error("unable to resubscribe to topic {}", request.getTopic(), e);
+ }
+ }
+ });
+ }
+
+ public void subscribe(SubscribeRequest request)
+ throws ExecutionException, InterruptedException, TimeoutException, IOException {
+ start();
+
+ SubscribePacket subscribePacket = new SubscribePacket.SubscribePacketBuilder()
+ .withSubscription(request.getTopic(), request.getQos())
+ .build();
+ Subscription subscription = Subscription.builder()
+ .topic(request.getTopic())
+ .callback(request.getCallback())
+ .request(request)
+ .build();
+ boolean newSubscription = subscriptions.add(subscription);
+
+ try {
+ SubAckPacket subAckPacket = client.subscribe(subscribePacket).get();
+ log.debug("subscription result: topic={}; reason={}; reasonCode={};",
+ request.getTopic(),
+ subAckPacket.getReasonString(),
+ subAckPacket.getReasonCodes().get(0));
+ if (subAckPacket.getReasonCodes().get(0).getValue() > 2) {
+ if (newSubscription) {
+ // only remove if not resubscribing
+ subscriptions.remove(subscription);
+ }
+ request.getErrorCallback().accept(subAckPacket, null);
+ throw new RuntimeException("Unable to subscribe reasonCode="
+ + subAckPacket.getReasonCodes().get(0));
+ }
+ } catch (ExecutionException | InterruptedException e) {
+ if (newSubscription) {
+ // only remove if not resubscribing
+ subscriptions.remove(subscription);
+ }
+ request.getErrorCallback().accept(null, e);
+ throw e;
+ }
+ }
+
+ private synchronized void start() throws InterruptedException, TimeoutException {
+ if (client == null) {
+ client = createMqttClient();
+ client.start();
+ log.info("Waiting for client to connect...");
+ if (!started.await(10L, TimeUnit.SECONDS)) {
+ throw new TimeoutException("Timed-out waiting for client connect on start");
+ }
+ }
+ }
+
+ public synchronized void close() {
+ if (client != null) {
+ try {
+ client.stop(new DisconnectPacket.DisconnectPacketBuilder()
+ .withReasonCode(DisconnectPacket.DisconnectReasonCode.NORMAL_DISCONNECTION)
+ .build());
+ } catch (CrtRuntimeException e) {
+ log.error("Unable to stop mqtt client", e);
+ client.close();
+ }
+ }
+ executorService.shutdownNow();
+ }
+
+ private Set getSubscriptions() {
+ Set subscriptions;
+ synchronized (this.subscriptions) {
+ subscriptions = new HashSet<>(this.subscriptions);
+ }
+ return subscriptions;
+ }
+
+ private static PublishPacket asPublishPacket(PublishRequest request) {
+ return new PublishPacket.PublishPacketBuilder()
+ .withQOS(request.getQos())
+ .withTopic(request.getTopic())
+ .withResponseTopic(request.getMessage().getResponseTopic())
+ .withPayload(request.getMessage().getPayload())
+ .withPayloadFormat(request.getMessage().getPayloadFormat() == null ? null
+ : PublishPacket.PayloadFormatIndicator.getEnumValueFromInteger(
+ request.getMessage().getPayloadFormat().getValue()))
+ .withRetain(request.getMessage().isRetain())
+ .withContentType(request.getMessage().getContentType())
+ .withCorrelationData(request.getMessage().getCorrelationData())
+ .withMessageExpiryIntervalSeconds(request.getMessage().getMessageExpiryIntervalSeconds())
+ .withUserProperties(request.getMessage().getUserProperties() == null ? null
+ : request.getMessage().getUserProperties().stream()
+ .map(p -> new software.amazon.awssdk.crt.mqtt5.packets.UserProperty(p.getKey(), p.getValue()))
+ .collect(Collectors.toList()))
+ .build();
+ }
+
+ private Mqtt5Client createMqttClient() {
+ try (AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder.newDirectMqttBuilderWithMtlsFromMemory(
+ coreDevice.getIotCoreDataEndpoint(),
+ coreDevice.getSpec().resource().certificate().certificatePem(),
+ coreDevice.getSpec().resource().certificate().keyPair().privateKey())) {
+ return builder
+ .withCertificateAuthority(coreDevice.getRootCA())
+ .withLifeCycleEvents(lifecycleHandler)
+ .withPublishEvents(publishHandler)
+ .withSessionBehavior(Mqtt5ClientOptions.ClientSessionBehavior.REJOIN_POST_SUCCESS)
+ .withOfflineQueueBehavior(Mqtt5ClientOptions.ClientOfflineQueueBehavior.FAIL_ALL_ON_DISCONNECT)
+ .withConnectProperties(new ConnectPacket.ConnectPacketBuilder()
+ .withRequestProblemInformation(true)
+ .withClientId(getClass().getSimpleName() + UUID.randomUUID())
+ .withSessionExpiryIntervalSeconds(10_080L) // overriding default value to extend session
+ // must be between 30 and 1200 seconds
+ .withKeepAliveIntervalSeconds(30L))
+ .withPingTimeoutMs(Duration.ofSeconds(3).toMillis())
+ .withMinReconnectDelayMs(Duration.ofSeconds(1).toMillis())
+ .withMaxReconnectDelayMs(Duration.ofSeconds(2).toMillis())
+ .build();
+ }
+ }
+
+ private class LifecycleHandler implements Mqtt5ClientOptions.LifecycleEvents {
+ @Override
+ public void onAttemptingConnect(Mqtt5Client mqtt5Client,
+ OnAttemptingConnectReturn onAttemptingConnectReturn) {
+ log.debug("Connect in progress");
+ }
+
+ @Override
+ public void onConnectionSuccess(Mqtt5Client mqtt5Client,
+ OnConnectionSuccessReturn onConnectionSuccessReturn) {
+ boolean sessionPresent = onConnectionSuccessReturn.getConnAckPacket().getSessionPresent();
+ log.info("Connected, sessionPresent={}", sessionPresent);
+ started.countDown();
+ if (!sessionPresent) {
+ // don't block crt thread
+ resubscribeAsync();
+ }
+ }
+
+ @Override
+ public void onConnectionFailure(Mqtt5Client mqtt5Client,
+ OnConnectionFailureReturn onConnectionFailureReturn) {
+ ConnAckPacket connAck = onConnectionFailureReturn.getConnAckPacket();
+ log.warn("Connect failed. errorCode={}; reasonCode={}; reason={};",
+ CRT.awsErrorString(onConnectionFailureReturn.getErrorCode()),
+ connAck == null ? null : connAck.getReasonCode(),
+ connAck == null ? null : connAck.getReasonString());
+ }
+
+ @Override
+ public void onDisconnection(Mqtt5Client mqtt5Client, OnDisconnectionReturn onDisconnectionReturn) {
+ DisconnectPacket discon = onDisconnectionReturn.getDisconnectPacket();
+ log.info("Disconnected. errorCode={}; reasonCode={}; reason={};",
+ CRT.awsErrorString(onDisconnectionReturn.getErrorCode()),
+ discon == null ? null : discon.getReasonCode(),
+ discon == null ? null : discon.getReasonString());
+ }
+
+ @Override
+ public void onStopped(Mqtt5Client mqtt5Client, OnStoppedReturn onStoppedReturn) {
+ log.debug("Stopped");
+ mqtt5Client.close();
+ }
+ }
+
+ private class PublishHandler implements Mqtt5ClientOptions.PublishEvents {
+
+ @Override
+ public void onMessageReceived(Mqtt5Client mqtt5Client, PublishReturn publishReturn) {
+ String topic = publishReturn.getPublishPacket().getTopic();
+ log.trace("Message received on topic {} with payload {}",
+ topic,
+ publishReturn.getPublishPacket().getPayload() == null
+ ? null
+ : new String(publishReturn.getPublishPacket().getPayload(), StandardCharsets.UTF_8));
+ Message message = asMessage(publishReturn);
+ forwardMessageToSubscribers(topic, message);
+ }
+
+ private Message asMessage(PublishReturn publishReturn) {
+ PublishPacket publishPacket = publishReturn.getPublishPacket();
+ return Message.builder()
+ .payload(publishPacket.getPayload())
+ .contentType(publishPacket.getContentType())
+ .correlationData(publishPacket.getCorrelationData())
+ .messageExpiryIntervalSeconds(publishPacket.getMessageExpiryIntervalSeconds())
+ .payloadFormat(publishPacket.getPayloadFormat() == null ? null
+ : PayloadFormatIndicator.fromIndicator(publishPacket.getPayloadFormat().getValue()))
+ .responseTopic(publishPacket.getResponseTopic())
+ .retain(publishPacket.getRetain())
+ .subscriptionIdentifiers(publishPacket.getSubscriptionIdentifiers())
+ .userProperties(publishPacket.getUserProperties() == null ? null
+ : publishPacket.getUserProperties().stream()
+ .map(p -> UserProperty.builder().key(p.key).value(p.value).build())
+ .collect(Collectors.toList()))
+ .build();
+ }
+
+ private void forwardMessageToSubscribers(String topic, Message message) {
+ getSubscriptions().stream()
+ .filter(s -> Objects.equals(s.getTopic(), topic))
+ .map(Subscription::getCallback)
+ .forEach(c -> c.accept(message));
+ }
+ }
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/Message.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Message.java
new file mode 100644
index 0000000..a47f6e2
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Message.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.iot;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Value;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+@Value
+@Builder
+@SuppressWarnings({"PMD.AvoidFieldNameMatchingTypeName", "checkstyle:VisibilityModifier", "MissingJavadocMethod"})
+public class Message {
+
+ @Getter(lazy = true)
+ String message = messageFromPayload();
+ byte[] payload;
+ String contentType;
+ Long topicAlias;
+ byte[] correlationData;
+ Long messageExpiryIntervalSeconds;
+ PayloadFormatIndicator payloadFormat;
+ String responseTopic;
+ boolean retain;
+ List subscriptionIdentifiers;
+ List userProperties;
+
+ String messageFromPayload() {
+ return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
+ }
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/PayloadFormatIndicator.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PayloadFormatIndicator.java
new file mode 100644
index 0000000..8ae1ad2
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PayloadFormatIndicator.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.iot;
+
+import java.util.Arrays;
+
+@SuppressWarnings("MissingJavadocMethod")
+public enum PayloadFormatIndicator {
+ BYTES(0),
+ UTF8(1);
+
+ private final int indicator;
+
+ PayloadFormatIndicator(int value) {
+ this.indicator = value;
+ }
+
+ public int getValue() {
+ return this.indicator;
+ }
+
+ public static PayloadFormatIndicator fromIndicator(int indicator) {
+ return Arrays.stream(PayloadFormatIndicator.values())
+ .filter(i -> i.getValue() == indicator)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("Unexpected payload format indicator " + indicator));
+ }
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/PublishRequest.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PublishRequest.java
new file mode 100644
index 0000000..2904525
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PublishRequest.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.iot;
+
+import lombok.Builder;
+import lombok.Value;
+import software.amazon.awssdk.crt.mqtt5.QOS;
+
+@Value
+@Builder
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class PublishRequest {
+ String topic;
+ Message message;
+ @Builder.Default
+ QOS qos = QOS.AT_LEAST_ONCE;
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/SubscribeRequest.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/SubscribeRequest.java
new file mode 100644
index 0000000..a2fe06d
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/SubscribeRequest.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.iot;
+
+import lombok.Builder;
+import lombok.Value;
+import software.amazon.awssdk.crt.mqtt5.QOS;
+import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+@Value
+@Builder
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class SubscribeRequest {
+ String topic;
+ @Builder.Default
+ Consumer callback = m -> {
+ };
+ @Builder.Default
+ BiConsumer errorCallback = (p, e) -> {
+ };
+ @Builder.Default
+ QOS qos = QOS.AT_LEAST_ONCE;
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/Subscription.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Subscription.java
new file mode 100644
index 0000000..6eef168
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Subscription.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.iot;
+
+import lombok.Builder;
+import lombok.Value;
+
+import java.util.function.Consumer;
+
+@Value
+@Builder
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class Subscription {
+ String topic;
+ Consumer callback;
+ SubscribeRequest request;
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/UserProperty.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/UserProperty.java
new file mode 100644
index 0000000..30f70f0
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/UserProperty.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.iot;
+
+import lombok.Builder;
+import lombok.Value;
+
+@Value
+@Builder
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class UserProperty {
+ String key;
+ String value;
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/AssertionSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/AssertionSteps.java
new file mode 100644
index 0000000..0004dd0
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/AssertionSteps.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+
+package com.aws.greengrass.steps;
+
+import com.aws.greengrass.testing.features.WaitSteps;
+import com.aws.greengrass.testing.model.ScenarioContext;
+import com.aws.greengrass.testing.model.TestContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpServer;
+import com.sun.net.httpserver.spi.HttpServerProvider;
+import io.cucumber.guice.ScenarioScoped;
+import io.cucumber.java.After;
+import io.cucumber.java.en.Given;
+import io.cucumber.java.en.Then;
+import io.cucumber.java.en.When;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.log4j.Log4j2;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Log4j2
+@ScenarioScoped
+@SuppressWarnings("MissingJavadocMethod")
+public class AssertionSteps implements Closeable {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final Logger LOGGER = LogManager.getLogger(AssertionSteps.class);
+ @Getter
+ private final List assertionList = new CopyOnWriteArrayList<>();
+ private final ScenarioContext scenarioContext;
+ private final TestContext testContext;
+ private final WaitSteps waits;
+ private HttpServer server;
+ @Getter
+ private int port;
+
+ @Inject
+ @SuppressWarnings("MissingJavadocMethod")
+ public AssertionSteps(TestContext testContext,
+ ScenarioContext scenarioContext,
+ WaitSteps waits) {
+
+ this.testContext = testContext;
+ this.scenarioContext = scenarioContext;
+ this.waits = waits;
+ }
+
+ private void assertionHandler(HttpExchange httpExchange) {
+ try {
+ Assertion val = MAPPER.readValue(httpExchange.getRequestBody(), Assertion.class);
+ assertionList.add(val);
+ LOGGER.debug("Got assertion value {}", val);
+ httpExchange.sendResponseHeaders(200, 0);
+ httpExchange.getResponseBody().flush();
+ } catch (IOException e) {
+ LOGGER.error("Error decoding assertion", e);
+ } finally {
+ httpExchange.close();
+ }
+ }
+
+ @Given("I start an assertion server")
+ @SuppressWarnings("MissingJavadocMethod")
+ public void start() throws IOException {
+ if (server != null) {
+ throw new IllegalStateException("Server already exists");
+ }
+
+ server = HttpServerProvider.provider().createHttpServer(new InetSocketAddress("localhost", 0),
+ 0);
+ server.createContext("/assert", this::assertionHandler);
+ LOGGER.debug("Starting HTTP assertion server");
+ server.setExecutor(Executors.newCachedThreadPool(runnable -> {
+ // Daemon-ize to allow main thread to end
+ final Thread thread = Executors.defaultThreadFactory().newThread(runnable);
+ thread.setName(String.format("AssertionServer-%d-%s", server.getAddress().getPort(),
+ testContext.testId().id()));
+ thread.setDaemon(true);
+ return thread;
+ }));
+ server.start();
+ port = server.getAddress().getPort();
+ LOGGER.info("Started HTTP assertion server at port {}", port);
+ scenarioContext.put("assertionServerPort", Integer.toString(port));
+ }
+
+
+ @Then("I get {int} assertion(s) with context {string}")
+ public void assertionsWithContext(int assertionCount, String context) throws Throwable {
+ assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context), null,
+ 100, true);
+ }
+
+
+ @Then("I get at least {int} assertion(s) with context {string}")
+ public void atLeastNAssertionsWithContext(int assertionCount, String context) throws Throwable {
+ assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context), null,
+ 100, false);
+ }
+
+
+ @Then("I get at least {int} assertion(s) with context {string} within {long} seconds")
+ public void atLeastNAssertionsWithContext(int assertionCount, String context, long timeoutSeconds)
+ throws Throwable {
+ assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context),
+ null, timeoutSeconds, false);
+ }
+
+ @Then("I get {int} assertion(s) with context {string} within {long} seconds")
+ public void assertionsWithContextTimeout(int assertionCount, String context, long timeoutSeconds)
+ throws Throwable {
+ assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context),
+ null, timeoutSeconds, true);
+ }
+
+
+ @Then("I get {int} assertion(s) with context {string} and message {string} within {long} seconds")
+ public void assertionsWithContextTimeout(int assertionCount, String context, String message,
+ long timeoutSeconds) throws Throwable {
+
+ assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context),
+ message, timeoutSeconds, true);
+ }
+
+ @SuppressWarnings("PMD.AvoidCatchingThrowable")
+ private void assertionsWithContextTimeout(int assertionCount, String context, String message,
+ long timeoutSeconds, boolean failOnTooManyAssertions)
+ throws Throwable {
+ String exceptionMessage = (message == null)
+ ? String.format("did not receive %d assertions for '%s' in time (%ds)", assertionCount,
+ context, timeoutSeconds)
+ : String.format("did not receive %d assertions for '%s' with message '%s' in time (%ds)",
+ assertionCount, context, message, timeoutSeconds);
+
+ class Wrapper {
+ @Getter
+ @Setter
+ private T wrapped;
+ }
+
+ Wrapper wrapper = new Wrapper<>();
+ boolean finished = waits.untilTrue(() -> {
+ try {
+ return assertionsReceived(assertionCount, context, message, failOnTooManyAssertions, exceptionMessage);
+ } catch (Throwable e) {
+ wrapper.setWrapped(e);
+ return true;
+ }
+ }, (int) timeoutSeconds, TimeUnit.SECONDS);
+
+ if (!finished) {
+ throw new TimeoutException("Timeout: " + exceptionMessage);
+ }
+
+ Throwable e = wrapper.getWrapped();
+ if (e != null) {
+ throw e;
+ }
+ }
+
+ private boolean assertionsReceived(int assertionCount, String context, String message,
+ boolean failOnTooManyAssertions, String exceptionMessage) {
+ List assertions = assertionList.stream()
+ .filter(assertion -> assertion.getContext() != null)
+ .filter(assertion -> context.equalsIgnoreCase(assertion.getContext()))
+ .filter(assertion -> message == null || message.equalsIgnoreCase(assertion.getMessage()))
+ .collect(Collectors.toList());
+ for (Assertion assertion : assertions) {
+ if (!assertion.isSuccess()) {
+ fail(String.format("Assertion with message %s and context %s FAILED", assertion.getMessage(),
+ assertion.getContext()));
+ }
+ }
+ if (failOnTooManyAssertions && assertions.size() > assertionCount) {
+ fail(exceptionMessage);
+ }
+ return assertions.size() >= assertionCount;
+ }
+
+ @After
+ @Override
+ public void close() {
+ if (server != null) {
+ server.stop(0);
+ ((ExecutorService) server.getExecutor()).shutdownNow();
+ LOGGER.info("Shut down HTTP assertion server");
+ }
+ }
+
+ @When("I clear the assertions")
+ public void clearTheAssertions() {
+ assertionList.clear();
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ @ToString
+ static class Assertion {
+ private boolean success;
+ private String context;
+ private String message;
+ }
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/CommonSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/CommonSteps.java
new file mode 100644
index 0000000..afe6c09
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/CommonSteps.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.steps;
+
+import com.aws.greengrass.testing.model.ScenarioContext;
+import com.google.inject.Inject;
+import io.cucumber.guice.ScenarioScoped;
+import io.cucumber.java.en.When;
+
+import java.util.UUID;
+
+@ScenarioScoped
+public class CommonSteps {
+ private final ScenarioContext scenarioContext;
+
+ @Inject
+ public CommonSteps(ScenarioContext scenarioContext) {
+ this.scenarioContext = scenarioContext;
+ }
+
+ @When("I create a random name as {word}")
+ public void storeRandomNameInContext(String key) {
+ scenarioContext.put(key, randomName());
+ }
+
+ private static String randomName() {
+ return String.format("e2e-%d-%s", System.currentTimeMillis(), UUID.randomUUID());
+ }
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/IotSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/IotSteps.java
new file mode 100644
index 0000000..1695e00
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/IotSteps.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.steps;
+
+import com.aws.greengrass.iot.IotCoreClient;
+import com.aws.greengrass.iot.Message;
+import com.aws.greengrass.iot.SubscribeRequest;
+import com.aws.greengrass.testing.features.WaitSteps;
+import com.aws.greengrass.testing.model.ScenarioContext;
+import com.google.inject.Inject;
+import io.cucumber.guice.ScenarioScoped;
+import io.cucumber.java.After;
+import io.cucumber.java.en.Then;
+import io.cucumber.java.en.When;
+import lombok.extern.log4j.Log4j2;
+import software.amazon.awssdk.crt.mqtt5.QOS;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Log4j2
+@ScenarioScoped
+@SuppressWarnings("MissingJavadocMethod")
+public class IotSteps {
+ private final ScenarioContext scenarioContext;
+ private final WaitSteps waitSteps;
+ private final IotCoreClient iotCoreClient;
+ private final Map> messagesByTopic = Collections.synchronizedMap(new HashMap<>());
+
+ @Inject
+ public IotSteps(ScenarioContext scenarioContext,
+ WaitSteps waitSteps,
+ IotCoreClient iotCoreClient) {
+ this.scenarioContext = scenarioContext;
+ this.waitSteps = waitSteps;
+ this.iotCoreClient = iotCoreClient;
+ }
+
+ @After
+ public void close() {
+ iotCoreClient.close();
+ }
+
+ @When("I subscribe to cloud topics")
+ public void subscribeToCloudTopics(List topics)
+ throws IOException, ExecutionException, InterruptedException, TimeoutException {
+ for (String topic : topics) {
+ String resolvedTopic = scenarioContext.applyInline(topic);
+ iotCoreClient.subscribe(SubscribeRequest.builder()
+ .topic(resolvedTopic)
+ .qos(QOS.AT_LEAST_ONCE)
+ .callback(message -> {
+ messagesByTopic.compute(resolvedTopic, (s, messages) -> {
+ if (messages == null) {
+ messages = new CopyOnWriteArrayList<>();
+ }
+ messages.add(message);
+ return messages;
+ });
+ })
+ .build());
+ }
+ }
+
+ @Then("the cloud topic {word} receives the following messages within {int} seconds")
+ public void checkMessagesOnTopic(String topic, int timeoutSeconds, List messages)
+ throws InterruptedException {
+ String resolvedTopic = scenarioContext.applyInline(topic);
+ assertTrue(waitSteps.untilTrue(
+ () -> {
+ List receivedMessages = receivedMessagePayloadsByTopic(resolvedTopic);
+ return Objects.equals(messages, receivedMessages);
+ },
+ timeoutSeconds, TimeUnit.SECONDS));
+ }
+
+ private List receivedMessagePayloadsByTopic(String topic) {
+ List messages = messagesByTopic.get(topic);
+ if (messages == null) {
+ return Collections.emptyList();
+ }
+ return messages.stream().map(Message::getMessage).collect(Collectors.toList());
+ }
+}
diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/util/CoreDevice.java b/uat/testing-features/src/main/java/com/aws/greengrass/util/CoreDevice.java
new file mode 100644
index 0000000..82642b1
--- /dev/null
+++ b/uat/testing-features/src/main/java/com/aws/greengrass/util/CoreDevice.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.aws.greengrass.util;
+
+import com.aws.greengrass.testing.model.RegistrationContext;
+import com.aws.greengrass.testing.model.TestContext;
+import com.aws.greengrass.testing.resources.AWSResources;
+import com.aws.greengrass.testing.resources.iot.IotLifecycle;
+import com.aws.greengrass.testing.resources.iot.IotThingSpec;
+import com.google.inject.Inject;
+import lombok.Getter;
+
+/**
+ * Helper class that provides information related to the core device under test.
+ */
+@SuppressWarnings("MissingJavadocMethod")
+public class CoreDevice {
+
+ private final AWSResources awsResources;
+ private final TestContext testContext;
+ @Getter
+ private final String rootCA;
+ @Getter
+ private final String iotCoreDataEndpoint;
+
+ @Inject
+ public CoreDevice(TestContext testContext,
+ RegistrationContext registrationContext,
+ AWSResources awsResources) {
+ this.awsResources = awsResources;
+ this.testContext = testContext;
+ this.rootCA = registrationContext.rootCA();
+ this.iotCoreDataEndpoint = awsResources.lifecycle(IotLifecycle.class).dataEndpoint();
+ }
+
+ public IotThingSpec getSpec() {
+ return awsResources.trackingSpecs(IotThingSpec.class)
+ .filter(s -> s.thingName().equals(testContext.coreThingName()))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("no core thing found"));
+ }
+}
diff --git a/uat/testing-features/src/main/resources/greengrass/features/disk-spooler-1.feature b/uat/testing-features/src/main/resources/greengrass/features/disk-spooler-1.feature
new file mode 100644
index 0000000..f43dd10
--- /dev/null
+++ b/uat/testing-features/src/main/resources/greengrass/features/disk-spooler-1.feature
@@ -0,0 +1,66 @@
+@DiskSpooler
+Feature: DiskSpooler-1
+
+ As a customer, I can spool outbound MQTT messages so that they persist across Nucleus restarts.
+
+ Background:
+ Given my device is registered as a Thing
+ And my device is running Greengrass
+ And I start an assertion server
+
+ Scenario: DiskSpooler-1-T1: MQTT messages are published to IoT Core
+ Then I create a random name as cloud_topic
+ When I subscribe to cloud topics
+ | ${cloud_topic} |
+ Given I create a Greengrass deployment with components
+ | aws.greengrass.Cli | LATEST |
+ | aws.greengrass.DiskSpooler | classpath:/greengrass/recipes/recipe.yaml |
+ And I update my Greengrass deployment configuration, setting the component aws.greengrass.Nucleus configuration to:
+ """
+ {
+ "MERGE": {
+ "mqtt": {
+ "spooler": {
+ "storageType": "Disk"
+ }
+ },
+ "logging": {
+ "level": "DEBUG"
+ }
+ }
+ }
+ """
+ And I deploy the Greengrass deployment configuration
+ Then the Greengrass deployment is COMPLETED on the device after 3 minutes
+ Then I verify the aws.greengrass.DiskSpooler component is RUNNING using the greengrass-cli
+ Then I wait 20 seconds
+ When I install the component IotMqttPublisher from local store with configuration
+ """
+ {
+ "MERGE": {
+ "assertionServerPort": ${assertionServerPort},
+ "topic": "${cloud_topic}",
+ "payload": "Hello world",
+ "qos": "1",
+ "accessControl": {
+ "aws.greengrass.ipc.mqttproxy": {
+ "policyId1": {
+ "policyDescription": "access to publish to mqtt topics",
+ "operations": [
+ "aws.greengrass#PublishToIoTCore"
+ ],
+ "resources": [
+ "${cloud_topic}"
+ ]
+ }
+ }
+ }
+ }
+ }
+ """
+ And I get 1 assertion with context "Successfully published to IoT topic ${cloud_topic}"
+ Then the cloud topic ${cloud_topic} receives the following messages within 10 seconds
+ | Hello world |
+
+
+# TODO convert remaining UATs
diff --git a/uat/testing-features/src/main/resources/greengrass/recipes/recipe.yaml b/uat/testing-features/src/main/resources/greengrass/recipes/recipe.yaml
new file mode 100644
index 0000000..55f5343
--- /dev/null
+++ b/uat/testing-features/src/main/resources/greengrass/recipes/recipe.yaml
@@ -0,0 +1,18 @@
+#
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+
+---
+RecipeFormatVersion: '2020-01-25'
+ComponentName: aws.greengrass.DiskSpooler
+ComponentDescription: AWS Greengrass Disk Spooler
+ComponentPublisher: AWS
+ComponentVersion: '2.4.0'
+ComponentType: 'aws.greengrass.plugin'
+Manifests:
+ - Artifacts:
+ - URI: "classpath:/greengrass/artifacts/aws.greengrass.DiskSpooler.jar"
+ Permission:
+ Read: ALL
+ Execute: ALL