Skip to content

Commit

Permalink
(#123) Align argument for BasiConsume
Browse files Browse the repository at this point in the history
  • Loading branch information
pardahlman committed Jan 29, 2017
1 parent 725d6e6 commit 024e345
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
12 changes: 6 additions & 6 deletions src/RawRabbit/Pipe/Extensions/BasicConsumeExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace RawRabbit.Pipe.Extensions
public static class BasicConsumeExtension
{
public static Task BasicConsumeAsync(this IBusClient busClient, Func<BasicDeliverEventArgs, Task<Acknowledgement>> consumeFunc,
Action<IConsumerConfigurationBuilder> cfg)
Action<IPipeContext> context)
{
Func<object[], Task> genericFunc = args => consumeFunc(args[0] as BasicDeliverEventArgs);

Expand All @@ -23,22 +23,22 @@ public static Task BasicConsumeAsync(this IBusClient busClient, Func<BasicDelive
.Use<QueueDeclareMiddleware>()
.Use<QueueBindMiddleware>(new QueueBindOptions
{
ExchangeNameFunc = context => context.GetConsumeConfiguration()?.ExchangeName
ExchangeNameFunc = ctx => ctx.GetConsumeConfiguration()?.ExchangeName
})
.Use<ConsumerMiddleware>()
.Use<MessageConsumeMiddleware>(new ConsumeOptions
{
Pipe = p => p
.Use<HandlerInvokationMiddleware>(new HandlerInvokationOptions
{
HandlerArgsFunc = context => new object[] {context.GetDeliveryEventArgs()},
HandlerArgsFunc = ctx => new object[] { ctx.GetDeliveryEventArgs()},
})
.Use<ExplicitAckMiddleware>()
}),
context =>
ctx =>
{
context.Properties.Add(PipeKey.MessageHandler, genericFunc);
context.Properties.Add(PipeKey.ConfigurationAction, cfg);
ctx.Properties.Add(PipeKey.MessageHandler, genericFunc);
context?.Invoke(ctx);
}
);
}
Expand Down
42 changes: 23 additions & 19 deletions test/RawRabbit.IntegrationTests/Features/GlobalExecutionIdTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using RabbitMQ.Client.Events;
using RawRabbit.Common;
using RawRabbit.IntegrationTests.TestMessages;
using RawRabbit.Pipe;
using RawRabbit.Pipe.Extensions;
using Xunit;

Expand Down Expand Up @@ -32,17 +33,19 @@ public async Task Should_Forward_On_Pub_Sub()
await secondSubscriber.SubscribeAsync<SecondMessage>(message => secondSubscriber.PublishAsync(new ThirdMessage()));
await thridSubscriber.SubscribeAsync<SecondMessage>(message => Task.FromResult(0));
await consumer.BasicConsumeAsync(args =>
{
var tsc = taskCompletionSources.First(t => !t.Task.IsCompleted);
tsc.TrySetResult(args);
return Task.FromResult<Acknowledgement>(new Ack());
}, cfg => cfg
.Consume(c => c
.OnExchange("rawrabbit.integrationtests.testmessages")
.WithRoutingKey("#"))
.FromDeclaredQueue(q => q
.WithName("take_all")
.WithAutoDelete())
{
var tsc = taskCompletionSources.First(t => !t.Task.IsCompleted);
tsc.TrySetResult(args);
return Task.FromResult<Acknowledgement>(new Ack());
}, ctx => ctx
.UseConsumerConfiguration(cfg => cfg
.Consume(c => c
.OnExchange("rawrabbit.integrationtests.testmessages")
.WithRoutingKey("#"))
.FromDeclaredQueue(q => q
.WithName("take_all")
.WithAutoDelete())
)
);

/* Test */
Expand Down Expand Up @@ -91,14 +94,15 @@ await consumer.BasicConsumeAsync(args =>
var tsc = taskCompletionSources.First(t => !t.Task.IsCompleted);
tsc.TrySetResult(args);
return Task.FromResult<Acknowledgement>(new Ack());
}, cfg => cfg
.Consume(c => c
.OnExchange("rawrabbit.integrationtests.testmessages")
.WithRoutingKey("#"))
.FromDeclaredQueue(q => q
.WithName("take_all")
.WithAutoDelete())
);
}, ctx => ctx
.UseConsumerConfiguration(cfg => cfg
.Consume(c => c
.OnExchange("rawrabbit.integrationtests.testmessages")
.WithRoutingKey("#"))
.FromDeclaredQueue(q => q
.WithName("take_all")
.WithAutoDelete())
));

/* Test */
await requester.RequestAsync<FirstRequest, FirstResponse>();
Expand Down

0 comments on commit 024e345

Please sign in to comment.