Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Microsoft.Extensions.Configuration provider for azure queue streaming #8929

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using System.Collections.Generic;
using Azure.Storage.Queues;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans;
using Orleans.Configuration;
using Orleans.Hosting;
using Orleans.Providers;

[assembly: RegisterProvider("AzureQueueStorage", "Streaming", "Silo", typeof(AzureQueueStreamProviderBuilder))]
[assembly: RegisterProvider("AzureQueueStorage", "Streaming", "Client", typeof(AzureQueueStreamProviderBuilder))]

namespace Orleans.Hosting;

public sealed class AzureQueueStreamProviderBuilder : IProviderBuilder<ISiloBuilder>, IProviderBuilder<IClientBuilder>
{
public void Configure(ISiloBuilder builder, string name, IConfigurationSection configurationSection)
{
builder.AddAzureQueueStreams(name, GetQueueOptionBuilder(configurationSection));
}

public void Configure(IClientBuilder builder, string name, IConfigurationSection configurationSection)
{
builder.AddAzureQueueStreams(name, GetQueueOptionBuilder(configurationSection));
}

private static Action<OptionsBuilder<AzureQueueOptions>> GetQueueOptionBuilder(IConfigurationSection configurationSection)
{
return (OptionsBuilder<AzureQueueOptions> optionsBuilder) =>
{
optionsBuilder.Configure<IServiceProvider>((options, services) =>
{
var queueNames = configurationSection.GetSection("QueueNames")?.Get<List<string>>();
if (queueNames != null)
{
options.QueueNames = queueNames;
}

var visibilityTimeout = configurationSection["MessageVisibilityTimeout"];
if (TimeSpan.TryParse(visibilityTimeout, out var visibilityTimeoutTimeSpan))
{
options.MessageVisibilityTimeout = visibilityTimeoutTimeSpan;
}

var serviceKey = configurationSection["ServiceKey"];
if (!string.IsNullOrEmpty(serviceKey))
{
// Get a client by name.
options.QueueServiceClient = services.GetRequiredKeyedService<QueueServiceClient>(serviceKey);
}
else
{
// Construct a connection multiplexer from a connection string.
var connectionName = configurationSection["ConnectionName"];
var connectionString = configurationSection["ConnectionString"];
if (!string.IsNullOrEmpty(connectionName) && string.IsNullOrEmpty(connectionString))
{
var rootConfiguration = services.GetRequiredService<IConfiguration>();
connectionString = rootConfiguration.GetConnectionString(connectionName);
}

if (!string.IsNullOrEmpty(connectionString))
{
options.QueueServiceClient = Uri.TryCreate(connectionString, UriKind.Absolute, out var uri)
? new QueueServiceClient(uri)
: new QueueServiceClient(connectionString);
}
}
});
};
}
}
50 changes: 25 additions & 25 deletions test/Extensions/TesterAzureUtils/Streaming/AQStreamingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,30 @@ public class AQStreamingTests : TestClusterPerTest
protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
TestUtils.CheckForAzureStorage();
builder.AddSiloBuilderConfigurator<SiloBuilderConfigurator>();
builder.AddClientBuilderConfigurator<MyClientBuilderConfigurator>();
}

private class MyClientBuilderConfigurator : IClientBuilderConfigurator
{
public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
builder.ConfigureHostConfiguration(cb =>
{
clientBuilder
.AddAzureQueueStreams(AzureQueueStreamProviderName, b=>
b.ConfigureAzureQueue(ob=>ob.Configure<IOptions<ClusterOptions>>(
(options, dep) =>
{
options.ConfigureTestDefaults();
options.QueueNames = AzureQueueUtilities.GenerateQueueNames(dep.Value.ClusterId, queueCount);
})));
}
Dictionary<string, string> queueConfig = [];
void ConfigureStreaming(string option, string value)
{
var prefix = $"Orleans:Streaming:{AzureQueueStreamProviderName}:";
queueConfig[$"{prefix}{option}"] = value;
}

ConfigureStreaming("ProviderType", "AzureQueueStorage");
ConfigureStreaming("ConnectionString", TestDefaultConfiguration.UseAadAuthentication
? TestDefaultConfiguration.DataBlobUri.AbsoluteUri
: TestDefaultConfiguration.DataConnectionString);

var names = AzureQueueUtilities.GenerateQueueNames(builder.Options.ClusterId, queueCount);
for (var i = 0; i < names.Count; i++)
{
ConfigureStreaming($"QueueNames:{i}", names[i]);
}

cb.AddInMemoryCollection(queueConfig);
});

builder.AddSiloBuilderConfigurator<SiloBuilderConfigurator>();
}

private class SiloBuilderConfigurator : ISiloConfigurator
Expand All @@ -56,14 +63,7 @@ public void Configure(ISiloBuilder hostBuilder)
options.ConfigureTestDefaults();
options.DeleteStateOnClear = true;
}))
.AddMemoryGrainStorage("MemoryStore")
.AddAzureQueueStreams(AzureQueueStreamProviderName, c=>
c.ConfigureAzureQueue(ob => ob.Configure<IOptions<ClusterOptions>>(
(options, dep) =>
{
options.ConfigureTestDefaults();
options.QueueNames = AzureQueueUtilities.GenerateQueueNames(dep.Value.ClusterId, queueCount);
})));
.AddMemoryGrainStorage("MemoryStore");
}
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public async Task AQ_06_ManyDifferent_ManyProducerGrainManyConsumerClients()
await runner.StreamTest_06_ManyDifferent_ManyProducerGrainManyConsumerClients();
}

[SkippableFact(Skip="https://github.com/dotnet/orleans/issues/5648"), TestCategory("Functional")]
[SkippableFact(Skip = "https://github.com/dotnet/orleans/issues/5648"), TestCategory("Functional")]
public async Task AQ_07_ManyDifferent_ManyProducerClientsManyConsumerGrains()
{
await runner.StreamTest_07_ManyDifferent_ManyProducerClientsManyConsumerGrains();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System.Text;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration;
using Xunit;

namespace Tester.AzureUtils;

public class AzureQueueStreamProviderBuilderTests
{
[Fact]
public void Missing_ConnectionString()
{
string json = """
{
"Orleans": {
"Streaming": {
"AzureQueueProvider": {
"ProviderType": "AzureQueueStorage",
"QueueNames": [
"q1"
]
}
}
}
}
""";

var queueOptions = ConfigureSilo(json).Services.BuildServiceProvider().GetOptionsByName<AzureQueueOptions>(null);

Assert.Null(queueOptions.QueueServiceClient);
}

[Fact]
public void Minimal_Configuration()
{
string json = """
{
"Orleans": {
"Streaming": {
"AzureQueueProvider": {
"ProviderType": "AzureQueueStorage",
"ConnectionString": "UseDevelopmentStorage=true",
"QueueNames": [
"q1"
]
}
}
}
}
""";

var queueOptions = ConfigureSilo(json).Services.BuildServiceProvider().GetOptionsByName<AzureQueueOptions>(null);

Assert.NotNull(queueOptions.QueueServiceClient);
Assert.Equal("devstoreaccount1", queueOptions.QueueServiceClient.AccountName);
Assert.Equal(["q1"], queueOptions.QueueNames);
Assert.Null(queueOptions.MessageVisibilityTimeout);
}

[Fact]
public void Full_Configuration()
{
string json = """
{
"Orleans": {
"Streaming": {
"AzureQueueProvider": {
"ProviderType": "AzureQueueStorage",
"ConnectionString": "UseDevelopmentStorage=true",
"MessageVisibilityTimeout": "00:00:37",
"QueueNames": [
"q1",
"q2"
]
}
}
}
}
""";

var queueOptions = ConfigureSilo(json).Services.BuildServiceProvider().GetOptionsByName<AzureQueueOptions>(null);

Assert.NotNull(queueOptions.QueueServiceClient);
Assert.Equal("devstoreaccount1", queueOptions.QueueServiceClient.AccountName);
Assert.Equal(["q1", "q2"], queueOptions.QueueNames);
Assert.Equal(TimeSpan.FromSeconds(37), queueOptions.MessageVisibilityTimeout);
}

static TestSiloBuilder ConfigureSilo(string json)
{
var siloBuilder = new TestSiloBuilder(json);
var aqsBuilder = new AzureQueueStreamProviderBuilder();
aqsBuilder.Configure(siloBuilder, null, siloBuilder.Configuration.GetSection("Orleans:Streaming:AzureQueueProvider"));
return siloBuilder;
}

class TestSiloBuilder(string json) : ISiloBuilder
{
public IServiceCollection Services { get; } = new ServiceCollection();

public IConfiguration Configuration { get; } = GetConfig(json);
}

static IConfigurationRoot GetConfig(string json) => new ConfigurationBuilder().AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(json))).Build();
}