[Discussion] Implementing a FusionCache Backplane with Azure Service Bus #370

Open
cyrildurand opened this issue Jan 30, 2025 · 2 comments


cyrildurand commented Jan 30, 2025

My app is hosted on Azure, and I already use Azure Service Bus and Cosmos DB. Since Azure Redis is costly and my app doesn’t require its level of performance, I’m exploring Cosmos DB for distributed caching and Azure Service Bus as a backplane.

From my research, it seems that Redis is the only officially supported backplane implementation, and I couldn't find an existing implementation using Azure Service Bus.

Questions:

  1. Does using Azure Service Bus as a backplane make sense?
  2. Would there be any drawbacks compared to Redis?

My Implementation

I started working on a Service Bus-based backplane, but I have some concerns—particularly with the Subscribe method.

Here’s my current (dummy/basic/work-in-progress) implementation:

using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using ZiggyCreatures.Caching.Fusion;
using ZiggyCreatures.Caching.Fusion.Backplane;

namespace ZiggyCreatures.FusionCache.Backplane.AzureServiceBus
{
	public class ServiceBusBackplane : IFusionCacheBackplane
	{
		private const String topicName = "fusionCache-dev";

		public ServiceBusBackplane(ServiceBusAdministrationClient serviceBusAdministrationClient, ServiceBusClient serviceBusClient)
		{
			this._serviceBusAdministrationClient = serviceBusAdministrationClient;
			this._serviceBusClient = serviceBusClient;
			this._serviceBusSender = new Lazy<ServiceBusSender>(() =>
			{
				return this._serviceBusClient.CreateSender(topicName);
			});
		}

		private readonly ServiceBusAdministrationClient _serviceBusAdministrationClient;
		private readonly ServiceBusClient _serviceBusClient;
		private readonly Lazy<ServiceBusSender> _serviceBusSender;
		private ServiceBusProcessor _serviceBusProcessor;

		public void Publish(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
		{
			this.PublishAsync(message, options, token).GetAwaiter().GetResult();
		}

		public async ValueTask PublishAsync(BackplaneMessage message, FusionCacheEntryOptions options, CancellationToken token = default)
		{
			await this._serviceBusSender.Value.SendMessageAsync(new ServiceBusMessage() { Body = new BinaryData(BackplaneMessage.ToByteArray(message)) }, token);
		}

		public void Subscribe(BackplaneSubscriptionOptions options)
		{
			this.SubscribeAsync(options).GetAwaiter().GetResult();
		}

		public void Unsubscribe()
		{
			this.UnsubscribeAsync().GetAwaiter().GetResult();
		}

		private async Task SubscribeAsync(BackplaneSubscriptionOptions options)
		{
			if (!await this._serviceBusAdministrationClient.TopicExistsAsync(topicName))
			{
				await this._serviceBusAdministrationClient.CreateTopicAsync(topicName);
			}
			// One subscription per cache instance, so every node receives every message.
			var subscriptionName = $"fusion-{options.CacheInstanceId}";
			if (!await this._serviceBusAdministrationClient.SubscriptionExistsAsync(topicName, subscriptionName))
			{
				await this._serviceBusAdministrationClient.CreateSubscriptionAsync(topicName, subscriptionName);
			}
			this._serviceBusProcessor = this._serviceBusClient.CreateProcessor(topicName, subscriptionName);
			this._serviceBusProcessor.ProcessErrorAsync += (args) =>
			{
				// TODO: log or otherwise handle args.Exception
				return Task.CompletedTask;
			};
			this._serviceBusProcessor.ProcessMessageAsync += async (args) =>
			{
				var data = args.Message.Body.ToArray();
				var msg = BackplaneMessage.FromByteArray(data);
				if (options.IncomingMessageHandlerAsync != null)
				{
					await options.IncomingMessageHandlerAsync(msg);
				}
			};
			await this._serviceBusProcessor.StartProcessingAsync();
		}

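		private async Task UnsubscribeAsync()
		{
			// Nothing to stop if Subscribe was never called.
			if (this._serviceBusProcessor is null)
			{
				return;
			}
			await this._serviceBusProcessor.StopProcessingAsync();
			await this._serviceBusProcessor.DisposeAsync();
			// TODO delete subscription
		}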
	}
}

Concerns

Subscribe Method and Sync-over-Async Pattern

The Subscribe method is synchronous, which forces me to use a sync-over-async pattern (.GetAwaiter().GetResult()). I’d like to avoid this because it can lead to deadlocks or thread pool exhaustion in certain scenarios.

  • Is there a specific reason why this method is sync-only?
  • Would it make sense to introduce an async version of this method?
  • I checked the Redis-based backplane and noticed similar concerns — was there a design decision behind this?
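
To make the concern concrete, here is a minimal sketch of what an async-first subscribe with a thin sync fallback could look like. The names `IAsyncSubscribable` and `DemoBackplane` are hypothetical and are not part of FusionCache; `Task.Delay` stands in for the real Service Bus calls.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical shape, NOT FusionCache's actual interface: an async-first
// subscribe method, with a sync wrapper kept only for callers that need it.
public interface IAsyncSubscribable
{
    ValueTask SubscribeAsync(CancellationToken token = default);
    void Subscribe();
}

public sealed class DemoBackplane : IAsyncSubscribable
{
    public bool IsSubscribed { get; private set; }

    public async ValueTask SubscribeAsync(CancellationToken token = default)
    {
        // Stand-in for real I/O such as creating a topic subscription.
        await Task.Delay(10, token).ConfigureAwait(false);
        IsSubscribed = true;
    }

    public void Subscribe()
    {
        // Running the async work on the thread pool avoids resuming on a
        // captured SynchronizationContext, a common source of deadlocks.
        // The calling thread is still blocked, so this is a fallback only.
        Task.Run(() => SubscribeAsync().AsTask()).GetAwaiter().GetResult();
    }
}
```

With a shape like this, async callers would simply `await backplane.SubscribeAsync()` and no thread is blocked; the sync wrapper still ties up the calling thread, so the thread pool exhaustion risk remains on that path.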

Unsubscribe Method Behavior

I noticed that in the FusionCache Simulator App, the Unsubscribe method is never triggered.

  • Is this expected behavior?
  • If so, how does FusionCache typically handle backplane unsubscription?

Next Steps

If this approach makes sense, I’m happy to open a Pull Request (PR) to contribute this as an alternative backplane provider for FusionCache.

Would love to hear your thoughts! 🚀

@cyrildurand cyrildurand changed the title Discussion: Implementing a FusionCache Backplane with Azure Service Bus [Discussion] Implementing a FusionCache Backplane with Azure Service Bus Jan 30, 2025
@jodydonetti
Collaborator

Hi @cyrildurand
I'll answer about the first part later, but for now:

Concerns

Subscribe Method and Sync-over-Async Pattern

The Subscribe method is synchronous, which forces me to use a sync-over-async pattern (.GetAwaiter().GetResult()). I’d like to avoid this because it can lead to deadlocks or thread pool exhaustion in certain scenarios.
[...]

You're right about this, and in fact since last week I've been working exactly on this.
Mind you, this has never caused any issue that I'm aware of, but it would still be better to go full-async on all the code paths.

The next version (coming out very soon) will have this change in it.

@jodydonetti
Collaborator

Hi @cyrildurand, v2.1 is out 🥳
