Skip to content

Commit

Permalink
Personal/duburson/large msg perf improvements v2 (#209)
Browse files Browse the repository at this point in the history
Updated approach to the large perf message improvements. Instead of streaming, keep existing batch service but update settings to lower thresholds (buffer size of 10 instead of 100 and wait time of 10 seconds instead of 30). This will reduce overhead while still allowing for batches on egress for the perf benefits.

- Add new Normalization service, NormalizationEventConsumerService. This replaces the MeasurementEventNormalizationService + Normalize.Processor
- Fixed bug where the checkpoint client would find deleted blobs
- Added new converter, EventMessageJObjectConverter, to add properties directly to the JObject rather than through new object declarations.
- Add additional metrics to NormalizationEventConsumerService
  • Loading branch information
dustinburson authored Jun 24, 2022
1 parent 1087bf8 commit 022b7c3
Show file tree
Hide file tree
Showing 15 changed files with 375 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/console/MeasurementCollectionToFhir/Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
using Microsoft.Health.Events.EventConsumers;
using Microsoft.Health.Events.Model;
using Microsoft.Health.Events.Telemetry;
using Microsoft.Health.Fhir.Ingest.Console.Template;
using Microsoft.Health.Fhir.Ingest.Host;
using Microsoft.Health.Fhir.Ingest.Service;
using Microsoft.Health.Fhir.Ingest.Template;
using Microsoft.Health.Logging.Telemetry;
using Polly;

Expand Down
1 change: 0 additions & 1 deletion src/console/Normalize/Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Microsoft.Health.Events.EventConsumers;
using Microsoft.Health.Events.Model;
using Microsoft.Health.Events.Telemetry;
using Microsoft.Health.Fhir.Ingest.Console.Template;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Service;
using Microsoft.Health.Fhir.Ingest.Telemetry;
Expand Down
1 change: 0 additions & 1 deletion src/console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using Azure.Messaging.EventHubs;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Extensions.Configuration;
Expand Down
9 changes: 3 additions & 6 deletions src/console/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
using Microsoft.Health.Events.EventConsumers.Service;
using Microsoft.Health.Events.EventHubProcessor;
using Microsoft.Health.Events.EventProducers;
using Microsoft.Health.Events.Repository;
using Microsoft.Health.Expressions;
using Microsoft.Health.Fhir.Ingest.Console.Template;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Host;
using Microsoft.Health.Fhir.Ingest.Service;
Expand Down Expand Up @@ -64,8 +62,7 @@ public virtual TemplateManager ResolveTemplateManager(IServiceProvider servicePr
var containerOptions = new BlobContainerClientOptions();
Configuration.GetSection("TemplateStorage").Bind(containerOptions);
var containerClient = blobClientFactory.CreateStorageClient(containerOptions);
var storageManager = new StorageManager(containerClient);
var templateManager = new TemplateManager(storageManager);
var templateManager = new TemplateManager(containerClient);
return templateManager;
}

Expand All @@ -84,7 +81,7 @@ public virtual List<IEventConsumer> ResolveEventConsumers(IServiceProvider servi
var collector = ResolveEventCollector(serviceProvider);
var collectionContentFactory = serviceProvider.GetRequiredService<CollectionTemplateFactory<IContentTemplate, IContentTemplate>>();
var exceptionTelemetryProcessor = serviceProvider.GetRequiredService<NormalizationExceptionTelemetryProcessor>();
var deviceDataNormalization = new Normalize.Processor(template, templateManager, collector, logger, collectionContentFactory, exceptionTelemetryProcessor);
var deviceDataNormalization = new NormalizationEventConsumerService(new EventMessageJObjectConverter(), template, templateManager, collector, logger, collectionContentFactory, exceptionTelemetryProcessor);
eventConsumers.Add(deviceDataNormalization);
}
else if (applicationType == _measurementToFhirAppType)
Expand Down Expand Up @@ -137,7 +134,7 @@ public virtual IEventProcessingMetricMeters ResolveEventProcessingMetricMeters(I
if (applicationType == _normalizationAppType)
{
Metric processingMetric = EventMetrics.EventsConsumed(EventMetricDefinition.DeviceIngressSizeBytes);
var meter = new Events.Common.EventProcessingMeter(processingMetric);
var meter = new Events.Common.IngressBytesEventProcessingMeter(processingMetric);
var meters = new EventProcessingMetricMeters(new List<IEventProcessingMeter>() { meter });
return meters;
}
Expand Down
31 changes: 0 additions & 31 deletions src/console/Template/TemplateManager.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// -------------------------------------------------------------------------------------------------
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------
Expand All @@ -12,13 +12,13 @@

namespace Microsoft.Health.Events.Common
{
public class EventProcessingMeter : IEventProcessingMeter
public class IngressBytesEventProcessingMeter : IEventProcessingMeter
{
public EventProcessingMeter()
public IngressBytesEventProcessingMeter()
{
}

public EventProcessingMeter(Metric metric)
public IngressBytesEventProcessingMeter(Metric metric)
{
EventsProcessedMetric = metric;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Task<Checkpoint> GetCheckpointAsync()
{
var checkpoint = new Checkpoint();

foreach (BlobItem blob in _storageClient.GetBlobs(traits: BlobTraits.Metadata, states: BlobStates.All, prefix: prefix, cancellationToken: cancellationToken))
foreach (BlobItem blob in _storageClient.GetBlobs(traits: BlobTraits.Metadata, states: BlobStates.None, prefix: prefix, cancellationToken: cancellationToken))
{
var partitionId = blob.Name.Split('/').Last();
DateTimeOffset lastEventTimestamp = DateTime.MinValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static Metric EventWatermark(string partitionId)
/// Signals the timestamp corresponding to the last processed event per partition.
/// </summary>
/// <param name="partitionId">The partition id of the event hub</param>
/// <param name="triggerReason">The trigger that caused the events to be flushed and processed </param>
/// <param name="triggerReason">The trigger that caused the events to be flushed and processed</param>
public static Metric EventTimestampLastProcessedPerPartition(string partitionId, string triggerReason)
{
return EventMetricDefinition.EventTimestampLastProcessedPerPartition
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.IO;
using System.Text;
using EnsureThat;
using Microsoft.Health.Events.Model;
using Microsoft.Toolkit.HighPerformance;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Health.Fhir.Ingest.Data
{
    /// <summary>
    /// Converts an <see cref="IEventMessage"/> into a <see cref="JObject"/> that carries the
    /// message under three members: "Body", "Properties" and "SystemProperties".
    /// </summary>
    public class EventMessageJObjectConverter : IConverter<IEventMessage, JObject>
    {
        private const string BodyAttr = "Body";
        private const string PropertiesAttr = "Properties";
        private const string SystemPropertiesAttr = "SystemProperties";

        // Line information is not tracked on the tokens produced while reading the body.
        private readonly JsonLoadSettings _loadSettings = new JsonLoadSettings
        {
            LineInfoHandling = LineInfoHandling.Ignore,
        };

        // A single default serializer is reused for both property collections.
        private readonly JsonSerializer _jsonSerializer = JsonSerializer.CreateDefault();

        /// <summary>
        /// Builds the JSON representation of the supplied event message.
        /// </summary>
        /// <param name="input">The event message to convert. Must not be null.</param>
        /// <returns>
        /// A <see cref="JObject"/> whose "Body" member holds the parsed message body (or null when the
        /// body is empty) and whose "Properties" and "SystemProperties" members hold the serialized
        /// property collections of the message.
        /// </returns>
        public JObject Convert(IEventMessage input)
        {
            EnsureArg.IsNotNull(input, nameof(input));

            // Members are assigned in the same order as they appear in the resulting object.
            return new JObject
            {
                [BodyAttr] = ReadBody(input),
                [PropertiesAttr] = JToken.FromObject(input.Properties, _jsonSerializer),
                [SystemPropertiesAttr] = JToken.FromObject(input.SystemProperties, _jsonSerializer),
            };
        }

        /// <summary>
        /// Reads the message body as UTF-8 encoded JSON directly from a stream over the body.
        /// </summary>
        /// <param name="message">The message whose body should be parsed.</param>
        /// <returns>The parsed token, or null when the body has no content.</returns>
        private JToken ReadBody(IEventMessage message)
        {
            if (message.Body.Length <= 0)
            {
                return null;
            }

            using (var textReader = new StreamReader(message.Body.AsStream(), Encoding.UTF8))
            using (var reader = new JsonTextReader(textReader))
            {
                return JToken.ReadFrom(reader, _loadSettings);
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Toolkit.HighPerformance" Version="7.1.2" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="Polly" Version="7.2.2" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Loading

0 comments on commit 022b7c3

Please sign in to comment.