Skip to content

Commit

Permalink
Fixes + unit tests for streaming PubSub implementation (#1415)
Browse files Browse the repository at this point in the history
* Added null check - the proto suggests this shouldn't ever be null, but there's an issue reporting as much, so this fixes that

Signed-off-by: Whit Waldo <[email protected]>

* Removed the Task.WhenAll making the operation non-blocking

Signed-off-by: Whit Waldo <[email protected]>

* Added unit test to validate that the subscription is no longer blocking

Signed-off-by: Whit Waldo <[email protected]>

* Removed unused line from previous test, added another test

Signed-off-by: Whit Waldo <[email protected]>

* Added another test

Signed-off-by: Whit Waldo <[email protected]>

* More unit tests

Signed-off-by: Whit Waldo <[email protected]>

* Added more unit tests

Signed-off-by: Whit Waldo <[email protected]>

* Updated to make DaprPublishSubscribeClientBuilder configurable via a registered IConfiguration

Signed-off-by: Whit Waldo <[email protected]>

* Added missing copyright statements

Signed-off-by: Whit Waldo <[email protected]>

* Added missing package reference

Signed-off-by: Whit Waldo <[email protected]>

* Fixed bad reference (missed in merge)

Signed-off-by: Whit Waldo <[email protected]>

* Fixed failing unit test

Signed-off-by: Whit Waldo <[email protected]>

* Tweak to only pass along EventMessage payloads to developers as it's expected that the initial response will be null if EventMessage is populated

Signed-off-by: Whit Waldo <[email protected]>

* Was missing assignment of the Data property in the TopicMessage. Shout out to both @tommorvolloriddle and @Aimless321 for catching this!

Signed-off-by: Whit Waldo <[email protected]>

* Fix - return would be bad. Continue is the right move.

Signed-off-by: Whit Waldo <[email protected]>

* Added a simple test

Signed-off-by: Whit Waldo <[email protected]>

* Fixed unit tests

Signed-off-by: Whit Waldo <[email protected]>

* Merged in tweaks from #1422

Signed-off-by: Whit Waldo <[email protected]>

---------

Signed-off-by: Whit Waldo <[email protected]>
  • Loading branch information
WhitWaldo authored Dec 11, 2024
1 parent 3a930c2 commit 3d500e8
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 17 deletions.
18 changes: 18 additions & 0 deletions src/Dapr.Messaging/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Dapr.Messaging.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b1f597635c44597fcecb493e2b1327033b29b1a98ac956a1a538664b68f87d45fbaada0438a15a6265e62864947cc067d8da3a7d93c5eb2fcbb850e396c8684dba74ea477d82a1bbb18932c0efb30b64ff1677f85ae833818707ac8b49ad8062ca01d2c89d8ab1843ae73e8ba9649cd28666b539444dcdee3639f95e2a099bb2")]


Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClien
/// </summary>
public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient, string? daprApiToken)
{
Client = client;
this.Client = client;
this.HttpClient = httpClient;
this.DaprApiToken = daprApiToken;
}
Expand All @@ -63,7 +63,7 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient
/// <returns></returns>
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
{
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, Client);
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, this.Client);
await receiver.SubscribeAsync(cancellationToken);
return receiver;
}
Expand Down
57 changes: 45 additions & 12 deletions src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
/// </summary>
private bool isDisposed;

// Internal property for testing purposes
internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion;
// Internal property for testing purposes
internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion;

/// <summary>
/// Constructs a new instance of a <see cref="PublishSubscribeReceiver"/> instance.
/// </summary>
Expand Down Expand Up @@ -115,20 +120,40 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default

var stream = await GetStreamAsync(cancellationToken);

//Retrieve the messages from the sidecar and write to the messages channel
var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken);
//Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking
_ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken)
.ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted,
TaskScheduler.Default);

//Process the messages as they're written to either channel
var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken);
var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken);
_ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion,
null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
_ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null,
cancellationToken,
TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
}

try
{
await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask);
}
catch (OperationCanceledException)
/// <summary>
/// Exposed for testing purposes only.
/// </summary>
/// <param name="message">The test message to write.</param>
internal async Task WriteMessageToChannelAsync(TopicMessage message)
{
await topicMessagesChannel.Writer.WriteAsync(message);
}

//Exposed for testing purposes only
internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement acknowledgement)
{
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
}

//Exposed for testing purposes only
internal static void HandleTaskCompletion(Task task, object? state)
{
if (task.Exception != null)
{
// Will be cleaned up during DisposeAsync
throw task.Exception;
}
}

Expand Down Expand Up @@ -251,13 +276,21 @@ await stream.RequestStream.WriteAsync(
//Each time a message is received from the stream, push it into the topic messages channel
await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
{
//https://github.com/dapr/dotnet-sdk/issues/1412 reports that this is sometimes null
//Skip the initial response - we only want to pass along TopicMessage payloads to developers
if (response?.EventMessage is null)
{
continue;
}

var message =
new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
response.EventMessage.Topic, response.EventMessage.PubsubName)
{
Path = response.EventMessage.Path,
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value),
Data = response.EventMessage.Data.ToByteArray()
};

try
Expand Down Expand Up @@ -308,6 +341,6 @@ public async ValueTask DisposeAsync()
/// </summary>
/// <param name="MessageId">The identifier of the message.</param>
/// <param name="Action">The action to take on the message in the acknowledgement request.</param>
private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
internal sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
}

1 change: 1 addition & 0 deletions test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<PackageReference Include="Grpc.Net.Client" />
<PackageReference Include="protobuf-net.Grpc.AspNetCore" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,90 @@
using Dapr.Messaging.PublishSubscribe;
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Moq;

namespace Dapr.Messaging.Test.Extensions;

public sealed class PublishSubscribeServiceCollectionExtensionsTests
{
[Fact]
public void AddDaprPubSubClient_RegistersIHttpClientFactory()
public void AddDaprMessagingClient_FromIConfiguration()
{
const string apiToken = "abc123";
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>
{
{"DAPR_API_TOKEN", apiToken }
})
.Build();

var services = new ServiceCollection();

services.AddSingleton<IConfiguration>(configuration);

services.AddDaprPubSubClient();

var app = services.BuildServiceProvider();

var pubSubClient = app.GetRequiredService<DaprPublishSubscribeClient>() as DaprPublishSubscribeGrpcClient;

Assert.NotNull(pubSubClient!);
Assert.Equal(apiToken, pubSubClient.DaprApiToken);
}

[Fact]
public void AddDaprPubSubClient_RegistersIHttpClientFactory()
{
var services = new ServiceCollection();
services.AddDaprPubSubClient();

var serviceProvider = services.BuildServiceProvider();
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();
Assert.NotNull(daprClient);
}

[Fact]
public void AddDaprPubSubClient_CallsConfigureAction()
{
var services = new ServiceCollection();

var configureCalled = false;

services.AddDaprPubSubClient(Configure);

var serviceProvider = services.BuildServiceProvider();
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();
Assert.NotNull(daprClient);
Assert.True(configureCalled);
return;

void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder)
{
configureCalled = true;
}
}

[Fact]
public void AddDaprPubSubClient_RegistersServicesCorrectly()
{
var services = new ServiceCollection();
services.AddDaprPubSubClient();
var serviceProvider = services.BuildServiceProvider();

var httpClientFactory = serviceProvider.GetService<IHttpClientFactory>();
Assert.NotNull(httpClientFactory);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
using Dapr.Messaging.PublishSubscribe;
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Messaging.PublishSubscribe;

namespace Dapr.Messaging.Test.PublishSubscribe
{
Expand Down
Loading

0 comments on commit 3d500e8

Please sign in to comment.