How to create non-blocking consumer and call delegate on new message received #588
Answered
by
mtmk
rdkumavat1
asked this question in
Q&A
Replies: 1 comment 3 replies
-
// dotnet add package NATS.Net
// dotnet add package Microsoft.Extensions.Logging.Console
var opts = new NatsOpts
{
LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
};
await using var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync(new StreamConfig("MY_STREAM", ["events.>"]));
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("MY_CONSUMER"));
var cts = new CancellationTokenSource();
async Task MessageDelegate(NatsJSMsg<string> msg)
{
Console.WriteLine($"processing: {msg}");
await msg.AckAsync(cancellationToken: cts.Token);
}
var consumeTask = Task.Run(async () =>
{
await foreach (var msg in consumer.ConsumeAsync<string>(cancellationToken: cts.Token))
{
await MessageDelegate(msg);
}
});
// Stop application
await cts.CancelAsync();
await consumeTask; |
Beta Was this translation helpful? Give feedback.
3 replies
Answer selected by
rickdotnet
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
How to create a non-blocking consumer subscription and call a method with a delegate on new message publish.
Beta Was this translation helpful? Give feedback.
All reactions