Skip to content

Commit ea7a0de

Browse files
committed
Make queue clients consume methods async
1 parent 64b01f2 commit ea7a0de

File tree

6 files changed

+28
-18
lines changed

6 files changed

+28
-18
lines changed

Diff for: src/InEngine.Core/Queuing/Clients/FileClient.cs

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.IO;
44
using System.Linq;
55
using System.Threading;
6+
using System.Threading.Tasks;
67
using InEngine.Core.Exceptions;
78
using InEngine.Core.IO;
89
using InEngine.Core.Queuing.Message;
@@ -55,15 +56,15 @@ private void PublishToQueue(CommandEnvelope commandEnvelope, string queuePath)
5556
);
5657
}
5758

58-
public void Consume(CancellationToken cancellationToken)
59+
public async Task Consume(CancellationToken cancellationToken)
5960
{
6061
try
6162
{
6263
while (true)
6364
{
6465
try
6566
{
66-
if (Consume() == null)
67+
if (await Consume() == null)
6768
Thread.Sleep(5000);
6869
}
6970
catch (Exception exception)
@@ -84,7 +85,7 @@ public void Consume(CancellationToken cancellationToken)
8485
}
8586
}
8687

87-
public ICommandEnvelope Consume()
88+
public async Task<ICommandEnvelope> Consume()
8889
{
8990
FileInfo fileInfo;
9091
var inProgressFilePath = string.Empty;
@@ -116,7 +117,7 @@ public ICommandEnvelope Consume()
116117
try
117118
{
118119
command.WriteSummaryToConsole();
119-
command.RunWithLifeCycleAsync().RunSynchronously();
120+
await command.RunWithLifeCycleAsync();
120121
}
121122
catch (Exception exception)
122123
{

Diff for: src/InEngine.Core/Queuing/Clients/RabbitMQClient.cs

+11-5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Text;
44
using System.Threading;
5+
using System.Threading.Tasks;
56
using InEngine.Core.Exceptions;
67
using InEngine.Core.IO;
78
using InEngine.Core.Queuing.Message;
@@ -94,13 +95,18 @@ public void Recover()
9495
{
9596
}
9697

97-
public void Consume(CancellationToken cancellationToken)
98+
public async Task Consume(CancellationToken cancellationToken)
9899
{
99100
InitChannel();
100101
var consumer = new EventingBasicConsumer(Channel);
101-
consumer.Received += (model, result) =>
102+
consumer.Received += async (model, result) =>
102103
{
103104
var eventingConsumer = (EventingBasicConsumer)model;
105+
if (eventingConsumer == null)
106+
{
107+
Log.LogWarning("EventingBasicConsumer is null while attempting to consume messages");
108+
return;
109+
}
104110

105111
var serializedMessage = Encoding.UTF8.GetString(result.Body);
106112
var commandEnvelope = serializedMessage.DeserializeFromJson<CommandEnvelope>();
@@ -115,7 +121,7 @@ public void Consume(CancellationToken cancellationToken)
115121
try
116122
{
117123
command.WriteSummaryToConsole();
118-
command.RunWithLifeCycleAsync().RunSynchronously();
124+
await command.RunWithLifeCycleAsync();
119125
}
120126
catch (Exception exception)
121127
{
@@ -135,7 +141,7 @@ public void Consume(CancellationToken cancellationToken)
135141
Channel.BasicConsume(queue: PendingQueueName, autoAck: false, consumer: consumer);
136142
}
137143

138-
public ICommandEnvelope Consume()
144+
public async Task<ICommandEnvelope> Consume()
139145
{
140146
InitChannel();
141147
var result = Channel.BasicGet(PendingQueueName, false);
@@ -155,7 +161,7 @@ public ICommandEnvelope Consume()
155161
try
156162
{
157163
command.WriteSummaryToConsole();
158-
command.RunWithLifeCycleAsync().RunSynchronously();
164+
await command.RunWithLifeCycleAsync();
159165
}
160166
catch (Exception exception)
161167
{

Diff for: src/InEngine.Core/Queuing/Clients/RedisClient.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void Recover()
8282
PublishToChannel();
8383
}
8484

85-
public void Consume(CancellationToken cancellationToken)
85+
public async Task Consume(CancellationToken cancellationToken)
8686
{
8787
try
8888
{
@@ -104,7 +104,7 @@ public void Consume(CancellationToken cancellationToken)
104104
}
105105
}
106106

107-
public ICommandEnvelope Consume()
107+
public async Task<ICommandEnvelope> Consume()
108108
{
109109
var rawRedisMessageValue = Redis.ListRightPopLeftPush(PendingQueueName, InProgressQueueName);
110110
var serializedMessage = rawRedisMessageValue.ToString();
@@ -122,7 +122,7 @@ public ICommandEnvelope Consume()
122122
try
123123
{
124124
command.WriteSummaryToConsole();
125-
command.RunWithLifeCycleAsync().RunSynchronously();
125+
await command.RunWithLifeCycleAsync();
126126
}
127127
catch (Exception exception)
128128
{

Diff for: src/InEngine.Core/Queuing/Clients/SyncClient.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Threading;
4+
using System.Threading.Tasks;
45
using InEngine.Core.IO;
56
using InEngine.Core.Queuing.Message;
67

@@ -43,12 +44,12 @@ public void Recover()
4344
{
4445
}
4546

46-
public void Consume(CancellationToken cancellationToken)
47+
public async Task Consume(CancellationToken cancellationToken)
4748
{
4849
throw new NotImplementedException();
4950
}
5051

51-
public ICommandEnvelope Consume()
52+
public async Task<ICommandEnvelope> Consume()
5253
{
5354
throw new NotImplementedException();
5455
}

Diff for: src/InEngine.Core/Queuing/IQueueClient.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Generic;
22
using System.Threading;
3+
using System.Threading.Tasks;
34
using InEngine.Core.Queuing.Message;
45
using InEngine.Core.IO;
56
using Microsoft.Extensions.Logging;
@@ -16,8 +17,8 @@ public interface IQueueClient : IHasMailSettings, IDisposable
1617
string QueueName { get; set; }
1718
bool UseCompression { get; set; }
1819
void Publish(AbstractCommand command);
19-
void Consume(CancellationToken cancellationToken);
20-
ICommandEnvelope Consume();
20+
Task Consume(CancellationToken cancellationToken);
21+
Task<ICommandEnvelope> Consume();
2122
void Recover();
2223
Dictionary<string, long> GetQueueLengths();
2324
bool ClearPendingQueue();

Diff for: src/InEngine.Core/Queuing/QueueAdapter.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Threading;
4+
using System.Threading.Tasks;
45
using InEngine.Core.IO;
56
using InEngine.Core.Queuing.Clients;
67
using InEngine.Core.Queuing.Message;
@@ -89,8 +90,8 @@ public static QueueAdapter Make(bool useSecondaryQueue, QueueSettings queueSetti
8990
}
9091

9192
public void Publish(AbstractCommand command) => QueueClient.Publish(command);
92-
public void Consume(CancellationToken cancellationToken) => QueueClient.Consume(cancellationToken);
93-
public ICommandEnvelope Consume() => QueueClient.Consume();
93+
public async Task Consume(CancellationToken cancellationToken) => await QueueClient.Consume(cancellationToken);
94+
public async Task<ICommandEnvelope> Consume() => await QueueClient.Consume();
9495
public void Recover() => QueueClient.Recover();
9596
public bool ClearPendingQueue() => QueueClient.ClearPendingQueue();
9697
public bool ClearInProgressQueue() => QueueClient.ClearInProgressQueue();

0 commit comments

Comments
 (0)