From 070cb9edbb931ef3a0958d141ae3a36531c71d2b Mon Sep 17 00:00:00 2001 From: Joseph Cosentino Date: Tue, 14 Nov 2023 10:38:09 -0800 Subject: [PATCH] feat: spooler uats --- .github/workflows/uat.yaml | 32 ++ .gitignore | 6 + pom.xml | 1 + run-uats.sh | 10 + uat/README.md | 49 +++ uat/codebuild/uat_linux_buildspec.yaml | 29 ++ uat/custom-components/pom.xml | 74 +++++ .../main/java/com/aws/greengrass/Main.java | 18 ++ .../artifacts/IotMqttPublisher.java | 73 +++++ .../java/com/aws/greengrass/utils/Client.java | 58 ++++ .../aws/greengrass/utils/IPCTestUtils.java | 42 +++ .../resources/recipes/IotMqttPublisher.yaml | 21 ++ uat/pom.xml | 27 ++ uat/testing-features/pom.xml | 219 +++++++++++++ .../com/aws/greengrass/iot/IotCoreClient.java | 293 ++++++++++++++++++ .../java/com/aws/greengrass/iot/Message.java | 36 +++ .../iot/PayloadFormatIndicator.java | 31 ++ .../aws/greengrass/iot/PublishRequest.java | 20 ++ .../aws/greengrass/iot/SubscribeRequest.java | 29 ++ .../com/aws/greengrass/iot/Subscription.java | 20 ++ .../com/aws/greengrass/iot/UserProperty.java | 17 + .../aws/greengrass/steps/AssertionSteps.java | 228 ++++++++++++++ .../com/aws/greengrass/steps/CommonSteps.java | 30 ++ .../com/aws/greengrass/steps/IotSteps.java | 95 ++++++ .../com/aws/greengrass/util/CoreDevice.java | 45 +++ .../features/disk-spooler-1.feature | 66 ++++ .../resources/greengrass/recipes/recipe.yaml | 18 ++ 27 files changed, 1587 insertions(+) create mode 100644 .github/workflows/uat.yaml create mode 100644 .gitignore create mode 100755 run-uats.sh create mode 100644 uat/README.md create mode 100644 uat/codebuild/uat_linux_buildspec.yaml create mode 100644 uat/custom-components/pom.xml create mode 100644 uat/custom-components/src/main/java/com/aws/greengrass/Main.java create mode 100644 uat/custom-components/src/main/java/com/aws/greengrass/artifacts/IotMqttPublisher.java create mode 100644 uat/custom-components/src/main/java/com/aws/greengrass/utils/Client.java create mode 100644 uat/custom-components/src/main/java/com/aws/greengrass/utils/IPCTestUtils.java create mode 100644 uat/custom-components/src/main/resources/recipes/IotMqttPublisher.yaml create mode 100644 uat/pom.xml create mode 100644 uat/testing-features/pom.xml create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/iot/IotCoreClient.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/iot/Message.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/iot/PayloadFormatIndicator.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/iot/PublishRequest.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/iot/SubscribeRequest.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/iot/Subscription.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/iot/UserProperty.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/steps/AssertionSteps.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/steps/CommonSteps.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/steps/IotSteps.java create mode 100644 uat/testing-features/src/main/java/com/aws/greengrass/util/CoreDevice.java create mode 100644 uat/testing-features/src/main/resources/greengrass/features/disk-spooler-1.feature create mode 100644 uat/testing-features/src/main/resources/greengrass/recipes/recipe.yaml diff --git a/.github/workflows/uat.yaml b/.github/workflows/uat.yaml new file mode 100644 index 0000000..5f944e8 --- /dev/null +++ b/.github/workflows/uat.yaml @@ -0,0 +1,32 @@ +name: OTF UATS + +on: + pull_request: + branches: '*' + +env: + AWS_REGION : "us-west-2" + CODE_BUILD_PROJECT_LINUX: "DiskSpoolerUatCodeBuildLinux" + AWS_ROLE_TO_ASSUME: "arn:aws:iam::686385081908:role/aws-greengrass-disk-spooler-codebuild-uat-role-amazonlinux" + +jobs: + uat-linux: + permissions: + id-token: write + contents: read + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ ubuntu-latest ] + steps: + - name: configure aws credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + role-to-assume: ${{ env.AWS_ROLE_TO_ASSUME }} + role-session-name: nucleusCI + aws-region: ${{ env.AWS_REGION }} + - name: Run UAT on linux + uses: aws-actions/aws-codebuild-run-build@v1 + with: + project-name: ${{ env.CODE_BUILD_PROJECT_LINUX }} + buildspec-override: uat/codebuild/uat_linux_buildspec.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5cc9a89 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +# UATs +uat-results +greengrass-nucleus-latest.zip + +# GDK +greengrass-build diff --git a/pom.xml b/pom.xml index 8cb064f..f1c289a 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ * codestyle/** + uat-results/** src/*/resources/** diff --git a/run-uats.sh b/run-uats.sh new file mode 100755 index 0000000..c2c43ec --- /dev/null +++ b/run-uats.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +# +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# +set -e +curl -s https://d2s8p88vqu9w66.cloudfront.net/releases/greengrass-nucleus-latest.zip > greengrass-nucleus-latest.zip +mvn -U -ntp clean package -DskipTests +mvn -U -ntp clean verify -f uat/pom.xml +sudo java -Dggc.archive=greengrass-nucleus-latest.zip -Dtest.log.path=uat-results -Dtags=DiskSpooler -jar uat/testing-features/target/greengrass-disk-spooler-testing-features.jar diff --git a/uat/README.md b/uat/README.md new file mode 100644 index 0000000..286eec0 --- /dev/null +++ b/uat/README.md @@ -0,0 +1,49 @@ +## Disk Spooler User Acceptance Tests + +User Acceptance Tests for Disk Spooler run using `aws-greengrass-testing-standalone` as a library. They execute E2E +tests which will spin up an instance of Greengrass on your device and execute different sets of tests, by installing +the `aws.greengrass.DiskSpooler` component. + +## Running UATs locally + +Ensure credentials are available by setting them in environment variables. In unix based systems: + +```bash +export AWS_ACCESS_KEY_ID= +export AWS_SECRET_ACCESS_KEY= +``` + +on Windows Powershell + +```bash +$Env:AWS_ACCESS_KEY_ID= +$Env:AWS_SECRET_ACCESS_KEY= +``` + +For UATs to run you will need to package your entire application along with `aws-greengrass-testing-standalone` into +an uber jar. To do run (from the root of the project) + +``` +mvn -U -ntp clean verify -f uat/pom.xml +``` + +Note: Everytime you make changes to the codebase you will have to rebuild the uber jar for those changes to be present +on the final artifact. + +Finally, download the zip containing the latest version of the Nucleus, which will be used to provision Greengrass for +the UATs. + +```bash +curl -s https://d2s8p88vqu9w66.cloudfront.net/releases/greengrass-nucleus-latest.zip > greengrass-nucleus-latest.zip +``` + +Execute the UATs by running the following command from the root of the project. + +``` +sudo java -Dggc.archive= -Dtest.log.path= -Dtags=DiskSpooler -jar uat/testing-features/target/greengrass-disk-spooler-testing-features.jar +``` + +Command arguments: + +Dggc.archive - path to the nucleus zip that was downloaded +Dtest.log.path - path where you would like the test results to be stored diff --git a/uat/codebuild/uat_linux_buildspec.yaml b/uat/codebuild/uat_linux_buildspec.yaml new file mode 100644 index 0000000..d3bbc82 --- /dev/null +++ b/uat/codebuild/uat_linux_buildspec.yaml @@ -0,0 +1,29 @@ +# +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# + +version: 0.2 +phases: + install: + runtime-versions: + java: corretto11 + build: + commands: + - curl -s https://d2s8p88vqu9w66.cloudfront.net/releases/greengrass-nucleus-latest.zip > /tmp/greengrass-nucleus-latest.zip + - mvn -U -ntp verify -DskipTests=true + - mvn -U -ntp clean verify -f uat/pom.xml + - java -Dggc.archive=/tmp/greengrass-nucleus-latest.zip + -Dtags='DiskSpooler' -Dggc.install.root=$CODEBUILD_SRC_DIR -Dggc.log.level=INFO -Daws.region=$AWS_REGION + -jar uat/testing-features/target/greengrass-disk-spooler-testing-features.jar + +artifacts: + files: + - 'testResults/**/*' + name: 'DiskSpoolerUatLinuxLogs.zip' + +reports: + uat-reports: + files: + - "TEST-greengrass-results.xml" + file-format: "JUNITXML" diff --git a/uat/custom-components/pom.xml b/uat/custom-components/pom.xml new file mode 100644 index 0000000..042c7dc --- /dev/null +++ b/uat/custom-components/pom.xml @@ -0,0 +1,74 @@ + + + + + 4.0.0 + custom-components + com.aws.greengrass + 1.0-SNAPSHOT + + 8 + 8 + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + custom-components + + + com.aws.greengrass.Main + + + + + + + + + + + com.fasterxml.jackson.core + jackson-databind + 2.12.7.1 + + + org.slf4j + slf4j-api + 1.7.30 + compile + + + software.amazon.awssdk.iotdevicesdk + aws-iot-device-sdk + 1.17.5 + + + org.slf4j + slf4j-simple + 1.7.30 + + + org.apache.httpcomponents + httpclient + 4.5.13 + compile + + + diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/Main.java b/uat/custom-components/src/main/java/com/aws/greengrass/Main.java new file mode 100644 index 0000000..e2cf405 --- /dev/null +++ b/uat/custom-components/src/main/java/com/aws/greengrass/Main.java @@ -0,0 +1,18 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass; + +import java.util.function.Consumer; + +public final class Main { + private Main(){} + + @SuppressWarnings({"unchecked", "deprecation"}) + public static void main(String[] args) throws ClassNotFoundException, IllegalAccessException, InstantiationException { + ((Consumer) Class.forName("com.aws.greengrass.artifacts." + System.getProperty("componentName")) + .newInstance()).accept(args); + } +} diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/artifacts/IotMqttPublisher.java b/uat/custom-components/src/main/java/com/aws/greengrass/artifacts/IotMqttPublisher.java new file mode 100644 index 0000000..fba8730 --- /dev/null +++ b/uat/custom-components/src/main/java/com/aws/greengrass/artifacts/IotMqttPublisher.java @@ -0,0 +1,73 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.artifacts; + +import com.aws.greengrass.utils.Client; +import com.aws.greengrass.utils.IPCTestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPC; +import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; +import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; +import software.amazon.awssdk.aws.greengrass.model.QOS; +import software.amazon.awssdk.aws.greengrass.model.ReportedLifecycleState; +import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +public class IotMqttPublisher implements Consumer { + private static final Logger LOGGER = LoggerFactory.getLogger(IotMqttPublisher.class); + private static final String SPOOL_SIZE_ERROR = "Message is larger than the size of message spool."; + private static final String SPOOL_FULL_ERROR = "Message spool is full. Message could not be added."; + + @Override + public void accept(String[] args) { + try (Client assertionClient = new Client(); + GreengrassCoreIPCClientV2 eventStreamRpcConnection = IPCTestUtils.getGreengrassClient()) { + GreengrassCoreIPC greengrassCoreIPCClient = eventStreamRpcConnection.getClient(); + + String topic = System.getenv("topic"); + String payload = System.getenv("payload"); + QOS qos = IPCTestUtils.getQOSFromValue(Integer.parseInt(System.getenv("qos"))); + + PublishToIoTCoreRequest request = new PublishToIoTCoreRequest(); + request.setTopicName(topic); + request.setPayload(payload.getBytes(StandardCharsets.UTF_8)); + request.setQos(qos); + + try { + greengrassCoreIPCClient.publishToIoTCore(request, Optional.empty()).getResponse().get(); + assertionClient.sendAssertion(true, "Successfully published to IoT topic " + topic, ""); + } catch (ExecutionException e) { + LOGGER.error("Error occurred while publishing to IoT topic", e); + try { + String errorMessage = e.getCause().getMessage(); + if (errorMessage.contains(SPOOL_SIZE_ERROR) || errorMessage.contains(SPOOL_FULL_ERROR)) { + assertionClient.sendAssertion(true, "SPOOL_FULL_ERROR", "Spooler is full"); + return; + } + if (e.getCause() instanceof UnauthorizedError) { + assertionClient.sendAssertion(true, e.getCause().getMessage(), "Unauthorized error while publishing to IoT topic " + topic); + return; + } + } catch (IOException e2) { + LOGGER.error("Failed to send assertion", e2); + } + IPCTestUtils.reportState(greengrassCoreIPCClient, ReportedLifecycleState.ERRORED); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.exit(1); + } catch (Exception e) { + LOGGER.error("Service errored", e); + System.exit(1); + } + } +} diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/utils/Client.java b/uat/custom-components/src/main/java/com/aws/greengrass/utils/Client.java new file mode 100644 index 0000000..5658190 --- /dev/null +++ b/uat/custom-components/src/main/java/com/aws/greengrass/utils/Client.java @@ -0,0 +1,58 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.utils; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultHttpRequestRetryHandler; +import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class Client implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(Client.class); + + private final CloseableHttpClient httpClient; + + public Client() { + httpClient = HttpClients.custom() + .setRetryHandler(new DefaultHttpRequestRetryHandler(5, false)).build(); + } + + @Override + public void close() { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException e) { + LOGGER.warn("unable to close http client", e); + } + } + } + + public void sendAssertion(boolean success, String context, String message) throws IOException { + int defaultPort = (int) Double.parseDouble(System.getProperty("serverPort")); + sendAssertionWithCustomizedPort(success, context, message, defaultPort); + } + + public void sendAssertionWithCustomizedPort(boolean success, String context, String message, int port) + throws IOException { + HttpPost httpPost = new HttpPost( + "http://localhost:" + port + "/assert"); + httpPost.setEntity(new ByteArrayEntity( + (String.format("{\"success\": %s, \"context\": \"%s\", \"message\": \"%s\"}", success, context, + message)).getBytes(StandardCharsets.UTF_8), ContentType.APPLICATION_JSON)); + try (CloseableHttpResponse response = httpClient.execute(httpPost)) { + response.getStatusLine().getStatusCode(); + } + } +} diff --git a/uat/custom-components/src/main/java/com/aws/greengrass/utils/IPCTestUtils.java b/uat/custom-components/src/main/java/com/aws/greengrass/utils/IPCTestUtils.java new file mode 100644 index 0000000..0a76da0 --- /dev/null +++ b/uat/custom-components/src/main/java/com/aws/greengrass/utils/IPCTestUtils.java @@ -0,0 +1,42 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.utils; + +import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPC; +import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; +import software.amazon.awssdk.aws.greengrass.model.QOS; +import software.amazon.awssdk.aws.greengrass.model.ReportedLifecycleState; +import software.amazon.awssdk.aws.greengrass.model.UpdateStateRequest; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +public final class IPCTestUtils { + private IPCTestUtils() { + } + + public static GreengrassCoreIPCClientV2 getGreengrassClient() throws InterruptedException, IOException { + return GreengrassCoreIPCClientV2.builder().build(); + } + + public static QOS getQOSFromValue(int qos) { + if (qos == 1) { + return QOS.AT_LEAST_ONCE; + } else if (qos == 0) { + return QOS.AT_MOST_ONCE; + } + return QOS.AT_LEAST_ONCE; //default value + } + + public static void reportState(GreengrassCoreIPC greengrassCoreIPCClient, ReportedLifecycleState state) + throws ExecutionException, InterruptedException { + UpdateStateRequest updateStateRequest = new UpdateStateRequest(); + updateStateRequest.setState(state); + greengrassCoreIPCClient.updateState(updateStateRequest, Optional.empty()).getResponse().get(); + } +} + diff --git a/uat/custom-components/src/main/resources/recipes/IotMqttPublisher.yaml b/uat/custom-components/src/main/resources/recipes/IotMqttPublisher.yaml new file mode 100644 index 0000000..b0ddbfb --- /dev/null +++ b/uat/custom-components/src/main/resources/recipes/IotMqttPublisher.yaml @@ -0,0 +1,21 @@ +# +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# + +--- +RecipeFormatVersion: '2020-01-25' +ComponentName: IotMqttPublisher +ComponentVersion: '0.0.0' +ComponentDescription: Publishes MQTT messages to IoT Core via Nucleus' spooler +ComponentPublisher: Amazon +Manifests: + - Artifacts: + - URI: classpath:/local-store/artifacts/custom-components.jar + Lifecycle: + Setenv: + topic: "{configuration:/topic}" + payload: "{configuration:/payload}" + qos: "{configuration:/qos}" + Run: >- + java -Dlog.level=INFO -DserverPort={configuration:/assertionServerPort} -DcomponentName="IotMqttPublisher" -jar {artifacts:path}/custom-components.jar diff --git a/uat/pom.xml b/uat/pom.xml new file mode 100644 index 0000000..e3dbe19 --- /dev/null +++ b/uat/pom.xml @@ -0,0 +1,27 @@ + + + + 4.0.0 + com.aws.greengrass + disk-spooler-uat + 1.0-SNAPSHOT + pom + + + custom-components + + + testing-features + + + + 8 + 8 + + diff --git a/uat/testing-features/pom.xml b/uat/testing-features/pom.xml new file mode 100644 index 0000000..ff7e35c --- /dev/null +++ b/uat/testing-features/pom.xml @@ -0,0 +1,219 @@ + + + + + 4.0.0 + com.aws.greengrass + disk-spooler-testing-features + 1.0-SNAPSHOT + + 1.8 + 1.8 + 2.20.157 + false + + + + greengrass-common + greengrass common + + https://d2jrmugq4soldf.cloudfront.net/snapshots + + + + + + software.amazon.awssdk + bom + ${aws.sdk.version} + pom + import + + + + + + com.aws.greengrass + aws-greengrass-testing-standalone + 1.2.0-SNAPSHOT + compile + + + org.projectlombok + lombok + 1.18.26 + compile + + + com.google.auto.service + auto-service + 1.0.1 + + + org.junit.jupiter + junit-jupiter-api + 5.8.1 + compile + + + software.amazon.awssdk.iotdevicesdk + aws-iot-device-sdk + 1.17.5 + + + software.amazon.awssdk + iotdataplane + compile + + + + org.immutables + value + 2.9.3 + provided + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 3.0.0 + + + copy-artifact-to-classpath + process-classes + + run + + + + + + + + + + + org.apache.maven.plugins + maven-pmd-plugin + 3.13.0 + + + 0 + true + + ../../codestyle/pmd-eg-ruleset.xml + ../../codestyle/pmd-eg-tests-ruleset.xml + + true + ${skipTests} + + + + test + + check + + + + + + maven-checkstyle-plugin + 3.1.0 + + + com.puppycrawl.tools + checkstyle + 8.29 + + + + true + ../../codestyle/checkstyle.xml + warning + 0 + ${skipTests} + + + + validate + validate + + check + + + + + + maven-resources-plugin + 2.6 + + + copy-local-store-recipe-resources + process-classes + + copy-resources + + + ${basedir}/target/classes/local-store/recipes + + + ${basedir}/../../uat/custom-components/target/classes/recipes + + + + + + copy-local-store-artifact-resources + process-classes + + copy-resources + + + ${basedir}/target/classes/local-store/artifacts/ + + + ${basedir}/../../uat/custom-components/target/ + + custom-components.jar + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.2 + + + package + + shade + + + greengrass-disk-spooler-testing-features + + + + + com.aws.greengrass.testing.launcher.TestLauncher + + + + + + + + + diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/IotCoreClient.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/IotCoreClient.java new file mode 100644 index 0000000..e1075a3 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/IotCoreClient.java @@ -0,0 +1,293 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.iot; + +import com.aws.greengrass.util.CoreDevice; +import com.google.inject.Inject; +import lombok.extern.log4j.Log4j2; +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.mqtt5.Mqtt5Client; +import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions; +import software.amazon.awssdk.crt.mqtt5.OnAttemptingConnectReturn; +import software.amazon.awssdk.crt.mqtt5.OnConnectionFailureReturn; +import software.amazon.awssdk.crt.mqtt5.OnConnectionSuccessReturn; +import software.amazon.awssdk.crt.mqtt5.OnDisconnectionReturn; +import software.amazon.awssdk.crt.mqtt5.OnStoppedReturn; +import software.amazon.awssdk.crt.mqtt5.PublishResult; +import software.amazon.awssdk.crt.mqtt5.PublishReturn; +import software.amazon.awssdk.crt.mqtt5.packets.ConnAckPacket; +import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket; +import software.amazon.awssdk.crt.mqtt5.packets.DisconnectPacket; +import software.amazon.awssdk.crt.mqtt5.packets.PublishPacket; +import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket; +import software.amazon.awssdk.crt.mqtt5.packets.SubscribePacket; +import software.amazon.awssdk.iot.AwsIotMqtt5ClientBuilder; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +@Log4j2 +@SuppressWarnings("MissingJavadocMethod") +public class IotCoreClient { + private final Set subscriptions = Collections.synchronizedSet(new HashSet<>()); + private final PublishHandler publishHandler = new PublishHandler(); + private final LifecycleHandler lifecycleHandler = new LifecycleHandler(); + private final CountDownLatch started = new CountDownLatch(1); + private Mqtt5Client client; + private Future resubscribeTask; + + private final CoreDevice coreDevice; + private final ExecutorService executorService = Executors.newFixedThreadPool(1); + + @Inject + public IotCoreClient(CoreDevice coreDevice) { + this.coreDevice = coreDevice; + } + + public void publish(PublishRequest request) + throws ExecutionException, InterruptedException, TimeoutException, IOException { + start(); + PublishResult result = client.publish(asPublishPacket(request)).get(); + log.trace("publish result: topic={}; reason={}; reasonCode={}; message={};", + request.getTopic(), + result.getResultPubAck().getReasonString(), + result.getResultPubAck().getReasonCode(), + request.getMessage().getMessage()); + } + + public void resubscribeAsync() { + if (resubscribeTask != null) { + resubscribeTask.cancel(true); + } + resubscribeTask = executorService.submit(() -> { + Set subscribeRequests = getSubscriptions().stream() + .map(Subscription::getRequest) + .collect(Collectors.toSet()); + for (SubscribeRequest request : subscribeRequests) { + try { + subscribe(request); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (ExecutionException | TimeoutException | IOException e) { + log.error("unable to resubscribe to topic {}", request.getTopic(), e); + } + } + }); + } + + public void subscribe(SubscribeRequest request) + throws ExecutionException, InterruptedException, TimeoutException, IOException { + start(); + + SubscribePacket subscribePacket = new SubscribePacket.SubscribePacketBuilder() + .withSubscription(request.getTopic(), request.getQos()) + .build(); + Subscription subscription = Subscription.builder() + .topic(request.getTopic()) + .callback(request.getCallback()) + .request(request) + .build(); + boolean newSubscription = subscriptions.add(subscription); + + try { + SubAckPacket subAckPacket = client.subscribe(subscribePacket).get(); + log.debug("subscription result: topic={}; reason={}; reasonCode={};", + request.getTopic(), + subAckPacket.getReasonString(), + subAckPacket.getReasonCodes().get(0)); + if (subAckPacket.getReasonCodes().get(0).getValue() > 2) { + if (newSubscription) { + // only remove if not resubscribing + subscriptions.remove(subscription); + } + request.getErrorCallback().accept(subAckPacket, null); + throw new RuntimeException("Unable to subscribe reasonCode=" + + subAckPacket.getReasonCodes().get(0)); + } + } catch (ExecutionException | InterruptedException e) { + if (newSubscription) { + // only remove if not resubscribing + subscriptions.remove(subscription); + } + request.getErrorCallback().accept(null, e); + throw e; + } + } + + private synchronized void start() throws InterruptedException, TimeoutException { + if (client == null) { + client = createMqttClient(); + client.start(); + log.info("Waiting for client to connect..."); + if (!started.await(10L, TimeUnit.SECONDS)) { + throw new TimeoutException("Timed-out waiting for client connect on start"); + } + } + } + + public synchronized void close() { + if (client != null) { + try { + client.stop(new DisconnectPacket.DisconnectPacketBuilder() + .withReasonCode(DisconnectPacket.DisconnectReasonCode.NORMAL_DISCONNECTION) + .build()); + } catch (CrtRuntimeException e) { + log.error("Unable to stop mqtt client", e); + client.close(); + } + } + executorService.shutdownNow(); + } + + private Set getSubscriptions() { + Set subscriptions; + synchronized (this.subscriptions) { + subscriptions = new HashSet<>(this.subscriptions); + } + return subscriptions; + } + + private static PublishPacket asPublishPacket(PublishRequest request) { + return new PublishPacket.PublishPacketBuilder() + .withQOS(request.getQos()) + .withTopic(request.getTopic()) + .withResponseTopic(request.getMessage().getResponseTopic()) + .withPayload(request.getMessage().getPayload()) + .withPayloadFormat(request.getMessage().getPayloadFormat() == null ? null + : PublishPacket.PayloadFormatIndicator.getEnumValueFromInteger( + request.getMessage().getPayloadFormat().getValue())) + .withRetain(request.getMessage().isRetain()) + .withContentType(request.getMessage().getContentType()) + .withCorrelationData(request.getMessage().getCorrelationData()) + .withMessageExpiryIntervalSeconds(request.getMessage().getMessageExpiryIntervalSeconds()) + .withUserProperties(request.getMessage().getUserProperties() == null ? null + : request.getMessage().getUserProperties().stream() + .map(p -> new software.amazon.awssdk.crt.mqtt5.packets.UserProperty(p.getKey(), p.getValue())) + .collect(Collectors.toList())) + .build(); + } + + private Mqtt5Client createMqttClient() { + try (AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder.newDirectMqttBuilderWithMtlsFromMemory( + coreDevice.getIotCoreDataEndpoint(), + coreDevice.getSpec().resource().certificate().certificatePem(), + coreDevice.getSpec().resource().certificate().keyPair().privateKey())) { + return builder + .withCertificateAuthority(coreDevice.getRootCA()) + .withLifeCycleEvents(lifecycleHandler) + .withPublishEvents(publishHandler) + .withSessionBehavior(Mqtt5ClientOptions.ClientSessionBehavior.REJOIN_POST_SUCCESS) + .withOfflineQueueBehavior(Mqtt5ClientOptions.ClientOfflineQueueBehavior.FAIL_ALL_ON_DISCONNECT) + .withConnectProperties(new ConnectPacket.ConnectPacketBuilder() + .withRequestProblemInformation(true) + .withClientId(getClass().getSimpleName() + UUID.randomUUID()) + .withSessionExpiryIntervalSeconds(10_080L) // overriding default value to extend session + // must be between 30 and 1200 seconds + .withKeepAliveIntervalSeconds(30L)) + .withPingTimeoutMs(Duration.ofSeconds(3).toMillis()) + .withMinReconnectDelayMs(Duration.ofSeconds(1).toMillis()) + .withMaxReconnectDelayMs(Duration.ofSeconds(2).toMillis()) + .build(); + } + } + + private class LifecycleHandler implements Mqtt5ClientOptions.LifecycleEvents { + @Override + public void onAttemptingConnect(Mqtt5Client mqtt5Client, + OnAttemptingConnectReturn onAttemptingConnectReturn) { + log.debug("Connect in progress"); + } + + @Override + public void onConnectionSuccess(Mqtt5Client mqtt5Client, + OnConnectionSuccessReturn onConnectionSuccessReturn) { + boolean sessionPresent = onConnectionSuccessReturn.getConnAckPacket().getSessionPresent(); + log.info("Connected, sessionPresent={}", sessionPresent); + started.countDown(); + if (!sessionPresent) { + // don't block crt thread + resubscribeAsync(); + } + } + + @Override + public void onConnectionFailure(Mqtt5Client mqtt5Client, + OnConnectionFailureReturn onConnectionFailureReturn) { + ConnAckPacket connAck = onConnectionFailureReturn.getConnAckPacket(); + log.warn("Connect failed. errorCode={}; reasonCode={}; reason={};", + CRT.awsErrorString(onConnectionFailureReturn.getErrorCode()), + connAck == null ? null : connAck.getReasonCode(), + connAck == null ? null : connAck.getReasonString()); + } + + @Override + public void onDisconnection(Mqtt5Client mqtt5Client, OnDisconnectionReturn onDisconnectionReturn) { + DisconnectPacket discon = onDisconnectionReturn.getDisconnectPacket(); + log.info("Disconnected. errorCode={}; reasonCode={}; reason={};", + CRT.awsErrorString(onDisconnectionReturn.getErrorCode()), + discon == null ? null : discon.getReasonCode(), + discon == null ? null : discon.getReasonString()); + } + + @Override + public void onStopped(Mqtt5Client mqtt5Client, OnStoppedReturn onStoppedReturn) { + log.debug("Stopped"); + mqtt5Client.close(); + } + } + + private class PublishHandler implements Mqtt5ClientOptions.PublishEvents { + + @Override + public void onMessageReceived(Mqtt5Client mqtt5Client, PublishReturn publishReturn) { + String topic = publishReturn.getPublishPacket().getTopic(); + Message message = asMessage(publishReturn); + log.trace("Message received on topic {} with payload {}", topic, message.getMessage()); + forwardMessageToSubscribers(topic, message); + } + + private Message asMessage(PublishReturn publishReturn) { + PublishPacket publishPacket = publishReturn.getPublishPacket(); + return Message.builder() + .payload(publishPacket.getPayload()) + .contentType(publishPacket.getContentType()) + .correlationData(publishPacket.getCorrelationData()) + .messageExpiryIntervalSeconds(publishPacket.getMessageExpiryIntervalSeconds()) + .payloadFormat(publishPacket.getPayloadFormat() == null ? null + : PayloadFormatIndicator.fromIndicator(publishPacket.getPayloadFormat().getValue())) + .responseTopic(publishPacket.getResponseTopic()) + .retain(publishPacket.getRetain()) + .subscriptionIdentifiers(publishPacket.getSubscriptionIdentifiers()) + .userProperties(publishPacket.getUserProperties() == null ? null + : publishPacket.getUserProperties().stream() + .map(p -> UserProperty.builder().key(p.key).value(p.value).build()) + .collect(Collectors.toList())) + .build(); + } + + private void forwardMessageToSubscribers(String topic, Message message) { + getSubscriptions().stream() + .filter(s -> Objects.equals(s.getTopic(), topic)) + .map(Subscription::getCallback) + .forEach(c -> c.accept(message)); + } + } +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/Message.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Message.java new file mode 100644 index 0000000..a47f6e2 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Message.java @@ -0,0 +1,36 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.iot; + +import lombok.Builder; +import lombok.Getter; +import lombok.Value; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +@Value +@Builder +@SuppressWarnings({"PMD.AvoidFieldNameMatchingTypeName", "checkstyle:VisibilityModifier", "MissingJavadocMethod"}) +public class Message { + + @Getter(lazy = true) + String message = messageFromPayload(); + byte[] payload; + String contentType; + Long topicAlias; + byte[] correlationData; + Long messageExpiryIntervalSeconds; + PayloadFormatIndicator payloadFormat; + String responseTopic; + boolean retain; + List subscriptionIdentifiers; + List userProperties; + + String messageFromPayload() { + return payload == null ? null : new String(payload, StandardCharsets.UTF_8); + } +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/PayloadFormatIndicator.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PayloadFormatIndicator.java new file mode 100644 index 0000000..8ae1ad2 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PayloadFormatIndicator.java @@ -0,0 +1,31 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.iot; + +import java.util.Arrays; + +@SuppressWarnings("MissingJavadocMethod") +public enum PayloadFormatIndicator { + BYTES(0), + UTF8(1); + + private final int indicator; + + PayloadFormatIndicator(int value) { + this.indicator = value; + } + + public int getValue() { + return this.indicator; + } + + public static PayloadFormatIndicator fromIndicator(int indicator) { + return Arrays.stream(PayloadFormatIndicator.values()) + .filter(i -> i.getValue() == indicator) + .findFirst() + .orElseThrow(() -> new RuntimeException("Unexpected payload format indicator " + indicator)); + } +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/PublishRequest.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PublishRequest.java new file mode 100644 index 0000000..2904525 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/PublishRequest.java @@ -0,0 +1,20 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.iot; + +import lombok.Builder; +import lombok.Value; +import software.amazon.awssdk.crt.mqtt5.QOS; + +@Value +@Builder +@SuppressWarnings("checkstyle:VisibilityModifier") +public class PublishRequest { + String topic; + Message message; + @Builder.Default + QOS qos = QOS.AT_LEAST_ONCE; +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/SubscribeRequest.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/SubscribeRequest.java new file mode 100644 index 0000000..a2fe06d --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/SubscribeRequest.java @@ -0,0 +1,29 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.iot; + +import lombok.Builder; +import lombok.Value; +import software.amazon.awssdk.crt.mqtt5.QOS; +import software.amazon.awssdk.crt.mqtt5.packets.SubAckPacket; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +@Value +@Builder +@SuppressWarnings("checkstyle:VisibilityModifier") +public class SubscribeRequest { + String topic; + @Builder.Default + Consumer callback = m -> { + }; + @Builder.Default + BiConsumer errorCallback = (p, e) -> { + }; + @Builder.Default + QOS qos = QOS.AT_LEAST_ONCE; +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/Subscription.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Subscription.java new file mode 100644 index 0000000..6eef168 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/Subscription.java @@ -0,0 +1,20 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.iot; + +import lombok.Builder; +import lombok.Value; + +import java.util.function.Consumer; + +@Value +@Builder +@SuppressWarnings("checkstyle:VisibilityModifier") +public class Subscription { + String topic; + Consumer callback; + SubscribeRequest request; +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/iot/UserProperty.java b/uat/testing-features/src/main/java/com/aws/greengrass/iot/UserProperty.java new file mode 100644 index 0000000..30f70f0 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/iot/UserProperty.java @@ -0,0 +1,17 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.iot; + +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +@SuppressWarnings("checkstyle:VisibilityModifier") +public class UserProperty { + String key; + String value; +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/AssertionSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/AssertionSteps.java new file mode 100644 index 0000000..0004dd0 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/AssertionSteps.java @@ -0,0 +1,228 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + + +package com.aws.greengrass.steps; + +import com.aws.greengrass.testing.features.WaitSteps; +import com.aws.greengrass.testing.model.ScenarioContext; +import com.aws.greengrass.testing.model.TestContext; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import com.sun.net.httpserver.spi.HttpServerProvider; +import io.cucumber.guice.ScenarioScoped; +import io.cucumber.java.After; +import io.cucumber.java.en.Given; +import io.cucumber.java.en.Then; +import io.cucumber.java.en.When; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.log4j.Log4j2; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.fail; + + +@Log4j2 +@ScenarioScoped +@SuppressWarnings("MissingJavadocMethod") +public class AssertionSteps implements Closeable { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final Logger LOGGER = LogManager.getLogger(AssertionSteps.class); + @Getter + private final List assertionList = new CopyOnWriteArrayList<>(); + private final ScenarioContext scenarioContext; + private final TestContext testContext; + private final WaitSteps waits; + private HttpServer server; + @Getter + private int port; + + @Inject + @SuppressWarnings("MissingJavadocMethod") + public AssertionSteps(TestContext testContext, + ScenarioContext scenarioContext, + WaitSteps waits) { + + this.testContext = testContext; + this.scenarioContext = scenarioContext; + this.waits = waits; + } + + private void assertionHandler(HttpExchange httpExchange) { + try { + Assertion val = MAPPER.readValue(httpExchange.getRequestBody(), Assertion.class); + assertionList.add(val); + LOGGER.debug("Got assertion value {}", val); + httpExchange.sendResponseHeaders(200, 0); + httpExchange.getResponseBody().flush(); + } catch (IOException e) { + LOGGER.error("Error decoding assertion", e); + } finally { + httpExchange.close(); + } + } + + @Given("I start an assertion server") + @SuppressWarnings("MissingJavadocMethod") + public void start() throws IOException { + if (server != null) { + throw new IllegalStateException("Server already exists"); + } + + server = HttpServerProvider.provider().createHttpServer(new InetSocketAddress("localhost", 0), + 0); + server.createContext("/assert", this::assertionHandler); + LOGGER.debug("Starting HTTP assertion server"); + server.setExecutor(Executors.newCachedThreadPool(runnable -> { + // Daemon-ize to allow main thread to end + final Thread thread = Executors.defaultThreadFactory().newThread(runnable); + thread.setName(String.format("AssertionServer-%d-%s", server.getAddress().getPort(), + testContext.testId().id())); + thread.setDaemon(true); + return thread; + })); + server.start(); + port = server.getAddress().getPort(); + LOGGER.info("Started HTTP assertion server at port {}", port); + scenarioContext.put("assertionServerPort", Integer.toString(port)); + } + + + @Then("I get {int} assertion(s) with context {string}") + public void assertionsWithContext(int assertionCount, String context) throws Throwable { + assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context), null, + 100, true); + } + + + @Then("I get at least {int} assertion(s) with context {string}") + public void atLeastNAssertionsWithContext(int assertionCount, String context) throws Throwable { + assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context), null, + 100, false); + } + + + @Then("I get at least {int} assertion(s) with context {string} within {long} seconds") + public void atLeastNAssertionsWithContext(int assertionCount, String context, long timeoutSeconds) + throws Throwable { + assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context), + null, timeoutSeconds, false); + } + + @Then("I get {int} assertion(s) with context {string} within {long} seconds") + public void assertionsWithContextTimeout(int assertionCount, String context, long timeoutSeconds) + throws Throwable { + assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context), + null, timeoutSeconds, true); + } + + + @Then("I get {int} assertion(s) with context {string} and message {string} within {long} seconds") + public void assertionsWithContextTimeout(int assertionCount, String context, String message, + long timeoutSeconds) throws Throwable { + + assertionsWithContextTimeout(assertionCount, scenarioContext.applyInline(context), + message, timeoutSeconds, true); + } + + @SuppressWarnings("PMD.AvoidCatchingThrowable") + private void assertionsWithContextTimeout(int assertionCount, String context, String message, + long timeoutSeconds, boolean failOnTooManyAssertions) + throws Throwable { + String exceptionMessage = (message == null) + ? String.format("did not receive %d assertions for '%s' in time (%ds)", assertionCount, + context, timeoutSeconds) + : String.format("did not receive %d assertions for '%s' with message '%s' in time (%ds)", + assertionCount, context, message, timeoutSeconds); + + class Wrapper { + @Getter + @Setter + private T wrapped; + } + + Wrapper wrapper = new Wrapper<>(); + boolean finished = waits.untilTrue(() -> { + try { + return assertionsReceived(assertionCount, context, message, failOnTooManyAssertions, exceptionMessage); + } catch (Throwable e) { + wrapper.setWrapped(e); + return true; + } + }, (int) timeoutSeconds, TimeUnit.SECONDS); + + if (!finished) { + throw new TimeoutException("Timeout: " + exceptionMessage); + } + + Throwable e = wrapper.getWrapped(); + if (e != null) { + throw e; + } + } + + private boolean assertionsReceived(int assertionCount, String context, String message, + boolean failOnTooManyAssertions, String exceptionMessage) { + List assertions = assertionList.stream() + .filter(assertion -> assertion.getContext() != null) + .filter(assertion -> context.equalsIgnoreCase(assertion.getContext())) + .filter(assertion -> message == null || message.equalsIgnoreCase(assertion.getMessage())) + .collect(Collectors.toList()); + for (Assertion assertion : assertions) { + if (!assertion.isSuccess()) { + fail(String.format("Assertion with message %s and context %s FAILED", assertion.getMessage(), + assertion.getContext())); + } + } + if (failOnTooManyAssertions && assertions.size() > assertionCount) { + fail(exceptionMessage); + } + return assertions.size() >= assertionCount; + } + + @After + @Override + public void close() { + if (server != null) { + server.stop(0); + ((ExecutorService) server.getExecutor()).shutdownNow(); + LOGGER.info("Shut down HTTP assertion server"); + } + } + + @When("I clear the assertions") + public void clearTheAssertions() { + assertionList.clear(); + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + @ToString + static class Assertion { + private boolean success; + private String context; + private String message; + } +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/CommonSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/CommonSteps.java new file mode 100644 index 0000000..eb95440 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/CommonSteps.java @@ -0,0 +1,30 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.steps; + +import com.aws.greengrass.testing.model.ScenarioContext; +import com.google.inject.Inject; +import io.cucumber.java.en.When; + +import java.util.UUID; + +public class CommonSteps { + private final ScenarioContext scenarioContext; + + @Inject + public CommonSteps(ScenarioContext scenarioContext) { + this.scenarioContext = scenarioContext; + } + + @When("I create a random name as {word}") + public void storeRandomNameInContext(String key) { + scenarioContext.put(key, randomName()); + } + + private static String randomName() { + return String.format("e2e-%d-%s", System.currentTimeMillis(), UUID.randomUUID()); + } +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/IotSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/IotSteps.java new file mode 100644 index 0000000..7f99c0b --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/IotSteps.java @@ -0,0 +1,95 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.steps; + +import com.aws.greengrass.iot.IotCoreClient; +import com.aws.greengrass.iot.Message; +import com.aws.greengrass.iot.SubscribeRequest; +import com.aws.greengrass.testing.features.WaitSteps; +import com.aws.greengrass.testing.model.ScenarioContext; +import com.google.inject.Inject; +import io.cucumber.java.After; +import io.cucumber.java.en.Then; +import io.cucumber.java.en.When; +import lombok.extern.log4j.Log4j2; +import software.amazon.awssdk.crt.mqtt5.QOS; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Log4j2 +@SuppressWarnings("MissingJavadocMethod") +public class IotSteps { + private final ScenarioContext scenarioContext; + private final WaitSteps waitSteps; + private final IotCoreClient iotCoreClient; + private final Map> messagesByTopic = new ConcurrentHashMap<>(); + + @Inject + public IotSteps(ScenarioContext scenarioContext, + WaitSteps waitSteps, + IotCoreClient iotCoreClient) { + this.scenarioContext = scenarioContext; + this.waitSteps = waitSteps; + this.iotCoreClient = iotCoreClient; + } + + @After + public void close() { + iotCoreClient.close(); + } + + @When("I subscribe to cloud topics") + public void subscribeToCloudTopics(List topics) + throws IOException, ExecutionException, InterruptedException, TimeoutException { + for (String topic : topics) { + String resolvedTopic = scenarioContext.applyInline(topic); + iotCoreClient.subscribe(SubscribeRequest.builder() + .topic(resolvedTopic) + .qos(QOS.AT_LEAST_ONCE) + .callback(message -> + messagesByTopic.compute(resolvedTopic, (s, messages) -> { + if (messages == null) { + messages = new ArrayList<>(); + } + messages.add(message); + return messages; + })) + .build()); + } + } + + @Then("the cloud topic {word} receives the following messages within {int} seconds") + public void checkMessagesOnTopic(String topic, int timeoutSeconds, List messages) + throws InterruptedException { + String resolvedTopic = scenarioContext.applyInline(topic); + assertTrue(waitSteps.untilTrue( + () -> { + List receivedMessages = receivedMessagePayloadsByTopic(resolvedTopic); + return Objects.equals(messages, receivedMessages); + }, + timeoutSeconds, TimeUnit.SECONDS)); + } + + private List receivedMessagePayloadsByTopic(String topic) { + List messages = messagesByTopic.get(topic); + if (messages == null) { + return Collections.emptyList(); + } + return messages.stream().map(Message::getMessage).collect(Collectors.toList()); + } +} diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/util/CoreDevice.java b/uat/testing-features/src/main/java/com/aws/greengrass/util/CoreDevice.java new file mode 100644 index 0000000..82642b1 --- /dev/null +++ b/uat/testing-features/src/main/java/com/aws/greengrass/util/CoreDevice.java @@ -0,0 +1,45 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.util; + +import com.aws.greengrass.testing.model.RegistrationContext; +import com.aws.greengrass.testing.model.TestContext; +import com.aws.greengrass.testing.resources.AWSResources; +import com.aws.greengrass.testing.resources.iot.IotLifecycle; +import com.aws.greengrass.testing.resources.iot.IotThingSpec; +import com.google.inject.Inject; +import lombok.Getter; + +/** + * Helper class that provides information related to the core device under test. + */ +@SuppressWarnings("MissingJavadocMethod") +public class CoreDevice { + + private final AWSResources awsResources; + private final TestContext testContext; + @Getter + private final String rootCA; + @Getter + private final String iotCoreDataEndpoint; + + @Inject + public CoreDevice(TestContext testContext, + RegistrationContext registrationContext, + AWSResources awsResources) { + this.awsResources = awsResources; + this.testContext = testContext; + this.rootCA = registrationContext.rootCA(); + this.iotCoreDataEndpoint = awsResources.lifecycle(IotLifecycle.class).dataEndpoint(); + } + + public IotThingSpec getSpec() { + return awsResources.trackingSpecs(IotThingSpec.class) + .filter(s -> s.thingName().equals(testContext.coreThingName())) + .findFirst() + .orElseThrow(() -> new RuntimeException("no core thing found")); + } +} diff --git a/uat/testing-features/src/main/resources/greengrass/features/disk-spooler-1.feature b/uat/testing-features/src/main/resources/greengrass/features/disk-spooler-1.feature new file mode 100644 index 0000000..f43dd10 --- /dev/null +++ b/uat/testing-features/src/main/resources/greengrass/features/disk-spooler-1.feature @@ -0,0 +1,66 @@ +@DiskSpooler +Feature: DiskSpooler-1 + + As a customer, I can spool outbound MQTT messages so that they persist across Nucleus restarts. + + Background: + Given my device is registered as a Thing + And my device is running Greengrass + And I start an assertion server + + Scenario: DiskSpooler-1-T1: MQTT messages are published to IoT Core + Then I create a random name as cloud_topic + When I subscribe to cloud topics + | ${cloud_topic} | + Given I create a Greengrass deployment with components + | aws.greengrass.Cli | LATEST | + | aws.greengrass.DiskSpooler | classpath:/greengrass/recipes/recipe.yaml | + And I update my Greengrass deployment configuration, setting the component aws.greengrass.Nucleus configuration to: + """ + { + "MERGE": { + "mqtt": { + "spooler": { + "storageType": "Disk" + } + }, + "logging": { + "level": "DEBUG" + } + } + } + """ + And I deploy the Greengrass deployment configuration + Then the Greengrass deployment is COMPLETED on the device after 3 minutes + Then I verify the aws.greengrass.DiskSpooler component is RUNNING using the greengrass-cli + Then I wait 20 seconds + When I install the component IotMqttPublisher from local store with configuration + """ + { + "MERGE": { + "assertionServerPort": ${assertionServerPort}, + "topic": "${cloud_topic}", + "payload": "Hello world", + "qos": "1", + "accessControl": { + "aws.greengrass.ipc.mqttproxy": { + "policyId1": { + "policyDescription": "access to publish to mqtt topics", + "operations": [ + "aws.greengrass#PublishToIoTCore" + ], + "resources": [ + "${cloud_topic}" + ] + } + } + } + } + } + """ + And I get 1 assertion with context "Successfully published to IoT topic ${cloud_topic}" + Then the cloud topic ${cloud_topic} receives the following messages within 10 seconds + | Hello world | + + +# TODO convert remaining UATs diff --git a/uat/testing-features/src/main/resources/greengrass/recipes/recipe.yaml b/uat/testing-features/src/main/resources/greengrass/recipes/recipe.yaml new file mode 100644 index 0000000..55f5343 --- /dev/null +++ b/uat/testing-features/src/main/resources/greengrass/recipes/recipe.yaml @@ -0,0 +1,18 @@ +# +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +# + +--- +RecipeFormatVersion: '2020-01-25' +ComponentName: aws.greengrass.DiskSpooler +ComponentDescription: AWS Greengrass Disk Spooler +ComponentPublisher: AWS +ComponentVersion: '2.4.0' +ComponentType: 'aws.greengrass.plugin' +Manifests: + - Artifacts: + - URI: "classpath:/greengrass/artifacts/aws.greengrass.DiskSpooler.jar" + Permission: + Read: ALL + Execute: ALL