Skip to content

How to create non-blocking consumer and call delegate on new message received #588

Answered by mtmk
rdkumavat1 asked this question in Q&A
Discussion options

You must be logged in to vote
// dotnet add package NATS.Net
// dotnet add package Microsoft.Extensions.Logging.Console

var opts = new NatsOpts
{
    LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
};

await using var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);

var stream = await js.CreateStreamAsync(new StreamConfig("MY_STREAM", ["events.>"]));
var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("MY_CONSUMER"));

var cts = new CancellationTokenSource();

async Task MessageDelegate(NatsJSMsg<string> msg)
{
    Console.WriteLine($"processing: {msg}");
    await msg.AckAsync(cancellationToken: cts.Token);
}

var consumeTask = Task.Run(async () =>
{
    

Replies: 1 comment 3 replies

Comment options

You must be logged in to vote
3 replies
@rdkumavat1
Comment options

@mtmk
Comment options

@rickdotnet
Comment options

Answer selected by rickdotnet
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Q&A
Labels
None yet
3 participants