diff --git a/src/console/MeasurementCollectionToFhir/Processor.cs b/src/console/MeasurementCollectionToFhir/Processor.cs index 20eebe88..3a1495fa 100644 --- a/src/console/MeasurementCollectionToFhir/Processor.cs +++ b/src/console/MeasurementCollectionToFhir/Processor.cs @@ -13,9 +13,9 @@ using Microsoft.Health.Events.EventConsumers; using Microsoft.Health.Events.Model; using Microsoft.Health.Events.Telemetry; -using Microsoft.Health.Fhir.Ingest.Console.Template; using Microsoft.Health.Fhir.Ingest.Host; using Microsoft.Health.Fhir.Ingest.Service; +using Microsoft.Health.Fhir.Ingest.Template; using Microsoft.Health.Logging.Telemetry; using Polly; diff --git a/src/console/Normalize/Processor.cs b/src/console/Normalize/Processor.cs index 105b791b..88d20e5f 100644 --- a/src/console/Normalize/Processor.cs +++ b/src/console/Normalize/Processor.cs @@ -13,7 +13,6 @@ using Microsoft.Health.Events.EventConsumers; using Microsoft.Health.Events.Model; using Microsoft.Health.Events.Telemetry; -using Microsoft.Health.Fhir.Ingest.Console.Template; using Microsoft.Health.Fhir.Ingest.Data; using Microsoft.Health.Fhir.Ingest.Service; using Microsoft.Health.Fhir.Ingest.Telemetry; diff --git a/src/console/Program.cs b/src/console/Program.cs index 849b46f2..4d7a364a 100644 --- a/src/console/Program.cs +++ b/src/console/Program.cs @@ -3,7 +3,6 @@ // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- -using Azure.Messaging.EventHubs; using Microsoft.ApplicationInsights; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.Extensions.Configuration; diff --git a/src/console/Startup.cs b/src/console/Startup.cs index af06df33..3b7b75c9 100644 --- a/src/console/Startup.cs +++ b/src/console/Startup.cs @@ -17,9 +17,7 @@ using Microsoft.Health.Events.EventConsumers.Service; using Microsoft.Health.Events.EventHubProcessor; using Microsoft.Health.Events.EventProducers; -using Microsoft.Health.Events.Repository; using Microsoft.Health.Expressions; -using Microsoft.Health.Fhir.Ingest.Console.Template; using Microsoft.Health.Fhir.Ingest.Data; using Microsoft.Health.Fhir.Ingest.Host; using Microsoft.Health.Fhir.Ingest.Service; @@ -64,8 +62,7 @@ public virtual TemplateManager ResolveTemplateManager(IServiceProvider servicePr var containerOptions = new BlobContainerClientOptions(); Configuration.GetSection("TemplateStorage").Bind(containerOptions); var containerClient = blobClientFactory.CreateStorageClient(containerOptions); - var storageManager = new StorageManager(containerClient); - var templateManager = new TemplateManager(storageManager); + var templateManager = new TemplateManager(containerClient); return templateManager; } @@ -84,7 +81,7 @@ public virtual List ResolveEventConsumers(IServiceProvider servi var collector = ResolveEventCollector(serviceProvider); var collectionContentFactory = serviceProvider.GetRequiredService>(); var exceptionTelemetryProcessor = serviceProvider.GetRequiredService(); - var deviceDataNormalization = new Normalize.Processor(template, templateManager, collector, logger, collectionContentFactory, exceptionTelemetryProcessor); + var deviceDataNormalization = new NormalizationEventConsumerService(new EventMessageJObjectConverter(), template, templateManager, collector, logger, collectionContentFactory, exceptionTelemetryProcessor); eventConsumers.Add(deviceDataNormalization); } else if (applicationType == _measurementToFhirAppType) @@ -137,7 +134,7 @@ public virtual IEventProcessingMetricMeters ResolveEventProcessingMetricMeters(I if (applicationType == _normalizationAppType) { Metric processingMetric = EventMetrics.EventsConsumed(EventMetricDefinition.DeviceIngressSizeBytes); - var meter = new Events.Common.EventProcessingMeter(processingMetric); + var meter = new Events.Common.IngressBytesEventProcessingMeter(processingMetric); var meters = new EventProcessingMetricMeters(new List() { meter }); return meters; } diff --git a/src/console/Template/TemplateManager.cs b/src/console/Template/TemplateManager.cs deleted file mode 100644 index d1d55278..00000000 --- a/src/console/Template/TemplateManager.cs +++ /dev/null @@ -1,31 +0,0 @@ -// ------------------------------------------------------------------------------------------------- -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. -// ------------------------------------------------------------------------------------------------- - -using Microsoft.Health.Events.Repository; -using System.Text; - -namespace Microsoft.Health.Fhir.Ingest.Console.Template -{ - public class TemplateManager : ITemplateManager - { - private IRepositoryManager _respositoryManager; - public TemplateManager(IRepositoryManager repositoryManager) - { - _respositoryManager = repositoryManager; - } - - public byte[] GetTemplate(string templateName) - { - return _respositoryManager.GetItem(templateName); - } - - public string GetTemplateAsString(string templateName) - { - var templateBuffer = GetTemplate(templateName); - string templateContent = Encoding.UTF8.GetString(templateBuffer, 0, templateBuffer.Length); - return templateContent; - } - } -} diff --git a/src/lib/Microsoft.Health.Events/Common/EventProcessingMeter.cs b/src/lib/Microsoft.Health.Events/Common/IngressBytesEventProcessingMeter.cs similarity index 83% rename from src/lib/Microsoft.Health.Events/Common/EventProcessingMeter.cs rename to src/lib/Microsoft.Health.Events/Common/IngressBytesEventProcessingMeter.cs index b40e25d2..94d660ec 100644 --- a/src/lib/Microsoft.Health.Events/Common/EventProcessingMeter.cs +++ b/src/lib/Microsoft.Health.Events/Common/IngressBytesEventProcessingMeter.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------------------------------- +// ------------------------------------------------------------------------------------------------- // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. // ------------------------------------------------------------------------------------------------- @@ -12,13 +12,13 @@ namespace Microsoft.Health.Events.Common { - public class EventProcessingMeter : IEventProcessingMeter + public class IngressBytesEventProcessingMeter : IEventProcessingMeter { - public EventProcessingMeter() + public IngressBytesEventProcessingMeter() { } - public EventProcessingMeter(Metric metric) + public IngressBytesEventProcessingMeter(Metric metric) { EventsProcessedMetric = metric; } diff --git a/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointClient.cs b/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointClient.cs index 65544933..25d70933 100644 --- a/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointClient.cs +++ b/src/lib/Microsoft.Health.Events/EventCheckpointing/StorageCheckpointClient.cs @@ -95,7 +95,7 @@ Task GetCheckpointAsync() { var checkpoint = new Checkpoint(); - foreach (BlobItem blob in _storageClient.GetBlobs(traits: BlobTraits.Metadata, states: BlobStates.All, prefix: prefix, cancellationToken: cancellationToken)) + foreach (BlobItem blob in _storageClient.GetBlobs(traits: BlobTraits.Metadata, states: BlobStates.None, prefix: prefix, cancellationToken: cancellationToken)) { var partitionId = blob.Name.Split('/').Last(); DateTimeOffset lastEventTimestamp = DateTime.MinValue; diff --git a/src/lib/Microsoft.Health.Events/Telemetry/Metrics/EventMetrics.cs b/src/lib/Microsoft.Health.Events/Telemetry/Metrics/EventMetrics.cs index 426824ff..3154bcf3 100644 --- a/src/lib/Microsoft.Health.Events/Telemetry/Metrics/EventMetrics.cs +++ b/src/lib/Microsoft.Health.Events/Telemetry/Metrics/EventMetrics.cs @@ -91,7 +91,7 @@ public static Metric EventWatermark(string partitionId) /// Signals the timestamp corresponding to the last processed event per partition. /// /// The partition id of the event hub - /// The trigger that caused the events to be flushed and processed + /// The trigger that caused the events to be flushed and processed + { + private const string BodyAttr = "Body"; + private const string PropertiesAttr = "Properties"; + private const string SystemPropertiesAttr = "SystemProperties"; + + private readonly JsonLoadSettings loadSettings = new () { LineInfoHandling = LineInfoHandling.Ignore }; + + private readonly JsonSerializer jsonSerializer = JsonSerializer.CreateDefault(); + + public JObject Convert(IEventMessage input) + { + EnsureArg.IsNotNull(input, nameof(input)); + + JObject token = new (); + JToken body = null; + + if (input.Body.Length > 0) + { + using StreamReader streamReader = new StreamReader(input.Body.AsStream(), Encoding.UTF8); + using JsonReader jsonReader = new JsonTextReader(streamReader); + body = JToken.ReadFrom(jsonReader, loadSettings); + } + + token[BodyAttr] = body; + token[PropertiesAttr] = JToken.FromObject(input.Properties, jsonSerializer); + token[SystemPropertiesAttr] = JToken.FromObject(input.SystemProperties, jsonSerializer); + + return token; + } + } +} diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj b/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj index e7072fed..056f16b2 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Microsoft.Health.Fhir.Ingest.csproj @@ -31,7 +31,9 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Service/NormalizationEventConsumerService.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Service/NormalizationEventConsumerService.cs new file mode 100644 index 00000000..d765a227 --- /dev/null +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Service/NormalizationEventConsumerService.cs @@ -0,0 +1,218 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EnsureThat; +using Microsoft.Health.Common.Telemetry; +using Microsoft.Health.Events.EventConsumers; +using Microsoft.Health.Events.Model; +using Microsoft.Health.Events.Telemetry; +using Microsoft.Health.Fhir.Ingest.Data; +using Microsoft.Health.Fhir.Ingest.Telemetry; +using Microsoft.Health.Fhir.Ingest.Template; +using Microsoft.Health.Logging.Telemetry; +using Newtonsoft.Json.Linq; +using Polly; + +namespace Microsoft.Health.Fhir.Ingest.Service +{ + public class NormalizationEventConsumerService : IEventConsumer + { + private readonly IConverter _converter; + private readonly string _templateDefinition; + private readonly ITemplateManager _templateManager; + private readonly ITelemetryLogger _logger; + private readonly IEnumerableAsyncCollector _collector; + private readonly AsyncPolicy _retryPolicy; + private readonly CollectionTemplateFactory _collectionTemplateFactory; + private readonly IExceptionTelemetryProcessor _exceptionTelemetryProcessor; + + private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); + + public NormalizationEventConsumerService( + IConverter converter, + string templateDefinition, + ITemplateManager templateManager, + IEnumerableAsyncCollector collector, + ITelemetryLogger logger, + CollectionTemplateFactory collectionTemplateFactory, + NormalizationExceptionTelemetryProcessor exceptionTelemetryProcessor) + { + _converter = EnsureArg.IsNotNull(converter, nameof(converter)); + _templateDefinition = EnsureArg.IsNotNullOrWhiteSpace(templateDefinition, nameof(templateDefinition)); + _templateManager = EnsureArg.IsNotNull(templateManager, nameof(templateManager)); + _collector = EnsureArg.IsNotNull(collector, nameof(collector)); + _logger = EnsureArg.IsNotNull(logger, nameof(logger)); + _retryPolicy = CreateRetryPolicy(logger); + _collectionTemplateFactory = EnsureArg.IsNotNull(collectionTemplateFactory, nameof(collectionTemplateFactory)); + _exceptionTelemetryProcessor = EnsureArg.IsNotNull(exceptionTelemetryProcessor, nameof(exceptionTelemetryProcessor)); + + EventMetrics.SetConnectorOperation(ConnectorOperation.Normalization); + } + + private (IContentTemplate template, DateTimeOffset timestamp) NormalizationTemplate { get; set; } + + public async Task ConsumeAsync(IEnumerable events) + { + EnsureArg.IsNotNull(events); + + await _retryPolicy.ExecuteAsync(async () => await ConsumeAsyncImpl(events)); + } + + private async Task GetNormalizationTemplate() + { + await semaphore.WaitAsync(); + try + { + if (NormalizationTemplate.template == null) + { + _logger.LogTrace("Initializing normalization template from blob."); + DateTimeOffset timestamp = DateTimeOffset.UtcNow; + var content = await _templateManager.GetTemplateContentIfChangedSince(_templateDefinition); + var templateContext = _collectionTemplateFactory.Create(content); + templateContext.EnsureValid(); + NormalizationTemplate = (templateContext.Template, timestamp); + } + else + { + DateTimeOffset updatedTimestamp = DateTimeOffset.UtcNow; + var content = await _templateManager.GetTemplateContentIfChangedSince(_templateDefinition, NormalizationTemplate.timestamp); + + if (content != null) + { + _logger.LogTrace("New normalization template content detected, updating template."); + var templateContext = _collectionTemplateFactory.Create(content); + templateContext.EnsureValid(); + NormalizationTemplate = (templateContext.Template, updatedTimestamp); + } + } + + return NormalizationTemplate.template; + } + finally + { + semaphore.Release(); + } + } + + private async Task ConsumeAsyncImpl(IEnumerable events) + { + var template = await GetNormalizationTemplate(); + + var normalizationBatch = new List<(string sourcePartition, IMeasurement measurement)>(50); + + foreach (var evt in events) + { + ProcessEvent(evt, template, normalizationBatch); + } + + // Send normalized events + await _collector.AddAsync(items: normalizationBatch.Select(data => data.measurement), cancellationToken: CancellationToken.None); + + // Record Normalized Event Telemetry to correct partition + foreach (var item in normalizationBatch) + { + _logger.LogMetric(IomtMetrics.NormalizedEvent(item.sourcePartition), 1); + } + } + + private void ProcessEvent(IEventMessage evt, IContentTemplate template, IList<(string sourcePartition, IMeasurement measurement)> collector) + { + try + { + RecordIngressMetrics(evt, _logger); + + var token = _converter.Convert(evt); + + NormalizeMessage(token: token, evt: evt, template: template, collector: collector); + } + catch (Exception ex) + { + if (!_exceptionTelemetryProcessor.HandleException(ex, _logger)) + { + throw; // Immediately throw original exception if it is not handled + } + } + } + + private void NormalizeMessage(JObject token, IEventMessage evt, IContentTemplate template, IList<(string sourcePartition, IMeasurement measurement)> collector) + { + Stopwatch sw = Stopwatch.StartNew(); + int projections = 0; + + foreach (var measurement in template.GetMeasurements(token)) + { + try + { + measurement.IngestionTimeUtc = evt.EnqueuedTime.UtcDateTime; + collector.Add((evt.PartitionId, measurement)); + projections++; + } + catch (Exception ex) + { + // Translate all Normalization Mapping exceptions into a common type for easy identification. + throw new NormalizationDataMappingException(ex); + } + } + + sw.Stop(); + + RecordNormalizationEventMetrics(evt, projections, sw.Elapsed, _logger); + } + + private static void RecordIngressMetrics(IEventMessage evt, ITelemetryLogger log) + { + TimeSpan deviceEventProcessingLatency = DateTime.UtcNow - evt.EnqueuedTime.UtcDateTime; + + log.LogMetric( + IomtMetrics.DeviceEventProcessingLatency(evt.PartitionId), + deviceEventProcessingLatency.TotalSeconds); + + log.LogMetric( + IomtMetrics.DeviceEventProcessingLatencyMs(evt.PartitionId), + deviceEventProcessingLatency.TotalSeconds); + } + + private static void RecordNormalizationEventMetrics(IEventMessage evt, int projectedMessages, TimeSpan duration, ITelemetryLogger log) + { + log.LogMetric(IomtMetrics.NormalizedEventGenerationTimeMs(evt.PartitionId), duration.TotalMilliseconds); + + if (projectedMessages == 0) + { + log.LogTrace($"No measurements projected for event {evt.SequenceNumber}."); + log.LogMetric(IomtMetrics.DroppedEvent(evt.PartitionId), 1); + } + } + + private static AsyncPolicy CreateRetryPolicy(ITelemetryLogger logger) + { + // Retry on any unhandled exceptions. + // TODO (WI - 86288): Handled exceptions (eg: data errors) will not be retried upon indefinitely. + bool ExceptionRetryableFilter(Exception ee) + { + logger.LogTrace($"Encountered retryable/unhandled exception {ee.GetType()}"); + logger.LogError(ee); + TrackExceptionMetric(ee, logger); + return true; + } + + return Policy + .Handle(ExceptionRetryableFilter) + .WaitAndRetryForeverAsync(retryCount => TimeSpan.FromSeconds(Math.Min(30, Math.Pow(2, retryCount)))); + } + + private static void TrackExceptionMetric(Exception exception, ITelemetryLogger logger) + { + var type = exception.GetType().ToString(); + var metric = type.ToErrorMetric(ConnectorOperation.Normalization, ErrorType.DeviceMessageError, ErrorSeverity.Warning); + logger.LogMetric(metric, 1); + } + } +} diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs index 6e9e0411..fdf031b7 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetricDefinition.cs @@ -33,5 +33,7 @@ private IomtMetricDefinition(string metricName) public static IomtMetricDefinition MeasurementIngestionLatency { get; } = new IomtMetricDefinition(nameof(MeasurementIngestionLatency)); public static IomtMetricDefinition MeasurementIngestionLatencyMs { get; } = new IomtMetricDefinition(nameof(MeasurementIngestionLatencyMs)); + + public static IomtMetricDefinition DroppedEvent { get; } = new IomtMetricDefinition(nameof(DroppedEvent)); } } diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs index f180c42e..cfcbf325 100644 --- a/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Telemetry/Metrics/IomtMetrics.cs @@ -30,7 +30,7 @@ public static class IomtMetrics /// /// The latency between event ingestion and output to FHIR processor. /// - /// The partition id of the input events being consumed from the event hub partition + /// The partition id of the input events being consumed from the event hub partition /// The latency between event ingestion and output to FHIR processor, in milliseconds. /// - /// The partition id of the input events being consumed from the event hub partition + /// The partition id of the input events being consumed from the event hub partition /// The number of measurement groups generated by the FHIR processor based on provided input. /// - /// The partition id of the input events being consumed from the event hub partition + /// The partition id of the input events being consumed from the event hub partition /// The number of measurement readings to import to FHIR. /// - /// The partition id of the input events being consumed from the event hub partition + /// The partition id of the input events being consumed from the event hub partition /// The number of input events received. /// - /// The partition id of the events being consumed from the event hub partition + /// The partition id of the events being consumed from the event hub partition /// The number of normalized events generated for further processing. /// - /// The partition id of the events being consumed from the event hub partition + /// The partition id of the events being consumed from the event hub partition + /// The number of input device events with no normalized events. + /// + /// The partition id of the events being consumed from the event hub partition + public static Metric DroppedEvent(string partitionId = null) + { + return IomtMetricDefinition.DroppedEvent + .CreateBaseMetric(Category.Traffic, ConnectorOperation.Normalization) + .AddDimension(_partitionDimension, partitionId); + } + /// /// The latency between the event ingestion time and normalization processing. An increase here indicates a backlog of messages to process. /// - /// The partition id of the events being consumed from the event hub partition + /// The partition id of the events being consumed from the event hub partition /// The latency between the event ingestion time and normalization processing, in milliseconds. An increase here indicates a backlog of messages to process. /// - /// The partition id of the events being consumed from the event hub partition + /// The partition id of the events being consumed from the event hub partition /// The time it takes to generate a Normalized Event. /// - /// The partition id of the events being consumed from the event hub partition + /// The partition id of the events being consumed from the event hub partition GetTemplateContentIfChangedSince(string templateName, DateTimeOffset contentTimestamp = default, CancellationToken cancellationToken = default); } } diff --git a/src/lib/Microsoft.Health.Fhir.Ingest/Template/TemplateManager.cs b/src/lib/Microsoft.Health.Fhir.Ingest/Template/TemplateManager.cs new file mode 100644 index 00000000..5dc90f7a --- /dev/null +++ b/src/lib/Microsoft.Health.Fhir.Ingest/Template/TemplateManager.cs @@ -0,0 +1,69 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using EnsureThat; + +namespace Microsoft.Health.Fhir.Ingest.Template +{ + public class TemplateManager : ITemplateManager + { + public TemplateManager(BlobContainerClient blobContainer) + { + BlobContainer = EnsureArg.IsNotNull(blobContainer, nameof(blobContainer)); + } + + protected BlobContainerClient BlobContainer { get; } + + public byte[] GetTemplate(string templateName) + { + return GetBlobContent(templateName); + } + + public string GetTemplateAsString(string templateName) + { + var templateBuffer = GetTemplate(templateName); + string templateContent = Encoding.UTF8.GetString(templateBuffer, 0, templateBuffer.Length); + return templateContent; + } + + protected byte[] GetBlobContent(string itemName) + { + EnsureArg.IsNotNull(itemName); + + var blockBlob = BlobContainer.GetBlobClient(itemName); + + using var memoryStream = new MemoryStream(); + blockBlob.DownloadTo(memoryStream); + byte[] itemContent = memoryStream.ToArray(); + return itemContent; + } + + public async Task GetTemplateContentIfChangedSince(string templateName, DateTimeOffset contentTimestamp = default, CancellationToken cancellationToken = default) + { + EnsureArg.IsNotNullOrWhiteSpace(templateName, nameof(templateName)); + + var blobClient = BlobContainer.GetBlobClient(templateName); + BlobRequestConditions conditions = new () { IfModifiedSince = contentTimestamp }; + + using var ms = new MemoryStream(); + await blobClient.DownloadToAsync(ms, conditions: conditions, cancellationToken: cancellationToken); + + if (ms.Length == 0) + { + // No new content found, return null. + return null; + } + + return Encoding.UTF8.GetString(ms.ToArray()); + } + } +}