Add Streaming Subscription Support #1324

Closed
JoshVanL opened this issue Jul 9, 2024 · 25 comments · Fixed by #1381

@JoshVanL
Contributor

JoshVanL commented Jul 9, 2024

Dapr v1.14 adds support for streaming subscriptions, which allow dynamically subscribing to pubsub topics over a bi-directional gRPC stream. This is exposed via the SubscribeTopicEventsAlpha1 gRPC API.

On the first call, the client sends an InitialRequest containing the pubsub name, topic name, etc. From then on, topic event messages are sent from daprd to the client over the stream. The client reports the processing status of each message back to daprd. A message is not considered processed until the client has responded with this status. There can be multiple in-flight topic messages. Daprd unsubscribes from the topic when the stream is closed.

Here is a reference implementation for the go-sdk.
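
For readers on .NET, the exchange described above can be modelled without any Dapr or gRPC types. The following is only a sketch under stated assumptions: `TopicEvent`, `Ack`, `AckStatus`, and `StreamingSketch` are hypothetical stand-ins, two in-memory channels play the role of the bi-directional stream, and messages are handled one at a time even though the real API allows several in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-ins for the protocol messages; not Dapr SDK types.
public enum AckStatus { Success, Retry, Drop }
public sealed record TopicEvent(string Id, string Data);
public sealed record Ack(string Id, AckStatus Status);

public static class StreamingSketch
{
    // Reads events until the inbound side completes, reporting a status for each one by id.
    public static async Task<List<Ack>> RunClientAsync(
        ChannelReader<TopicEvent> fromSidecar,
        ChannelWriter<Ack> toSidecar,
        Func<TopicEvent, Task<AckStatus>> handler)
    {
        var sent = new List<Ack>();
        await foreach (var evt in fromSidecar.ReadAllAsync())
        {
            // The message only counts as processed once this status has been sent back.
            var status = await handler(evt);
            var ack = new Ack(evt.Id, status);
            await toSidecar.WriteAsync(ack);
            sent.Add(ack);
        }

        // Closing our side of the stream models ending the subscription.
        toSidecar.Complete();
        return sent;
    }
}
```

A real client would first write the initial request carrying the pubsub and topic names before entering this loop; that step is left out here because its exact shape belongs to the generated gRPC types.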

@JoshVanL JoshVanL added the kind/enhancement New feature or request label Jul 9, 2024
@philliphoff philliphoff modified the milestones: v1.15, v1.14.x Jul 9, 2024
@WhitWaldo
Contributor

/assign

@philliphoff
Collaborator

philliphoff commented Jul 10, 2024

Some initial thoughts on the shape of a possible implementation:

  • To align with the "component specific clients" approach, creation of a new Dapr.PublishSubscribe (Dapr.Messaging?) package
  • Creation of a new Dapr.DaprPublishSubscribeClient type
  • Client exposes a Subscribe() method that accepts a handler called for each topic event received
    • Subscription ends when cancellation token indicates cancellation requested
    • Subscribe() implementation may call handler concurrently but manages synchronization across gRPC channel
    • Potentially offer option to have buffered/synchronous handling
    • Method completes when either side of the gRPC channel indicates cancellation requested
    • Method throws if either side indicates an error
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Dapr.PublishSubscribe;

public sealed record DaprSubscriptionOptions
{
    public IReadOnlyDictionary<string, string> Metadata { get; init; } = new Dictionary<string, string>();

    public string? DeadLetterTopic { get; init; }
}

public sealed record TopicRequest
{
    public required string Id { get; init; }
    public required string Source { get; init; }
    public required string Type { get; init; }
    public required string SpecVersion { get; init; }
    public required string DataContentType { get; init; }
    public required string Topic { get; init; }
    public required string PubSubName { get; init; }
    public string? Path { get; init; }
    public object? Extensions { get; init; } // TODO: Determine what this should look like.
}

public enum TopicResponse
{
    Success,
    Error,
    Drop
}

// Called once per received topic event; the returned value is reported back to the sidecar.
public delegate Task<TopicResponse> TopicRequestHandler(TopicRequest request);

public abstract class DaprPublishSubscribeClient
{
    // Factory for the concrete client; the body is omitted in this API sketch.
    public static DaprPublishSubscribeClient CreateClient() => throw new NotImplementedException();

    // Completes when either side of the stream signals cancellation; throws if either side reports an error.
    public abstract Task SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
}
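
The bullet about calling the handler concurrently while synchronizing access to the gRPC channel can be illustrated in isolation. This is a minimal sketch, not the branch's implementation: `ConcurrentDispatchSketch` and its parameters are hypothetical, and `writeResponse` stands in for writing to the request stream, on the assumption that the stream accepts only one pending write at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentDispatchSketch
{
    // Starts one handler per message without waiting for earlier ones,
    // but funnels every response through a single-entry lock.
    public static async Task DispatchAsync<TMessage, TResult>(
        IEnumerable<TMessage> messages,
        Func<TMessage, Task<TResult>> handler,
        Func<TMessage, TResult, Task> writeResponse)
    {
        using var writeLock = new SemaphoreSlim(1, 1);
        var inFlight = new List<Task>();

        foreach (var message in messages)
        {
            inFlight.Add(HandleOneAsync(message));
        }

        // Wait for every handler and its response write before releasing the lock object.
        await Task.WhenAll(inFlight);

        async Task HandleOneAsync(TMessage message)
        {
            var result = await handler(message);

            await writeLock.WaitAsync();
            try
            {
                // Only one response is written at a time.
                await writeResponse(message, result);
            }
            finally
            {
                writeLock.Release();
            }
        }
    }
}
```

A buffered or synchronous mode, as floated in the list above, would amount to awaiting each handler before starting the next instead of collecting the tasks.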

@WhitWaldo
Contributor

Looking at the protos, the extension is implemented as a Struct, so that might be most easily implemented as a Dictionary<string,string>, though I think it'd do developers a courtesy if we could identify what all the properties are and strongly type it as an Options record, then map it out ourselves. I hate seeing a library that provides this open-ended mapping and then leaves it to me to figure out what to put in it.

This might be worth a more substantive discussion elsewhere, but I was tentatively thinking of the following component-based naming scheme:

  • Two-part names for Dapr-provided services, e.g. Dapr.Jobs or Dapr.Workflow where there isn't really an underlying component swapped in that performs the ability.
  • Three-part names for component abstractions: Dapr.State.KeyValue, Dapr.State.Cache, Dapr.State.Relational, or here, Dapr.Messaging.PubSub (or Dapr.Messaging.PublishSubscribe).

This leaves the door open for other messaging building blocks down the road like an event bus or distributed event sender/processor under the same messaging subset.

@philliphoff
Collaborator

> Looking at the protos, the extension is implemented as a Struct, so that might be most easily implemented as a Dictionary<string,string>, though I think it'd do developers a courtesy if we could identify what all the properties are and strongly type it as an Options record, then map it out ourselves. I hate seeing a library that provides this open-ended mapping and then leaves it to me to figure out what to put in it.

@WhitWaldo I also prefer that, for well-known values, the APIs expose them in an appropriately typed way.

> Three-part names for component abstractions: Dapr.State.KeyValue, Dapr.State.Cache, Dapr.State.Relational, or here, Dapr.Messaging.PubSub (or Dapr.Messaging.PublishSubscribe).

@WhitWaldo I feel like, from a packaging perspective, this might be too fine-grained. That said, I think it'd be fine to have a single component-category package (e.g. Dapr.State or Dapr.Messaging) that included multiple fine-grained clients (e.g. Dapr.State package contains both Dapr.State.KeyValue.DaprKeyValueClient as well as Dapr.State.Cache.DaprCacheClient).

@philliphoff
Collaborator

philliphoff commented Jul 10, 2024

@WhitWaldo There's a start of my thinking in this branch:

https://github.com/dapr/dotnet-sdk/tree/philliphoff-streaming-subs

(I had started thinking about this a bit before you self-assigned the issue; feel free to borrow, build-upon, or completely ignore.)

@WhitWaldo
Contributor

WhitWaldo commented Jul 11, 2024

@philliphoff I use a mono-repo for most of my own professional stuff and just leave it to the CI/CD to deal with the individual packages, so I'll typically lean towards more specific packages rather than jamming everything into one. Pair that with a documentation perspective: I like being able to say that here are the docs for this building block, and here are the docs for this specialty block within it, with a package purpose-built for that sub-concept. Namespaces are nice within the package itself, but just as I'd prefer to install Dapr.Messaging.PublishSubscribe instead of Dapr.Client and everything that comes with it, the same holds for not wanting to install Dapr.Messaging and get a whole distributed event processing system I didn't necessarily need. It's a consistently narrow focus all the way down. Paired with the lack of optional interfaces, it's a perfect abstraction of all the narrowed capabilities of the components it ties into.

I meandered through your approach and think it frankly looks great. I'd build a sample or two off it just to play around with the experience, but generate the client using the shared project from my Jobs proposal (so it's bundling any available api-token) and I think it's frankly pretty much there.

Now, the big question would be whether, especially come 1.15, you'd be open to shuffling out all the pubsub bits to this package or if that's worth some sort of lower-level facelift first with a fresh start in the new package (like what I'm slowly wanting to do with state)?

@philliphoff
Collaborator

@WhitWaldo For me it's finding a balance between individual purpose and repo/package maintainability. I'm not sure I'm convinced yet that having separate packages for, say, key/value store clients and cache clients are in that sweet spot of balance. (We always reserve the right to further split packages as necessary, too. Yes it's breaking, but it's a fairly minimal one.)

I've updated my branch, switching to an IDisposable Subscribe(/* ... */) model, which is a little more aligned with some eventing frameworks (like Rx.NET).

I'm running into some curious behavior in testing, though, where the Dapr sidecar seems to ignore success/drop responses and keeps re-delivering messages. From the logs, it appears the responses are getting back to the sidecar so I'm not sure what's happening.

I've also done a little testing with raw payloads; that seems to result in none of the normal topic request fields being populated and the extensions field containing the actual data. I'm not sure yet how best to expose that. I'm also not sure how useful it is to manage deserialization of the data on the user's behalf vs. just having the user do it (which is fairly trivial since the content type is provided anyway, and which gets the Dapr SDK out of the serialization options business).

@WhitWaldo
Contributor

@philliphoff I get that - I'll see if I can think of more evidence to support one approach or the other.

That's less than ideal behavior - happy to dig in and see if I can't help out once I've got Jobs wrapped up.

Let me know if you come to a conclusion on the serialization. Something I'm torn on with the Jobs mapping is whether it should similarly get out of the serialization game. On one hand, it'd be nice to blindly pass types to a generic endpoint and have it all Just Work. On the other hand, if it instead accepted only an array of bytes for the job payload and left serialization entirely to the developer, that would simplify the SDK mapping significantly, because we wouldn't risk the job being triggered, deserialized into the wrong payload type, and throwing. It'd be nice to ensure that any runtime errors are because the developer failed to deserialize and handle their own types, instead of it being an inevitable "Dapr" problem.
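
To make the "leave serialization to the developer" option concrete, here is a rough sketch of what the user-side code could look like if the SDK handed over only raw bytes plus the content type. `OrderPlaced` and `PayloadSketch.Decode` are hypothetical names for illustration, and JSON via System.Text.Json is just one possible choice.

```csharp
using System;
using System.Text.Json;

// Hypothetical application payload type.
public sealed record OrderPlaced(string OrderId, int Quantity);

public static class PayloadSketch
{
    // Decodes a raw payload using the content type delivered with the message.
    // Any failure here surfaces in user code rather than inside the SDK.
    public static T Decode<T>(ReadOnlySpan<byte> payload, string contentType) where T : class
    {
        if (!contentType.StartsWith("application/json", StringComparison.OrdinalIgnoreCase))
        {
            throw new NotSupportedException($"Unsupported content type: {contentType}");
        }

        return JsonSerializer.Deserialize<T>(payload)
            ?? throw new InvalidOperationException("Payload deserialized to null.");
    }
}
```

With this split, a mismatch between the published and expected types shows up as an exception in the application's own decode call, which is the trade-off weighed above.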

@m3nax
Contributor

m3nax commented Jul 30, 2024

@philliphoff I have been reading the code written for streaming support and think that an IAsyncEnumerable version of the Subscribe method might be a good addition. It could be used to build a message-processing pipeline with chained LINQ filters, for example.

public abstract class DaprPublishSubscribeClient
{
    // Factory for the concrete client; the body is omitted in this API sketch.
    public static DaprPublishSubscribeClient CreateClient() => throw new NotImplementedException();

    public abstract Task SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);

    // Proposed addition: a pull-style overload that yields each received message.
    public abstract IAsyncEnumerator<TopicRequest> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
}

What do you think?
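
As an illustration of the pipeline idea, the sketch below filters an async stream of messages before they reach the consumer. Everything in it is a stand-in: `IncomingMessage`, `FakeSubscribeAsync`, `WhereAsync`, and `CollectAsync` are hypothetical helpers, and the filter is hand-written so the example needs no extra package (LINQ-style operators over async streams typically come from a separate package such as System.Linq.Async).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical message shape for the example.
public sealed record IncomingMessage(string Topic, string Data);

public static class PipelineSketch
{
    // Stand-in for a streaming subscription: yields messages from an in-memory source.
    public static async IAsyncEnumerable<IncomingMessage> FakeSubscribeAsync(IEnumerable<IncomingMessage> source)
    {
        foreach (var message in source)
        {
            await Task.Yield(); // simulate asynchronous arrival
            yield return message;
        }
    }

    // Minimal hand-written filter stage, equivalent in spirit to a LINQ Where.
    public static async IAsyncEnumerable<T> WhereAsync<T>(IAsyncEnumerable<T> source, Func<T, bool> predicate)
    {
        await foreach (var item in source)
        {
            if (predicate(item))
            {
                yield return item;
            }
        }
    }

    // Drains a finite stream into a list; a real subscription would normally not end.
    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
    {
        var items = new List<T>();
        await foreach (var item in source)
        {
            items.Add(item);
        }
        return items;
    }
}
```

One open design point with this shape is acknowledgement: with a handler, the return value is the status, whereas with an enumerable each yielded item would need some way to report success, retry, or drop.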

@yaron2
Member

yaron2 commented Aug 26, 2024

@WhitWaldo do you have bandwidth to implement this for 1.15? @JoshVanL can help on anything related to the Dapr runtime as the implementor of streaming subscriptions.

Otherwise @philliphoff if you've made progress and can put this in 1.15 that'd be awesome.

@WhitWaldo
Contributor

@yaron2 As I recall, @philliphoff was seeing some stability issues for his own implementation which seemed pretty nearly baked, so I'm not sure that there's much I can contribute here outside of some additional stability testing.

@WhitWaldo
Contributor

@yaron2 I have a few more thoughts about this.

The more I think about it, the more I really like the idea of putting this in Dapr.Messaging and having multiple clients (opens the door for potentially versioning in the clients themselves), so I'd definitely take that from what @philliphoff was working on.

I think it makes sense to make an IAsyncEnumerator method available per @m3nax 's idea above as it opens options up of what to do with the received messages.

In my Scheduler PR, I introduce a Dapr.Common library in which I intend to centralize exceptions across the .NET clients and introduce a generic client builder, and I'd ideally use that here (to avoid yet another implementation of it in the clients).

Otherwise, I think I'd make a few changes to what's already been put together here, but could probably have something hammered out pretty quickly. I don't see any reason it can't otherwise make it into 1.15. When's the target date for that release?

@yaron2
Member

yaron2 commented Aug 27, 2024

> @yaron2 I have a few more thoughts about this.
>
> The more I think about it, the more I really like the idea of putting this in Dapr.Messaging and having multiple clients (opens the door for potentially versioning in the clients themselves), so I'd definitely take that from what @philliphoff was working on.
>
> I think it makes sense to make an IAsyncEnumerator method available per @m3nax 's idea above as it opens options up of what to do with the received messages.
>
> In my Scheduler PR, I introduce a Dapr.Common library to which I intended to centralize exceptions across the .NET clients and introduce a generic client builder and I'd ideally use that here (to avoid yet another implementation of it in the clients).
>
> Otherwise, I think I'd make a few changes to what's already been put together here, but could probably have something hammered out pretty quickly. I don't see any reason it can't otherwise make it into 1.15. When's the target date for that release?

We're aiming for mid-November.

@philliphoff
Collaborator

> Otherwise @philliphoff if you've made progress and can put this in 1.15 that'd be awesome.

I'm afraid that I don't have bandwidth for feature work in the 1.15 timeframe, but if others do then I have no issue making this part of the 1.15 release. (I believe we had already targeted this for a post-1.14 release.)

@WhitWaldo
Contributor

I should be able to tackle this in that timeframe. Again, it'd be great if we can get the Scheduler PR through first so I can use some of those shared dependencies here and prevent some duplicate classes, but I'll work on having a draft done by the end of next week, if not sooner.

@yaron2
Member

yaron2 commented Aug 27, 2024

> I should be able to tackle this in that timeframe - again, it'd be great if we can get the Schedule PR first through so I can use some of those shared dependencies here and prevent some duplicate classes, but I'll work on having a draft done by the end of next week, if not sooner.

We discussed the Jobs API PR this morning, including expediting its review.

@theperm

theperm commented Aug 29, 2024

Joining in late here. Having reviewed the proposal, and given the ack-based nature of the Go API, I concur that IAsyncEnumerable is the most modern way to do this in .NET. I think the return type should be IAsyncEnumerable, NOT IAsyncEnumerator.

However, I don't really see the need for the overload that takes a callback and returns a task. That feels antiquated in .NET now.

public abstract class DaprPublishSubscribeClient
{
    // Factory for the concrete client; the body is omitted in this API sketch.
    public static DaprPublishSubscribeClient CreateClient() => throw new NotImplementedException();

    public abstract IAsyncEnumerable<TopicRequest> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions? options = null, CancellationToken cancellationToken = default);
}

@WhitWaldo
Contributor

That's right - I mistyped. It should be IAsyncEnumerable.

I was planning to expose it as an overload much like the Service Bus SDK does in their own receiver implementation. It opens the door to developers using it in a more modern workflow, but still leaves the API accessible to those users more comfortable with a Task<IReadOnlyList<string>>.

@theperm

theperm commented Aug 30, 2024

Task<IReadOnlyList>: this is a task that completes with a list of (all) the messages. Is the plan then to specify how many messages to consume (as a batch), like Service Bus?

In reality the message stream is expected to never end, or to be very long, so this would usually be called in a loop to keep fetching more. But would that really be any different from just embracing LINQ operators?

IAsyncEnumerable<msg> msgs = daprClient.SubscribeAsync(...);
var batch = msgs.Take(20);

As I wrote the above, I started to ask myself whether this will be a cold or hot IAsyncEnumerable. (They are generally cold, to align with cold observable behaviour.) Then I wonder what will happen when there are two enumerators at play on the same IAsyncEnumerable?
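
The cold-versus-hot question can be seen with a plain C# async iterator, which is cold: its body does not run when the method is called, and it runs again from the start for every enumeration. The sketch below is only a demonstration of that language behaviour, not of what the SDK will do; `ColdSourceSketch` and `TakeAsync` are hypothetical, and the counter stands in for "a subscription was opened".

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class ColdSourceSketch
{
    // Counts how many times the iterator body has started.
    public int SubscriptionsStarted { get; private set; }

    // An async iterator: nothing below executes until enumeration begins.
    public async IAsyncEnumerable<int> SubscribeAsync(int count)
    {
        SubscriptionsStarted++; // runs once per enumeration, not once per call
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Hand-written batch helper: reads at most n items, then stops enumerating.
    public static async Task<List<T>> TakeAsync<T>(IAsyncEnumerable<T> source, int n)
    {
        var batch = new List<T>();
        if (n <= 0)
        {
            return batch;
        }

        await foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == n)
            {
                break;
            }
        }
        return batch;
    }
}
```

If the subscription were implemented this way, taking two batches from the same enumerable would open two separate subscriptions rather than continue one stream, so a loop that repeatedly takes a batch would need to hold on to a single enumerator instead.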

@mustafaaozcan

Hello,

We have used Dapr in previous projects and are now starting a new startup where we plan to continue leveraging Dapr. We are particularly interested in utilizing this feature. Could you provide an estimate on when it will be available?

Thank you.

@WhitWaldo
Contributor

WhitWaldo commented Sep 2, 2024

@mustafaaozcan Aiming for a release alongside 1.15 in November.

@alicihad

alicihad commented Sep 2, 2024

Thanks for the reply, WhitWaldo. Could it be released earlier? The timing may affect our production date. Thanks for your support.

@WhitWaldo
Contributor

@alicihad Not likely as all the .NET feature bits tend to be released on the minor releases. That said, the core functionality is already baked into the Dapr sidecar and was released with 1.14 - what's discussed here is just implementing the .NET SDK to integrate with it.

So with that said, there's little that stops you from either waiting for me to finish the PR/review or just writing and using your own implementation until the official bits come out in the 1.15 release. I do it all the time with the pieces I've written to validate that they work as they should.

@sebastianamaro

Hi! Quick question: do you have an ETA for 1.15? This is exactly what we are looking for :)

Thanks a lot!

@m3nax
Contributor

m3nax commented Dec 4, 2024

> Hi! Quick question, do you have an ETA for 1.15? this is exactly what we are looking for :)
>
> Thanks a lot!

This is the issue that collects information on release 1.15: dapr/dapr#8017
