Skip to content

Commit

Permalink
Add a relayOn option to FW messages to another XMR instance, tone dow…
Browse files Browse the repository at this point in the history
…n default logging (#41)
  • Loading branch information
dasgarner authored Oct 17, 2023
1 parent bffd2d0 commit cab94e6
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 10 deletions.
3 changes: 0 additions & 3 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with Xibo. If not, see <http://www.gnu.org/licenses/>.
*/
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using xibo_xmr;

IHost host = Host.CreateDefaultBuilder(args)
Expand Down
52 changes: 47 additions & 5 deletions Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
* along with Xibo. If not, see <http://www.gnu.org/licenses/>.
*/
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Text;
using ConcurrentPriorityQueue.Core;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NetMQ;
using NetMQ.Sockets;
Expand All @@ -38,13 +35,16 @@ public class Worker : BackgroundService

private readonly ConcurrentPriorityQueue<ZmqMessage, int> _queue;

private readonly BlockingCollection<string> _relayQueue;

private int _sentCount = 0;

public Worker(ILogger<Worker> logger, IOptions<ZmqSettings> settings)
{
_logger = logger;
_settings = settings.Value;
_queue = new();
_relayQueue = new();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand All @@ -69,12 +69,23 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
// messages arriving from the CMS and adds them to the queue with the right QoS
// 2. Set up a Publisher (PUB) socket bound to `pubOn` which processes the queue
// 3. Set up a periodic timer which sends a heartbeat message (H) every 30 seconds
// 4. Handle relay if set
// -------
await Task.WhenAll(
List<Task> tasks = new()
{
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, ResponderAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default),
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, PublisherAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default),
Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, HeartbeatAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)
);
};

// Do we relay?
if (!string.IsNullOrEmpty(_settings.relayOn))
{
tasks.Add(Task.Factory.StartNew(() => { new NetMQRuntime().Run(stoppingToken, RelayAsync(stoppingToken)); }, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default));
}

// Await all
await Task.WhenAll(tasks);

// Must call clean up at the end
NetMQConfig.Cleanup();
Expand Down Expand Up @@ -111,6 +122,16 @@ async Task ResponderAsync(CancellationToken stoppingToken)
}
else
{
// Relay
if (!string.IsNullOrEmpty(_settings.relayOn))
{
bool relayResult = _relayQueue.TryAdd(message);
if (!relayResult)
{
_logger.LogError("Failed to add message to the relay queue");
}
}

// Decode the message
try
{
Expand Down Expand Up @@ -235,6 +256,27 @@ async Task HeartbeatAsync(CancellationToken stoppingToken)
}
}

async Task RelayAsync(CancellationToken stoppingToken)
{
using var relaySocket = new RequestSocket(_settings.relayOn);

while (!stoppingToken.IsCancellationRequested)
{
await Task.Run(() => {
bool result = _relayQueue.TryTake(out string message, -1, stoppingToken);
if (result && !string.IsNullOrEmpty(message))
{
_logger.LogDebug("Relay message");
bool sendResult = relaySocket.TrySendFrame(message);
if (!sendResult)
{
_logger.LogError("Unable to relay message");
}
}
}, stoppingToken);
}
}

private static Dictionary<string, int> NewStats()
{
return new()
Expand Down
1 change: 1 addition & 0 deletions ZmqSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public class ZmqSettings
public bool ipv6RespSupport { get; set; }
public bool ipv6PubSupport { get; set; }
public int? pubSendTimeoutMs { get; set; }
public string? relayOn {get; set; }
}
5 changes: 3 additions & 2 deletions appsettings.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
},
"Console": {
Expand All @@ -20,6 +20,7 @@
"queueSize": 10,
"ipv6RespSupport": false,
"ipv6PubSupport": false,
"pubSendTimeoutMs": 500
"pubSendTimeoutMs": 500,
"relayOn": ""
}
}

0 comments on commit cab94e6

Please sign in to comment.