diff --git a/samples/dapr-pubsub-dotnet/.static/img/overview.png b/samples/dapr-pubsub-dotnet/.static/img/overview.png new file mode 100644 index 0000000..3eaf413 Binary files /dev/null and b/samples/dapr-pubsub-dotnet/.static/img/overview.png differ diff --git a/samples/dapr-pubsub-dotnet/build.ps1 b/samples/dapr-pubsub-dotnet/build.ps1 new file mode 100644 index 0000000..cd85d01 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/build.ps1 @@ -0,0 +1,16 @@ +param ( + [Parameter(Mandatory=$True)] + [string]$Version + + [Parameter(Mandatory=$True)] + [string]$ContainerRegistry +) + +docker build .\src\TelemetryProcessor\TelemetryTransformer\. -f .\src\TelemetryProcessor\TelemetryTransformer\Dockerfile -t telemetrytransformer:$Version +docker build .\src\TelemetryProcessor\TelemetryPersister\. -f .\src\TelemetryProcessor\TelemetryPersister\Dockerfile -t telemetrypersister:$Version + +docker tag telemetrypersister:$Version $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrypersister:$Version +docker tag telemetrytransformer:$Version $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrytransformer:$Version + +docker push $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrypersister:$Version +docker push $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrytransformer:$Version \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/deploy.ps1 b/samples/dapr-pubsub-dotnet/deploy.ps1 new file mode 100644 index 0000000..4ca961c --- /dev/null +++ b/samples/dapr-pubsub-dotnet/deploy.ps1 @@ -0,0 +1,13 @@ +param ( + [Parameter(Mandatory=$True)] + [string]$ContainerRegistry, + + [Parameter(Mandatory=$True)] + [string]$Version +) + +$contents = (Get-Content .\deploy\telemetryprocessor.yaml) -Replace '#{container_registry}#', $ContainerRegistry + +$contents = $contents -replace '#{image_version}#', $Version + +$contents | kubectl apply -n azure-iot-operations -f - \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/deploy/telemetryprocessor.yaml b/samples/dapr-pubsub-dotnet/deploy/telemetryprocessor.yaml new file mode 100644 index 0000000..d53fa62 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/deploy/telemetryprocessor.yaml @@ -0,0 +1,202 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: aio-mq-pubsub + namespace: azure-iot-operations +spec: + type: pubsub.aio-mq-pubsub-pluggable + version: v1 + metadata: + - name: url + value: "aio-mq-dmqtt-frontend:8883" + - name: satTokenPath + value: "/var/run/secrets/tokens/mqtt-client-token" + - name: tlsEnabled + value: true + - name: caCertPath + value: "/var/run/certs/aio-mq-ca-cert/ca.crt" + - name: logLevel + value: "Info" +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: aio-mq-statestore + namespace: azure-iot-operations +spec: + type: state.aio-mq-statestore-pluggable + version: v1 + metadata: + - name: url + value: "aio-mq-dmqtt-frontend:8883" + - name: satTokenPath + value: "/var/run/secrets/tokens/mqtt-client-token" + - name: tlsEnabled + value: true + - name: caCertPath + value: "/var/run/certs/aio-mq-ca-cert/ca.crt" + - name: logLevel + value: "Info" +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: dapr-client + namespace: azure-iot-operations + annotations: + aio-mq-broker-auth/group: dapr-workload +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: telemetrytransformer + namespace: azure-iot-operations + labels: + app: telemetrytransformer +spec: + replicas: 1 + selector: + matchLabels: + app: telemetrytransformer + template: + metadata: + labels: + app: telemetrytransformer + annotations: + dapr.io/enabled: "true" + dapr.io/unix-domain-socket-path: "/tmp/dapr-components-sockets" + dapr.io/app-id: "telemetrytransformer" + dapr.io/enable-api-logging: "true" + dapr.io/app-port: "5050" + dapr.io/app-protocol: "grpc" + dapr.io/log-level: "debug" + dapr.io/sidecar-liveness-probe-delay-seconds: "30" + dapr.io/sidecar-liveness-probe-timeout-seconds: "10" + spec: + serviceAccountName: dapr-client + volumes: + - name: dapr-unix-domain-socket + emptyDir: {} + - name: mqtt-client-token + projected: + sources: + - serviceAccountToken: + path: mqtt-client-token + audience: aio-mq + expirationSeconds: 86400 + - name: aio-ca-trust-bundle + configMap: + name: aio-ca-trust-bundle-test-only + imagePullSecrets: + - name: aio-pullsecret + containers: + - name: telemetrytransformer + image: #{container_registry}#/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrytransformer:#{image_version}# + imagePullPolicy: Always + env: + - name: Logging__LogLevel__Default + value: Debug + # Container for the pluggable component + - name: aio-mq-components + image: ghcr.io/azure/iot-mq-dapr-components:latest + volumeMounts: + - name: dapr-unix-domain-socket + mountPath: /tmp/dapr-components-sockets + - name: mqtt-client-token + mountPath: /var/run/secrets/tokens + - name: aio-ca-trust-bundle + mountPath: /var/run/certs/aio-mq-ca-cert/ +--- +kind: Service +apiVersion: v1 +metadata: + name: telemetrytransformer-workload-svc + namespace: azure-iot-operations + labels: + app: telemetrytransformer +spec: + selector: + app: telemetrytransformer + ports: + - protocol: TCP + port: 5050 #6001 + targetPort: 5050 # 6001 + type: ClusterIP +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: telemetrypersister + namespace: azure-iot-operations + labels: + app: telemetrypersister +spec: + replicas: 1 + selector: + matchLabels: + app: telemetrypersister + template: + metadata: + labels: + app: telemetrypersister + annotations: + dapr.io/enabled: "true" + dapr.io/unix-domain-socket-path: "/tmp/dapr-components-sockets" + dapr.io/app-id: "telemetrypersister" + dapr.io/enable-api-logging: "true" + dapr.io/app-port: "80" #"6001" + dapr.io/app-protocol: "http" + dapr.io/log-level: "debug" + dapr.io/sidecar-liveness-probe-delay-seconds: "30" + dapr.io/sidecar-liveness-probe-timeout-seconds: "10" + spec: + serviceAccountName: dapr-client + volumes: + - name: dapr-unix-domain-socket + emptyDir: {} + - name: mqtt-client-token + projected: + sources: + - serviceAccountToken: + path: mqtt-client-token + audience: aio-mq + expirationSeconds: 86400 + - name: aio-ca-trust-bundle + configMap: + name: aio-ca-trust-bundle-test-only + imagePullSecrets: + - name: aio-pullsecret + containers: + - name: telemetrypersister + image: #{container_registry}#/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrypersister:#{image_version}# + imagePullPolicy: Always + env: + - name: Logging__LogLevel__Default + value: Debug + # Container for the pluggable component + - name: aio-mq-components + image: ghcr.io/azure/iot-mq-dapr-components:latest + volumeMounts: + - name: dapr-unix-domain-socket + mountPath: /tmp/dapr-components-sockets + - name: mqtt-client-token + mountPath: /var/run/secrets/tokens + - name: aio-ca-trust-bundle + mountPath: /var/run/certs/aio-mq-ca-cert/ +--- +kind: Service +apiVersion: v1 +metadata: + name: telemetrypersister-workload-svc + namespace: azure-iot-operations + labels: + app: telemetrypersister +spec: + selector: + app: telemetrypersister + ports: + - protocol: TCP + port: 80 #6001 + targetPort: 80 # 6001 + type: ClusterIP + \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/readme.md b/samples/dapr-pubsub-dotnet/readme.md new file mode 100644 index 0000000..ecdd1ce --- /dev/null +++ b/samples/dapr-pubsub-dotnet/readme.md @@ -0,0 +1,120 @@ +## Introduction + +This sample shows how you can use Dapr in .NET projects to receive messages from an MQTT topic and publish messages to another MQTT topic. + +## Prerequisites + +- [Docker Desktop](https://www.docker.com/products/docker-desktop/) or [Rancher Desktop](https://rancherdesktop.io/) to be able to build the container images on your computer +- [Powershell](https://learn.microsoft.com/en-us/powershell/scripting/install/installing-powershell?view=powershell-7.3) installed on your computer to be able to use the build and deploy powershell scripts +- A Kubernetes cluster where Dapr and Azure IoT Operations is installed +- A Container Registry where docker images can be pushed to +- A pullsecret for the Container Registry must exist in the Kubernetes cluster. The pullsecret must have the name `aio-pullsecret`. Information on how to create a pull secret for an Azure Container Registry can be found [here](https://learn.microsoft.com/en-us/azure/container-registry/container-registry-auth-kubernetes#create-an-image-pull-secret). More generic information on how to create a pull secret can be found [here](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/). + +## Overview + +This sample consists of two .NET workloads that are communicating with each other via the IoT MQ broker. + +![Solution overview](.static/img/overview.png) + +- Raw JSON messages are published to the IoT MQ MQTT broker +- The `telemetrytransformer` component is implemented as a Console Application that uses Dapr pub/sub to receive (non cloud-event) raw messages from the IoT MQ broker +- The `telemetrytransformer` publishes cloud-event messages to the IoT MQ broker +- The `telemetrypersister` component is implemented as an ASP.NET Web API project that uses Dapr pub/sub to receive cloud-event messages from the IOT MQ broker +- The `telemetrypersister` has an additional subscription where Dapr is used to receive non cloud-event messages from an MQTT topic. + +## How to build and deploy + +- Build the .NET components using the `build.ps1` powershell script file. + This script takes 2 parameters: + - Version: this parameter is used to assign a tag to the container images that are being build for the `telemetrytransformer` and `telemetrypersister` component. + - ContainerRegistry: the URL of the container registry to where the images must be pushed. + +- Deploy the sample to your Kubernetes cluster by using the `deploy.ps1` powershell script. + This script takes 3 parameters: + - ContainerRegistry: the URL of the container registry where the images reside. + - Version: the version of the `telemetrytransformer` and `telemetrypersister` components that you want to deploy + +## Test the sample + +You can test the sample by publishing JSON messages to the `devices/+/#` MQTT topic on the IOT MQ broker. + +To do this, you first need to find out the external IP address that you can use to send messages to IoT MQ. You need the IP address of the IoT MQ aio-mq-dmqtt-frontend service. + +To do this, execute the `kubectl get svc -n azure-iot-operations` command. +This command will show all the services that exist in the namespace. One of those services is the `aio-mq-dmqtt-frontend` service. Use the external IP address of that service in the following command: + +> [!NOTE] +> The mosquitto pub needs to switch to a different auth, or we need to enable username/password. Maybe switch this to a mqtt client deployed onto the cluster similar to other tutorials + +``` +mosquitto_pub -h -t "devices/device1/sensor1" -m '{"Tag":"fan_speed","Value":35,"Timestamp":"2023-07-03T18:20:00Z"}' -q 1 -u client1 -P password +``` + +After you've published a message via the command above, you should see some information in the logs of the `telemetrytransformer` pod. For instance: + +To get the name of the pod for which to retrieve the logs, execute this command: + +``` +kubectl get pods -n azure-iot-operations +``` + +``` +kubectl logs -n azure-iot-operations telemetrytransformer-75cf9f8978-47fqm +``` +shows: +``` +<6>2023-07-04 07:54:17.711 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] OnTopicEvent called on topic devices/device1/sensor2 +<6>2023-07-04 07:54:17.711 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] payload = {"Tag":"fan_speed","Value":37,"Timestamp":"2023-07-04T18:20:00Z"} +Received message: {"Tag":"fan_speed","Value":37,"Timestamp":"2023-07-04T18:20:00Z"} +Message deserialized: +Timestamp = 07/04/2023 18:20:00 +00:00 +Tag = fan_speed +Value = 37 +<6>2023-07-04 07:54:17.712 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] Publishing message ... +<6>2023-07-04 07:54:17.760 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] Message published. +``` + +Since the `telemetrytransformer` worker has published a new message to the IoT MQ broker, and the `telemetrypersister` worker is listening to the specific topic where the message has been sent to, you should see that the `telemetrypersister` has done some work as well. + +Displaying the logs of the `telemetrypersister` should show this: + +Again, first get the exact name of the pod by executing this command: + +``` +kubectl get pods -n azure-iot-operations +``` + +Once you have the name of the pod, execute this command: + +``` +kubectl logs -n azure-iot-operations telemetrypersister-7fcf59885d-fd9qb +``` + +``` +<6>2023-07-04 07:54:17.784 +00:00 TelemetryPersister.Controllers.DeviceTelemetryController[0] DeviceTelemetry message received +<6>2023-07-04 07:54:17.784 +00:00 TelemetryPersister.Controllers.DeviceTelemetryController[0] Persisting telemetry for device: {"deviceId":"device1","timestamp":"2023-07-04T18:20:00+00:00","tag":"fan_speed","value":37} +``` + +The `telemetrypersister` also contains a subscription on a topic where raw payloads (no cloudevents) are expected. +This subscription is listening on the `commands/#` topic. + +To test this, publish a JSON message to the `commands/#` topic in the IoT MQ broker: + +> [!NOTE] +> The mosquitto pub needs to switch to a different auth, or we need to enable username/password. Maybe switch this to a mqtt client deployed onto the cluster similar to other tutorials + +``` +mosquitto_pub -h -t "commands/exec" -m '{"cmd":"reboot"}' -q 1 -u client1 -P password +``` + +The logs of the `telemetrypersister` should show this: + +``` +<6>2023-07-04 14:14:56.860 +00:00 TelemetryPersister.Controllers.DeviceTelemetryController[0] Command received: reboot +``` + +> Be aware that in this sample, all loglevels are set to 'Debug'. This means a lot of logs are generated, which is obviously not something you want to do by default in production scenarios! + +## Acknowledgements + +Big thanks to [Frederik Gheysels](https://github.com/fgheysels) for contributing this sample! diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Controllers/DeviceTelemetryController.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Controllers/DeviceTelemetryController.cs new file mode 100644 index 0000000..9a165ee --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Controllers/DeviceTelemetryController.cs @@ -0,0 +1,46 @@ +using Dapr; +using Microsoft.AspNetCore.Mvc; +using System.Text.Json; +using Dapr.Client; +using TelemetryPersister.Models; + +namespace TelemetryPersister.Controllers +{ + [Route("api/[controller]")] + [ApiController] + public class DeviceTelemetryController : ControllerBase + { + private readonly ILogger _logger; + private readonly DaprClient _daprClient; + + private static readonly JsonSerializerOptions SerializerOptions = new JsonSerializerOptions + { + WriteIndented = false + }; + + public DeviceTelemetryController(DaprClient daprClient, ILogger logger) + { + _daprClient = daprClient; + _logger = logger; + } + + [Topic(pubsubName: "telemetrypubsub", name: "devicetelemetry")] + [HttpPost("/devicetelemetery")] + public ActionResult ReceiveVesselTelemetry([FromBody] DeviceTelemetry message) + { + _logger.LogInformation("DeviceTelemetry message received"); + _logger.LogInformation($"Persisting telemetry for device: {JsonSerializer.Serialize(message, SerializerOptions)}"); + + return Ok(); + } + + [Topic(pubsubName: "telemetrypubsub", name: "commands/#", enableRawPayload: true)] + [HttpPost("/devicecommands")] + public ActionResult ReceiveVesselCommand([FromBody] CommandInfo cmd ) + { + _logger.LogInformation("CommandInfo received: " + cmd.Command); + + return Ok(); + } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Dockerfile b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Dockerfile new file mode 100644 index 0000000..8ecd6aa --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Dockerfile @@ -0,0 +1,17 @@ +FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base +WORKDIR /app +EXPOSE 80 + +FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build +WORKDIR /src +COPY ["TelemetryPersister.csproj", "TelemetryPersister/"] +RUN dotnet restore "TelemetryPersister/TelemetryPersister.csproj" +COPY . . + +FROM build AS publish +RUN dotnet publish "TelemetryPersister.csproj" -c Release -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "TelemetryPersister.dll"] \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Infrastructure/DaprRawPayloadInputFormatter.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Infrastructure/DaprRawPayloadInputFormatter.cs new file mode 100644 index 0000000..f77fd91 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Infrastructure/DaprRawPayloadInputFormatter.cs @@ -0,0 +1,41 @@ +using Microsoft.AspNetCore.Mvc.Formatters; + +namespace TelemetryPersister.Infrastructure +{ + /// + /// An InputFormatter that is responsible for formatting the raw byte-stream payloads that are transferred + /// by Dapr to a typed model that the ASP.NET controller expects. + /// + /// This InputFormatter is required to overcome the HTTP 415 'media type not supported' error. + /// See also this issue on Github: https://github.com/dapr/dotnet-sdk/issues/989 + /// + public class DaprRawPayloadInputFormatter : InputFormatter + { + public DaprRawPayloadInputFormatter() + { + SupportedMediaTypes.Add("application/octet-stream"); + } + + public override async Task ReadRequestBodyAsync(InputFormatterContext context) + { + using (MemoryStream str = new MemoryStream()) + { + try + { + await context.HttpContext.Request.Body.CopyToAsync(str); + + var jsonString = System.Text.Encoding.UTF8.GetString(str.ToArray()); + + var deserializedModel = System.Text.Json.JsonSerializer.Deserialize(jsonString, context.ModelType); + + return InputFormatterResult.Success(deserializedModel); + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); + return InputFormatterResult.Failure(); + } + } + } + } +} \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/CommandInfo.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/CommandInfo.cs new file mode 100644 index 0000000..ddf427e --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/CommandInfo.cs @@ -0,0 +1,10 @@ +using System.Text.Json.Serialization; + +namespace TelemetryPersister.Models +{ + public class CommandInfo + { + [JsonPropertyName("cmd")] + public string? Command { get; set; } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/DeviceTelemetry.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/DeviceTelemetry.cs new file mode 100644 index 0000000..d9fe5bd --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/DeviceTelemetry.cs @@ -0,0 +1,16 @@ +using System.Text.Json.Serialization; + +namespace TelemetryPersister.Models +{ + public class DeviceTelemetry + { + [JsonPropertyName("deviceId")] + public string? DeviceId{ get; set; } + [JsonPropertyName("timestamp")] + public DateTimeOffset Timestamp { get; set; } + [JsonPropertyName("tag")] + public string? Tag { get; set; } + [JsonPropertyName("value")] + public object? Value { get; set; } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Program.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Program.cs new file mode 100644 index 0000000..a4ff25b --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Program.cs @@ -0,0 +1,50 @@ +using TelemetryPersister.Infrastructure; + +namespace TelemetryPersister +{ + public class Program + { + public static void Main(string[] args) + { + var builder = WebApplication.CreateBuilder(args); + + // Add services to the container. + + builder.Services.AddControllers(options => options.InputFormatters.Add(new DaprRawPayloadInputFormatter())) + .AddDapr(); + builder.Services.AddDaprClient(); + // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle + builder.Services.AddEndpointsApiExplorer(); + builder.Services.AddSwaggerGen(); + + builder.Services.AddLogging(logBuilder => + { + logBuilder.SetMinimumLevel(LogLevel.Information); + logBuilder.AddSystemdConsole(options => + { + options.UseUtcTimestamp = true; + options.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff zzz "; + }); + }); + + var app = builder.Build(); + + // Configure the HTTP request pipeline. + if (app.Environment.IsDevelopment()) + { + app.UseSwagger(); + app.UseSwaggerUI(); + } + + app.UseAuthorization(); + + app.UseCloudEvents(); + + app.MapControllers(); + + app.MapSubscribeHandler(); + + app.Run(); + } + } +} \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Properties/launchSettings.json b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Properties/launchSettings.json new file mode 100644 index 0000000..c6454e2 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Properties/launchSettings.json @@ -0,0 +1,31 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:64492", + "sslPort": 44394 + } + }, + "profiles": { + "TelemetryPersister": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "launchUrl": "swagger", + "applicationUrl": "https://localhost:7065;http://localhost:5030", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "IIS Express": { + "commandName": "IISExpress", + "launchBrowser": true, + "launchUrl": "swagger", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/TelemetryPersister.csproj b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/TelemetryPersister.csproj new file mode 100644 index 0000000..1f9db4e --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/TelemetryPersister.csproj @@ -0,0 +1,16 @@ + + + + net6.0 + enable + enable + Linux + + + + + + + + + diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/appsettings.json b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/appsettings.json new file mode 100644 index 0000000..10f68b8 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryProcessor.sln b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryProcessor.sln new file mode 100644 index 0000000..d29eca3 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryProcessor.sln @@ -0,0 +1,31 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.5.33530.505 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TelemetryTransformer", "TelemetryTransformer\TelemetryTransformer.csproj", "{B0B42D0B-909B-4546-B7ED-D991682693DA}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TelemetryPersister", "TelemetryPersister\TelemetryPersister.csproj", "{E3FCD9F6-D872-49D0-B9C8-3603F3339268}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Release|Any CPU.Build.0 = Release|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {BEE37979-4585-4BAF-A2EB-077035EB065C} + EndGlobalSection +EndGlobal diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Dockerfile b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Dockerfile new file mode 100644 index 0000000..6cda5ff --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Dockerfile @@ -0,0 +1,17 @@ +FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base +WORKDIR /app +EXPOSE 5050 + +FROM mcr.microsoft.com/dotnet/sdk:6.0-bullseye-slim-amd64 AS build +WORKDIR /src +COPY ["TelemetryTransformer.csproj", "TelemetryTransformer/"] +RUN dotnet restore "TelemetryTransformer/TelemetryTransformer.csproj" +COPY . . + +FROM build AS publish +RUN dotnet publish "TelemetryTransformer.csproj" -c Release -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "TelemetryTransformer.dll"] \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Models/DeviceTelemetry.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Models/DeviceTelemetry.cs new file mode 100644 index 0000000..dd0e2fd --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Models/DeviceTelemetry.cs @@ -0,0 +1,10 @@ +namespace TelemetryTransformer.Models +{ + public class DeviceTelemetry + { + public string DeviceId { get; set; } + public DateTimeOffset Timestamp { get; set; } + public string Tag { get; set; } + public object Value { get; set; } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Program.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Program.cs new file mode 100644 index 0000000..2fa9af0 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Program.cs @@ -0,0 +1,69 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using TelemetryTransformer.Services; + +namespace TelemetryTransformer +{ + internal class Program + { + static void Main(string[] args) + { + CreateHostBuilder(args).Build().Run(); + } + + public static IHostBuilder CreateHostBuilder(string[] args) + { + return Host + .CreateDefaultBuilder(args) + .ConfigureServices(s => + { + s.AddDaprClient(); + + }) + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder.ConfigureServices(services => + { + services.AddGrpc(); + }); + + webBuilder.ConfigureKestrel(options => + { + // Setup a HTTP/2 endpoint without TLS. + options.ListenLocalhost(5050, o => o.Protocols = HttpProtocols.Http2); + }); + + webBuilder.Configure((ctx, app) => + { + app.UseRouting(); + + app.UseEndpoints(endpoints => + { + endpoints.MapGrpcService(); + + endpoints.MapGet("/", async context => + { + await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); + }); + }); + }); + + }) + .ConfigureLogging((hostingContext, logging) => + { + logging.ClearProviders(); + logging.SetMinimumLevel(LogLevel.Information); + logging.AddSystemdConsole(consoleLogging => + { + consoleLogging.UseUtcTimestamp = true; + consoleLogging.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff zzz "; + }); + }); + } + } +} \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Services/DeviceTelemetryReceiver.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Services/DeviceTelemetryReceiver.cs new file mode 100644 index 0000000..812448b --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Services/DeviceTelemetryReceiver.cs @@ -0,0 +1,161 @@ +using Dapr.AppCallback.Autogen.Grpc.v1; +using Dapr.Client; +using Dapr.Client.Autogen.Grpc.v1; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using System.Text; +using System.Text.Json; +using TelemetryTransformer.Models; + +namespace TelemetryTransformer.Services +{ + internal class DeviceTelemetryReceiver : AppCallback.AppCallbackBase + { + private readonly ILogger _logger; + private readonly DaprClient _daprClient; + + public DeviceTelemetryReceiver(DaprClient daprClient, ILogger logger) + { + _logger = logger; + _daprClient = daprClient; + } + + public override Task OnInvoke(InvokeRequest request, ServerCallContext context) + { + _logger.LogTrace("Request.Method: " + request.Method); + + _logger.LogTrace("Context.Method:" + context.Method); + _logger.LogTrace("Context.Host:" + context.Host); + _logger.LogTrace("Context.Peer:" + context.Peer); + + foreach (var h in context.RequestHeaders) + { + _logger.LogTrace(h.Key + "=" + h.Value); + } + + return Task.FromResult(new InvokeResponse()); + } + + public override Task ListTopicSubscriptions(Empty request, ServerCallContext context) + { + var result = new ListTopicSubscriptionsResponse(); + + var vesselTelemetrySubscription = new TopicSubscription + { + PubsubName = "telemetrypubsub", + Topic = "devices/+/#", + }; + vesselTelemetrySubscription.Metadata.Add("rawPayload", "true"); + + result.Subscriptions.Add(vesselTelemetrySubscription); + + return Task.FromResult(result); + } + + public override async Task OnTopicEvent(TopicEventRequest request, ServerCallContext context) + { + _logger.LogInformation("OnTopicEvent called on topic {0}", request.Topic); + _logger.LogInformation("payload = " + request.Data.ToStringUtf8()); + + await ProcessTelemetryAsync(request, _daprClient); + + + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Success }; + } + + private async Task ProcessTelemetryAsync(TopicEventRequest message, DaprClient daprClient) + { + string deviceId; + + try + { + deviceId = RetrieveDeviceIdFromTopic(message.Topic); + } + catch (InvalidOperationException ex) + { + _logger.LogError(ex, "Unable to process message"); + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop }; + } + + var deserializedMessage = DeserializeReceivedMessage(message.Data.ToByteArray()); + + if (deserializedMessage == null) + { + _logger.LogError("Unable to process message with body " + message.Data.ToStringUtf8()); + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop }; + } + + var vesselTelemetryMessage = new DeviceTelemetry() + { + DeviceId = deviceId, + Tag = deserializedMessage.Tag, + Value = deserializedMessage.Value, + Timestamp = deserializedMessage.Timestamp + }; + + try + { + _logger.LogInformation("Publishing message ..."); + + await daprClient.PublishEventAsync( + "telemetrypubsub", + "devicetelemetry", + data: vesselTelemetryMessage); + + var toCloudMetaData = new Dictionary() + { + ["rawPayload"] = "true" + }; + + await daprClient.PublishEventAsync( + "telemetrypubsub", + "telemetry_tocloud", + data: vesselTelemetryMessage, + metadata: toCloudMetaData); + + _logger.LogInformation("Message published."); + + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Success }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Unexpected error occurred"); + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Retry }; + } + } + + private static string RetrieveDeviceIdFromTopic(string topic) + { + var parts = topic.Split('/'); + + if (parts.Length != 3) + { + throw new InvalidOperationException("Message received on invalid topic"); + } + + return parts[1]; + } + + private static ReceivedMessage? DeserializeReceivedMessage(byte[] message) + { + Console.WriteLine("Received message: " + Encoding.UTF8.GetString(message)); + + var deserializedMessage = JsonSerializer.Deserialize(message); + + Console.WriteLine("Message deserialized: "); + Console.WriteLine($"Timestamp = {deserializedMessage.Timestamp}"); + Console.WriteLine($"Tag = {deserializedMessage.Tag}"); + Console.WriteLine($"Value = {deserializedMessage.Value}"); + + return deserializedMessage; + } + + private class ReceivedMessage + { + public DateTimeOffset Timestamp { get; set; } + public string Tag { get; set; } + public object Value { get; set; } + } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/TelemetryTransformer.csproj b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/TelemetryTransformer.csproj new file mode 100644 index 0000000..279bbe8 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/TelemetryTransformer.csproj @@ -0,0 +1,21 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + +