Skip to content

Commit

Permalink
Allow the ResourceReadyEvent to block waiters (#7163)
Browse files Browse the repository at this point in the history
* Allow the ResourceReadyEvent to block waiters
- This allows more easier coordination after a resource is healthy but before waiters are unblocked. This is useful for seeding etc.
  • Loading branch information
davidfowl authored Jan 22, 2025
1 parent 405a54b commit facc75f
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 12 deletions.
34 changes: 33 additions & 1 deletion playground/mongo/Mongo.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Bson;
using MongoDB.Driver;

var builder = DistributedApplication.CreateBuilder(args);

var db = builder.AddMongoDB("mongo")
.WithMongoExpress(c => c.WithHostPort(3022))
.AddDatabase("db");

builder.Eventing.Subscribe<ResourceReadyEvent>(db.Resource, async (@event, ct) =>
{
// Artificial delay to demonstrate the waiting
await Task.Delay(TimeSpan.FromSeconds(10), ct);

// Seed the database with some data
var cs = await db.Resource.ConnectionStringExpression.GetValueAsync(ct);
using var client = new MongoClient(cs);

const string collectionName = "entries";

var myDb = client.GetDatabase("db");
await myDb.CreateCollectionAsync(collectionName, cancellationToken: ct);

for (int i = 0; i < 10; i++)
{
await myDb.GetCollection<Entry>(collectionName).InsertOneAsync(new Entry(), cancellationToken: ct);
}
});

builder.AddProject<Projects.Mongo_ApiService>("api")
.WithExternalHttpEndpoints()
.WithReference(db).WaitFor(db);
.WithReference(db)
.WaitFor(db);

#if !SKIP_DASHBOARD_REFERENCE
// This project is only added in playground projects to support development/debugging
Expand All @@ -22,3 +47,10 @@
#endif

builder.Build().Run();

public sealed class Entry
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string? Id { get; set; }
}
11 changes: 11 additions & 0 deletions src/Aspire.Hosting/ApplicationModel/CustomResourceSnapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public ResourceStateSnapshot? State
/// </summary>
public int? ExitCode { get; init; }

/// <summary>
/// A snapshot of the event that indicates the resource is ready.
/// </summary>
internal EventSnapshot? ResourceReadyEvent { get; init; }

/// <summary>
/// Gets the health status of the resource.
/// </summary>
Expand Down Expand Up @@ -130,6 +135,12 @@ internal init
}
}

/// <summary>
/// A snapshot of an event.
/// </summary>
/// <param name="EventTask">The task the represents the result of executing the event.</param>
internal record EventSnapshot(Task EventTask);

/// <summary>
/// A snapshot of the resource state
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ private async Task WaitUntilHealthyAsync(IResource resource, IResource dependenc
await WaitForResourceHealthyAsync(dependency.Name, cancellationToken).ConfigureAwait(false);
}

// Now wait for the resource ready event to be executed.
resourceLogger.LogInformation("Waiting for resource ready to execute for '{Name}'.", dependency.Name);
resourceEvent = await WaitForResourceAsync(dependency.Name, re => re.Snapshot.ResourceReadyEvent is not null, cancellationToken: cancellationToken).ConfigureAwait(false);

// Observe the result of the resource ready event task
await resourceEvent.Snapshot.ResourceReadyEvent!.EventTask.WaitAsync(cancellationToken).ConfigureAwait(false);

resourceLogger.LogInformation("Finished waiting for resource '{Name}'.", dependency.Name);

static bool IsContinuableState(CustomResourceSnapshot snapshot) =>
Expand Down
36 changes: 26 additions & 10 deletions src/Aspire.Hosting/Health/ResourceHealthCheckService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,37 @@ private async Task MonitorResourceHealthAsync(ResourceEvent initialEvent, Cancel
var resource = initialEvent.Resource;
var resourceReadyEventFired = false;

void FireResourceReadyEvent()
{
// We don't want to block the monitoring loop while we fire the event.
_ = Task.Run(async () =>
{
var resourceReadyEvent = new ResourceReadyEvent(resource, services);

// Execute the publish and store the task so that waiters can await it and observe the result.
var task = eventing.PublishAsync(resourceReadyEvent, cancellationToken);

// Suppress exceptions, we just want to make sure that the event is completed.
await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

await resourceNotificationService.PublishUpdateAsync(resource, s => s with
{
ResourceReadyEvent = new(task)
})
.ConfigureAwait(false);
},
cancellationToken);
}

if (!resource.TryGetAnnotationsIncludingAncestorsOfType<HealthCheckAnnotation>(out var annotations))
{
// NOTE: If there are no health check annotations then there
// is currently nothing to monitor. At this point in time we don't
// dynamically add health checks at runtime. If this changes then we
// would need to revisit this and scan for transitive health checks
// on a periodic basis (you wouldn't want to do it on every pass.
var resourceReadyEvent = new ResourceReadyEvent(resource, services);
await eventing.PublishAsync(
resourceReadyEvent,
EventDispatchBehavior.NonBlockingSequential,
cancellationToken).ConfigureAwait(false);
FireResourceReadyEvent();

return;
}

Expand All @@ -76,11 +95,8 @@ await eventing.PublishAsync(
if (!resourceReadyEventFired && report.Status == HealthStatus.Healthy)
{
resourceReadyEventFired = true;
var resourceReadyEvent = new ResourceReadyEvent(resource, services);
await eventing.PublishAsync(
resourceReadyEvent,
EventDispatchBehavior.NonBlockingSequential,
cancellationToken).ConfigureAwait(false);

FireResourceReadyEvent();
}

var latestEvent = _latestEvents.GetValueOrDefault(resource.Name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void Configure(DistributedApplicationOptions applicationOptions, HostApplication
public TestDistributedApplicationBuilder WithTestAndResourceLogging(ITestOutputHelper testOutputHelper)
{
Services.AddXunitLogging(testOutputHelper);
Services.AddHostedService<ResourceLoggerForwarderService>();
Services.Insert(0, ServiceDescriptor.Singleton<IHostedService, ResourceLoggerForwarderService>());
Services.AddLogging(builder =>
{
builder.AddFilter("Aspire.Hosting", LogLevel.Trace);
Expand Down
64 changes: 64 additions & 0 deletions tests/Aspire.Hosting.Tests/WaitForTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Aspire.Hosting.Utils;
using Microsoft.AspNetCore.InternalTesting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -216,6 +217,69 @@ await rns.PublishUpdateAsync(dependency.Resource, s => s with
await app.StopAsync();
}

[Fact]
[RequiresDocker]
public async Task WaitForObservedResultOfResourceReadyEvent()
{
using var builder = TestDistributedApplicationBuilder.Create().WithTestAndResourceLogging(testOutputHelper);

builder.Services.AddLogging(b =>
{
b.AddFakeLogging();
});

var resourceReadyTcs = new TaskCompletionSource();
var dependency = builder.AddResource(new CustomResource("test"));
var nginx = builder.AddContainer("nginx", "mcr.microsoft.com/cbl-mariner/base/nginx", "1.22")
.WithReference(dependency)
.WaitFor(dependency);

builder.Eventing.Subscribe<ResourceReadyEvent>(dependency.Resource, (e, ct) => resourceReadyTcs.Task);

using var app = builder.Build();

// StartAsync will currently block until the dependency resource moves
// into a Finished state, so rather than awaiting it we'll hold onto the
// task so we can inspect the state of the Nginx resource which should
// be in a waiting state if everything is working correctly.
var startupCts = AsyncTestHelpers.CreateDefaultTimeoutTokenSource(TestConstants.LongTimeoutDuration);
var startTask = app.StartAsync(startupCts.Token);

// We don't want to wait forever for Nginx to move into a waiting state,
// it should be super quick, but we'll allow 60 seconds just in case the
// CI machine is chugging (also useful when collecting code coverage).
var waitingStateCts = AsyncTestHelpers.CreateDefaultTimeoutTokenSource(TestConstants.LongTimeoutDuration);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();
await rns.WaitForResourceAsync(nginx.Resource.Name, "Waiting", waitingStateCts.Token);

// Now that we know we successfully entered the Waiting state, we can swap
// the dependency into a running state which will unblock startup and
// we can continue executing.
await rns.PublishUpdateAsync(dependency.Resource, s => s with
{
State = KnownResourceStates.Running
});

resourceReadyTcs.SetException(new InvalidOperationException("The resource ready event failed!"));

// This time we want to wait for Nginx to move into a Running state to verify that
// it successfully started after we moved the dependency resource into the Finished, but
// we need to give it more time since we have to download the image in CI.
var runningStateCts = AsyncTestHelpers.CreateDefaultTimeoutTokenSource(TestConstants.LongTimeoutDuration);
await rns.WaitForResourceAsync(nginx.Resource.Name, KnownResourceStates.FailedToStart, runningStateCts.Token);

await startTask;

var collector = app.Services.GetFakeLogCollector();
var logs = collector.GetSnapshot();

// Just looking for a common message in Docker build output.
Assert.Contains(logs, log => log.Message.Contains("The resource ready event failed!"));

await app.StopAsync();
}

[Fact]
[RequiresDocker]
public async Task EnsureDependencyResourceThatReturnsNonMatchingExitCodeResultsInDependentResourceFailingToStart()
Expand Down

0 comments on commit facc75f

Please sign in to comment.