From 80867a62093b25247adf28f3dd0137d77d56fcec Mon Sep 17 00:00:00 2001 From: Executor Date: Wed, 22 Nov 2023 16:33:24 +0800 Subject: [PATCH 1/2] WebServices partially refactor. --- Lagrange.Core.Test/Tests/NTLoginTest.cs | 2 +- .../Core/Message/MessageService.cs | 4 +- .../Core/Network/LagrangeWebSvcCollection.cs | 149 ++++++++---- .../Core/Network/MsgRecvEventArgs.cs | 4 +- .../Options/ForwardWSServiceOptions.cs | 10 + .../Options/ReverseWSServiceOptions.cs | 12 + .../Core/Network/Options/WSServiceOptions.cs | 21 ++ .../DefaultLagrangeWebServiceFactory.cs | 50 ++++ .../Core/Network/Service/ForwardWSService.cs | 71 ++++-- .../Service/ForwardWSServiceFactory.cs | 23 ++ .../Core/Network/Service/HttpPostService.cs | 4 +- .../Network/Service/ILagrangeWebService.cs | 2 +- .../Core/Network/Service/LagrangeWSService.cs | 29 --- .../Service/LagrangeWebServiceFactory.cs | 35 +++ .../Core/Network/Service/ReverseWSService.cs | 221 +++++++++++------- .../Service/ReverseWSServiceFactory.cs | 23 ++ .../Core/Operation/OperationService.cs | 25 +- Lagrange.OneBot/LagrangeAppBuilder.cs | 11 +- 18 files changed, 493 insertions(+), 203 deletions(-) create mode 100644 Lagrange.OneBot/Core/Network/Options/ForwardWSServiceOptions.cs create mode 100644 Lagrange.OneBot/Core/Network/Options/ReverseWSServiceOptions.cs create mode 100644 Lagrange.OneBot/Core/Network/Options/WSServiceOptions.cs create mode 100644 Lagrange.OneBot/Core/Network/Service/DefaultLagrangeWebServiceFactory.cs create mode 100644 Lagrange.OneBot/Core/Network/Service/ForwardWSServiceFactory.cs delete mode 100644 Lagrange.OneBot/Core/Network/Service/LagrangeWSService.cs create mode 100644 Lagrange.OneBot/Core/Network/Service/LagrangeWebServiceFactory.cs create mode 100644 Lagrange.OneBot/Core/Network/Service/ReverseWSServiceFactory.cs diff --git a/Lagrange.Core.Test/Tests/NTLoginTest.cs b/Lagrange.Core.Test/Tests/NTLoginTest.cs index 151dfe50c..97dd9d3d8 100644 --- a/Lagrange.Core.Test/Tests/NTLoginTest.cs +++ b/Lagrange.Core.Test/Tests/NTLoginTest.cs @@ -48,7 +48,7 @@ public async Task LoginByPassword() if (captcha != null && randStr != null) bot.SubmitCaptcha(captcha, randStr); }; - bot.Invoker.OnGroupInvitationReceived += async (_, @event) => + bot.Invoker.OnGroupInvitationReceived += (_, @event) => { Console.WriteLine(@event.ToString()); }; diff --git a/Lagrange.OneBot/Core/Message/MessageService.cs b/Lagrange.OneBot/Core/Message/MessageService.cs index 7fb4b9deb..294d72b83 100644 --- a/Lagrange.OneBot/Core/Message/MessageService.cs +++ b/Lagrange.OneBot/Core/Message/MessageService.cs @@ -57,7 +57,7 @@ private void OnFriendMessageReceived(BotContext bot, FriendMessageEvent e) Message = Convert(e.Chain) }; - _service.SendJsonAsync(request); + _ =_service.SendJsonAsync(request); } private void OnGroupMessageReceived(BotContext bot, GroupMessageEvent e) @@ -65,7 +65,7 @@ private void OnGroupMessageReceived(BotContext bot, GroupMessageEvent e) var request = new OneBotGroupMsg(bot.UpdateKeystore().Uin, e.Chain.GroupUin ?? 0,Convert(e.Chain), e.Chain.GroupMemberInfo ?? throw new Exception("Group member not found")); - _service.SendJsonAsync(request); + _ = _service.SendJsonAsync(request); } private void OnTempMessageReceived(BotContext bot, TempMessageEvent e) diff --git a/Lagrange.OneBot/Core/Network/LagrangeWebSvcCollection.cs b/Lagrange.OneBot/Core/Network/LagrangeWebSvcCollection.cs index f6b656bf1..e5e6fc405 100644 --- a/Lagrange.OneBot/Core/Network/LagrangeWebSvcCollection.cs +++ b/Lagrange.OneBot/Core/Network/LagrangeWebSvcCollection.cs @@ -1,85 +1,132 @@ -using Lagrange.Core; using Lagrange.OneBot.Core.Network.Service; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace Lagrange.OneBot.Core.Network; -public class LagrangeWebSvcCollection : Dictionary, IHostedService +public sealed partial class LagrangeWebSvcCollection : IHostedService { private const string Tag = nameof(LagrangeWebSvcCollection); - public event EventHandler OnMessageReceived = delegate { }; + public event EventHandler? OnMessageReceived; - public LagrangeWebSvcCollection(BotContext context, IConfiguration global, ILogger logger) - { - uint uin = context.BotUin; - - if (global.GetSection("Implementations").Exists()) - { - logger.LogInformation($"[{Tag}]: Multi Connection has been configured"); + private readonly IServiceProvider _services; - foreach (var section in global.GetSection("Implementations").GetChildren()) - { - ILagrangeWebService? service = section["Type"] switch - { - "ReverseWebSocket" => new ReverseWSService(section, logger, uin), - "ForwardWebSocket" => new ForwardWSService(section, logger, uin), - _ => null - }; + private readonly IConfiguration _config; - if (service == null) logger.LogWarning($"[{Tag}]: unknown type of service of {section["Type"]} is configured, skipped"); - else Add(Guid.NewGuid().ToString(), service); - } + private readonly ILogger _logger; + + private readonly List<(IServiceScope, ILagrangeWebService)> _webServices; + + public LagrangeWebSvcCollection(IServiceProvider services, IConfiguration config, ILogger logger) + { + _services = services; + _config = config; + _logger = logger; + _webServices = new List<(IServiceScope, ILagrangeWebService)>(); + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + var implsSection = _config.GetSection("Implementations"); + if (implsSection.Exists()) + { + Log.LogMultiConnection(_logger, Tag); } - else if (global.GetSection("Implementation").Exists()) + else { - logger.LogInformation($"[{Tag}]: Single Connection has been configured"); - - string identifier = Guid.NewGuid().ToString(); - if (global.GetSection("Implementation:ReverseWebSocket").Exists()) - { - this[identifier] = new ReverseWSService(global.GetSection("Implementation:ReverseWebSocket"), logger, uin); - } - else if (global.GetSection("Implementation:ForwardWebSocket").Exists()) + implsSection = _config.GetSection("Implementation"); + if (!implsSection.Exists()) { - this[identifier] = new ForwardWSService(global.GetSection("Implementation:ForwardWebSocket"), logger, uin); + Log.LogNoConnection(_logger, Tag); + return; } - } - else - { - logger.LogWarning($"[{Tag}]: No implementation has been configured"); + Log.LogSingleConnection(_logger, Tag); } - foreach (var (identifier, service) in this) + foreach (var section in implsSection.GetChildren()) { - service.OnMessageReceived += (sender, args) => + var scope = _services.CreateScope(); + var services = scope.ServiceProvider; + var factory = services.GetRequiredService(); + factory.SetConfig(section); + var webService = services.GetRequiredService(); + webService.OnMessageReceived += (sender, args) => { - OnMessageReceived.Invoke(sender, new MsgRecvEventArgs(args.Data, identifier)); + OnMessageReceived?.Invoke(sender, new MsgRecvEventArgs(args.Data)); }; + try + { + await webService.StartAsync(cancellationToken); + _webServices.Add((scope, webService)); + } + catch (Exception e) + { + Log.LogWebServiceStartFailed(_logger, e, Tag); + scope.Dispose(); + } } } - - public async Task StartAsync(CancellationToken cancellationToken) - { - foreach (var (_, service) in this) await service.StartAsync(cancellationToken); - } public async Task StopAsync(CancellationToken cancellationToken) { - foreach (var (_, service) in this) await service.StopAsync(cancellationToken); + foreach (var (scope, service) in _webServices) + { + try + { + await service.StopAsync(cancellationToken); + } + catch (Exception e) + { + Log.LogWebServiceStopFailed(_logger, e, Tag); + } + finally + { + scope.Dispose(); + } + } } - public async Task SendJsonAsync(T json, string? identifier = null, CancellationToken cancellationToken = default) + public async Task SendJsonAsync(T json, CancellationToken cancellationToken = default) { - if (identifier == null) + foreach (var (_, service) in _webServices) { - foreach (var (_, service) in this) await service.SendJsonAsync(json, cancellationToken); - } - else - { - if (TryGetValue(identifier, out var service)) await service.SendJsonAsync(json, cancellationToken); + try + { + var vt = service.SendJsonAsync(json, cancellationToken); + if (!vt.IsCompletedSuccessfully) + { + var t = vt.AsTask(); + await t.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken); + } + } + catch (Exception e) + { + Log.LogWebServiceSendFailed(_logger, e, Tag); + } } } + + private static partial class Log + { + [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "[{tag}]: Multi Connection has been configured")] + public static partial void LogMultiConnection(ILogger logger, string tag); + + [LoggerMessage(EventId = 2, Level = LogLevel.Information, Message = "[{tag}]: Single Connection has been configured")] + public static partial void LogSingleConnection(ILogger logger, string tag); + + [LoggerMessage(EventId = 3, Level = LogLevel.Warning, Message = "[{Tag}]: No implementation has been configured")] + public static partial void LogNoConnection(ILogger logger, string tag); + + [LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "[{Tag}]: WebService start failed.")] + public static partial void LogWebServiceStartFailed(ILogger logger, Exception e, string tag); + + [LoggerMessage(EventId = 5, Level = LogLevel.Warning, Message = "[{Tag}]: WebService stop failed.")] + public static partial void LogWebServiceStopFailed(ILogger logger, Exception e, string tag); + + [LoggerMessage(EventId = 6, Level = LogLevel.Warning, Message = "[{Tag}]: WebService send message failed.")] + public static partial void LogWebServiceSendFailed(ILogger logger, Exception e, string tag); + } } \ No newline at end of file diff --git a/Lagrange.OneBot/Core/Network/MsgRecvEventArgs.cs b/Lagrange.OneBot/Core/Network/MsgRecvEventArgs.cs index 1990e04c0..6ff5ff0b5 100644 --- a/Lagrange.OneBot/Core/Network/MsgRecvEventArgs.cs +++ b/Lagrange.OneBot/Core/Network/MsgRecvEventArgs.cs @@ -1,8 +1,6 @@ namespace Lagrange.OneBot.Core.Network; -public class MsgRecvEventArgs(string data, string? identifier = null) : EventArgs +public class MsgRecvEventArgs(string data) : EventArgs { - public string? Identifier { get; init; } = identifier; - public string Data { get; init; } = data; } diff --git a/Lagrange.OneBot/Core/Network/Options/ForwardWSServiceOptions.cs b/Lagrange.OneBot/Core/Network/Options/ForwardWSServiceOptions.cs new file mode 100644 index 000000000..d88c2f27f --- /dev/null +++ b/Lagrange.OneBot/Core/Network/Options/ForwardWSServiceOptions.cs @@ -0,0 +1,10 @@ +namespace Lagrange.OneBot.Core.Network.Options +{ + public class ForwardWSServiceOptions : WSServiceOptions + { + public ForwardWSServiceOptions(string host, uint port, uint heartBeatInterval, string? accessToken) : base(host, port, heartBeatInterval, accessToken) + { + + } + } +} diff --git a/Lagrange.OneBot/Core/Network/Options/ReverseWSServiceOptions.cs b/Lagrange.OneBot/Core/Network/Options/ReverseWSServiceOptions.cs new file mode 100644 index 000000000..e3bedded7 --- /dev/null +++ b/Lagrange.OneBot/Core/Network/Options/ReverseWSServiceOptions.cs @@ -0,0 +1,12 @@ +namespace Lagrange.OneBot.Core.Network.Options +{ + public sealed class ReverseWSServiceOptions : WSServiceOptions + { + public string Suffix { get; set; } + + public ReverseWSServiceOptions(string host, uint port, uint heartBeatInterval, string suffix, string? accessToken) : base(host, port, heartBeatInterval, accessToken) + { + Suffix = suffix; + } + } +} diff --git a/Lagrange.OneBot/Core/Network/Options/WSServiceOptions.cs b/Lagrange.OneBot/Core/Network/Options/WSServiceOptions.cs new file mode 100644 index 000000000..d232cced3 --- /dev/null +++ b/Lagrange.OneBot/Core/Network/Options/WSServiceOptions.cs @@ -0,0 +1,21 @@ +namespace Lagrange.OneBot.Core.Network.Options +{ + public abstract class WSServiceOptions + { + public string Host { get; set; } + + public uint Port { get; set; } + + public uint HeartBeatInterval { get; set; } + + public string? AccessToken { get; set; } + + protected WSServiceOptions(string host, uint port, uint heartBeatInterval, string? accessToken) + { + Host = host; + Port = port; + HeartBeatInterval = heartBeatInterval; + AccessToken = accessToken; + } + } +} diff --git a/Lagrange.OneBot/Core/Network/Service/DefaultLagrangeWebServiceFactory.cs b/Lagrange.OneBot/Core/Network/Service/DefaultLagrangeWebServiceFactory.cs new file mode 100644 index 000000000..ea376d56d --- /dev/null +++ b/Lagrange.OneBot/Core/Network/Service/DefaultLagrangeWebServiceFactory.cs @@ -0,0 +1,50 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Lagrange.OneBot.Core.Network.Service +{ + public class DefaultLagrangeWebServiceFactory : LagrangeWebServiceFactory + { + public DefaultLagrangeWebServiceFactory(IServiceProvider services) : base(services) + { + + } + + public override ILagrangeWebService? Create() + { + var config = _config ?? throw new InvalidOperationException("Configuration must be provided"); + var type = config["Type"]; + if (!string.IsNullOrEmpty(type)) + { + return type switch + { + "ReverseWebSocket" => Create(config), + "ForwardWebSocket" => Create(config), + _ => null + }; + } + var rws = config.GetSection("ReverseWebSocket"); + if (rws.Exists()) + { + return Create(rws); + } + var fws = config.GetSection("ForwardWebSocket"); + if (fws.Exists()) + { + return Create(fws); + } + return null; + } + + protected ILagrangeWebService? Create(IConfiguration config) where TService : ILagrangeWebService + { + var factory = _services.GetService>(); + if (factory == null) + { + return null; + } + factory.SetConfig(config); + return factory.Create(); + } + } +} diff --git a/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs b/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs index ed1a501ae..1d9f1e09c 100644 --- a/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs +++ b/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs @@ -1,17 +1,28 @@ using System.Net.NetworkInformation; +using System.Runtime.InteropServices; using System.Text.Json; using Fleck; +using Lagrange.Core; using Lagrange.OneBot.Core.Entity.Meta; +using Lagrange.OneBot.Core.Network.Options; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace Lagrange.OneBot.Core.Network.Service; -public sealed class ForwardWSService : LagrangeWSService +public sealed class ForwardWSService : ILagrangeWebService { private const string Tag = nameof(ForwardWSService); - public override event EventHandler? OnMessageReceived = delegate { }; - + + public event EventHandler? OnMessageReceived; + + private readonly ForwardWSServiceOptions _options; + + private readonly ILogger _logger; + + private readonly BotContext _context; + private readonly WebSocketServer _server; private IWebSocketConnection? _connection; @@ -20,25 +31,26 @@ public sealed class ForwardWSService : LagrangeWSService private readonly string _accessToken; - public ForwardWSService(IConfiguration config, ILogger logger, uint uin) : base(config, logger, uin) + public ForwardWSService(IOptions options, ILogger logger, BotContext context) { - string url = $"ws://{config["Host"]}:{config["Port"]}"; - - _server = new WebSocketServer(url) + _options = options.Value; + _logger = logger; + _context = context; + _server = new WebSocketServer($"ws://{_options.Host}:{_options.Port}") { RestartAfterListenError = true }; - _timer = new Timer(OnHeartbeat, null, 1, config.GetValue("HeartBeatInterval")); - _accessToken = string.IsNullOrEmpty(config["AccessToken"]) ? "" : config["AccessToken"]!; + _timer = new Timer(OnHeartbeat, null, 1, _options.HeartBeatInterval); + _accessToken = _options.AccessToken ?? ""; } - public override Task StartAsync(CancellationToken cancellationToken) + public Task StartAsync(CancellationToken cancellationToken) { - uint port = Config.GetValue("Port") ?? throw new Exception("Port is not defined"); + uint port = _options.Port; if (IsPortInUse(port)) { - Logger.LogCritical($"[{Tag}] The port {port} is in use, {Tag} failed to start"); + _logger.LogCritical($"[{Tag}] The port {port} is in use, {Tag} failed to start"); return Task.CompletedTask; } @@ -50,7 +62,7 @@ public override Task StartAsync(CancellationToken cancellationToken) conn.OnMessage = s => { - Logger.LogTrace($"[{Tag}] Receive: {s}"); + _logger.LogTrace($"[{Tag}] Receive: {s}"); OnMessageReceived?.Invoke(this, new MsgRecvEventArgs(s)); }; @@ -65,24 +77,24 @@ public override Task StartAsync(CancellationToken cancellationToken) } } - Logger.LogInformation($"[{Tag}]: Connected"); + _logger.LogInformation($"[{Tag}]: Connected"); - var lifecycle = new OneBotLifecycle(Uin, "connect"); + var lifecycle = new OneBotLifecycle(_context.BotUin, "connect"); SendJsonAsync(lifecycle, cancellationToken).GetAwaiter().GetResult(); - _timer.Change(0, Config.GetValue("HeartBeatInterval")); + _timer.Change(0, _options.HeartBeatInterval); }; conn.OnClose = () => { - Logger.LogWarning($"[{Tag}: Disconnected]"); + _logger.LogWarning($"[{Tag}: Disconnected]"); _connection = null; }; }); }, cancellationToken); } - public override Task StopAsync(CancellationToken cancellationToken) + public Task StopAsync(CancellationToken cancellationToken) { _timer.Dispose(); _server.ListenerSocket.Close(); @@ -91,14 +103,27 @@ public override Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - public override async ValueTask SendJsonAsync(T json, CancellationToken cancellationToken = default) + public ValueTask SendJsonAsync(T json, CancellationToken cancellationToken = default) { string payload = JsonSerializer.Serialize(json); - - Logger.LogTrace($"[{Tag}] Send: {payload}"); - await (_connection?.Send(payload) ?? Task.CompletedTask); + _logger.LogTrace($"[{Tag}] Send: {payload}"); + + var connection = _connection; + if (connection != null) + { + return new ValueTask(connection.Send(payload)); + } + return default; } - + + private void OnHeartbeat(object? sender) + { + var status = new OneBotStatus(true, true); + var heartBeat = new OneBotHeartBeat(_context.BotUin, (int)_options.HeartBeatInterval, status); + + _ = SendJsonAsync(heartBeat); + } + private static bool IsPortInUse(uint port) { return IPGlobalProperties.GetIPGlobalProperties().GetActiveTcpListeners().Any(endpoint => endpoint.Port == port); diff --git a/Lagrange.OneBot/Core/Network/Service/ForwardWSServiceFactory.cs b/Lagrange.OneBot/Core/Network/Service/ForwardWSServiceFactory.cs new file mode 100644 index 000000000..58dd18644 --- /dev/null +++ b/Lagrange.OneBot/Core/Network/Service/ForwardWSServiceFactory.cs @@ -0,0 +1,23 @@ +using Lagrange.OneBot.Core.Network.Options; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Lagrange.OneBot.Core.Network.Service +{ + public sealed class ForwardWSServiceFactory : LagrangeWebServiceFactory, ILagrangeWebServiceFactory + { + public ForwardWSServiceFactory(IServiceProvider services) : base(services) + { + + } + + public override ILagrangeWebService? Create() + { + var config = _config ?? throw new InvalidOperationException("Configuration must be provided"); + var options = _services.GetRequiredService>(); + config.Bind(options.Value); + return _services.GetRequiredService(); + } + } +} diff --git a/Lagrange.OneBot/Core/Network/Service/HttpPostService.cs b/Lagrange.OneBot/Core/Network/Service/HttpPostService.cs index 10e63d241..d67138047 100644 --- a/Lagrange.OneBot/Core/Network/Service/HttpPostService.cs +++ b/Lagrange.OneBot/Core/Network/Service/HttpPostService.cs @@ -43,9 +43,9 @@ public async Task StopAsync(CancellationToken cancellationToken) await SendJsonAsync(lifecycle, cancellationToken); } - public async ValueTask SendJsonAsync(T json, CancellationToken cancellationToken = default) + public ValueTask SendJsonAsync(T json, CancellationToken cancellationToken = default) { string payload = JsonSerializer.Serialize(json); - await _client.PostAsync(_url, new StringContent(payload, Encoding.UTF8), cancellationToken); + return new ValueTask(_client.PostAsync(_url, new StringContent(payload, Encoding.UTF8), cancellationToken)); } } \ No newline at end of file diff --git a/Lagrange.OneBot/Core/Network/Service/ILagrangeWebService.cs b/Lagrange.OneBot/Core/Network/Service/ILagrangeWebService.cs index 8fcbe4b1f..e072c1202 100644 --- a/Lagrange.OneBot/Core/Network/Service/ILagrangeWebService.cs +++ b/Lagrange.OneBot/Core/Network/Service/ILagrangeWebService.cs @@ -7,4 +7,4 @@ public interface ILagrangeWebService : IHostedService public event EventHandler OnMessageReceived; public ValueTask SendJsonAsync(T json, CancellationToken cancellationToken = default); -} \ No newline at end of file +} diff --git a/Lagrange.OneBot/Core/Network/Service/LagrangeWSService.cs b/Lagrange.OneBot/Core/Network/Service/LagrangeWSService.cs deleted file mode 100644 index 765bb01d1..000000000 --- a/Lagrange.OneBot/Core/Network/Service/LagrangeWSService.cs +++ /dev/null @@ -1,29 +0,0 @@ -using Lagrange.OneBot.Core.Entity.Meta; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; - -namespace Lagrange.OneBot.Core.Network.Service; - -public abstract class LagrangeWSService(IConfiguration config, ILogger logger, uint uin) : ILagrangeWebService -{ - protected readonly ILogger Logger = logger; - - protected readonly IConfiguration Config = config; - - protected readonly uint Uin = uin; - - protected void OnHeartbeat(object? sender) - { - var status = new OneBotStatus(true, true); - var heartBeat = new OneBotHeartBeat(Uin, Config.GetValue("HeartBeatInterval"), status); - - SendJsonAsync(heartBeat); - } - - public abstract Task StartAsync(CancellationToken cancellationToken); - - public abstract Task StopAsync(CancellationToken cancellationToken); - - public abstract event EventHandler? OnMessageReceived; - public abstract ValueTask SendJsonAsync(T json, CancellationToken cancellationToken = default); -} \ No newline at end of file diff --git a/Lagrange.OneBot/Core/Network/Service/LagrangeWebServiceFactory.cs b/Lagrange.OneBot/Core/Network/Service/LagrangeWebServiceFactory.cs new file mode 100644 index 000000000..fddcdf3da --- /dev/null +++ b/Lagrange.OneBot/Core/Network/Service/LagrangeWebServiceFactory.cs @@ -0,0 +1,35 @@ +using Microsoft.Extensions.Configuration; + +namespace Lagrange.OneBot.Core.Network.Service +{ + public interface ILagrangeWebServiceFactory + { + void SetConfig(IConfiguration config); + + ILagrangeWebService? Create(); + } + + public interface ILagrangeWebServiceFactory : ILagrangeWebServiceFactory where TService : ILagrangeWebService + { + + } + + public abstract class LagrangeWebServiceFactory : ILagrangeWebServiceFactory + { + protected readonly IServiceProvider _services; + + protected IConfiguration? _config; + + protected LagrangeWebServiceFactory(IServiceProvider services) + { + _services = services; + } + + public void SetConfig(IConfiguration config) + { + _config = config; + } + + public abstract ILagrangeWebService? Create(); + } +} diff --git a/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs b/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs index a4a6caf60..fa1308b89 100644 --- a/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs +++ b/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs @@ -1,126 +1,191 @@ using System.Net.WebSockets; -using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; -using Lagrange.Core.Utility.Extension; +using Lagrange.Core; using Lagrange.OneBot.Core.Entity.Meta; -using Microsoft.Extensions.Configuration; +using Lagrange.OneBot.Core.Network.Options; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace Lagrange.OneBot.Core.Network.Service; -public class ReverseWSService : LagrangeWSService +public partial class ReverseWSService : BackgroundService, ILagrangeWebService { - private const string Tag = nameof(ReverseWSService); + protected const string Tag = nameof(ReverseWSService); - public override event EventHandler? OnMessageReceived = delegate { }; + public event EventHandler? OnMessageReceived; - private ClientWebSocket _socket; - - private readonly Timer _timer; + protected readonly ReverseWSServiceOptions _options; - public ReverseWSService(IConfiguration config, ILogger logger, uint uin) : base(config, logger, uin) - { - _socket = SetupSocket(); - _timer = new Timer(OnHeartbeat, null, -1, config.GetValue("HeartBeatInterval")); - } + protected readonly ILogger _logger; + protected readonly BotContext _botCtx; - public override async Task StartAsync(CancellationToken cancellationToken) + protected ConnectionContext? _connCtx; + + protected sealed class ConnectionContext : IDisposable { - await _socket.ConnectAsync(new Uri($"ws://{Config["Host"]}:{Config["Port"]}{Config["Suffix"]}"), cancellationToken); - _ = ReceiveLoop(cancellationToken); - - var lifecycle = new OneBotLifecycle(Uin, "connect"); - await SendJsonAsync(lifecycle, cancellationToken); + public readonly ClientWebSocket WebSocket; + + public readonly Task ConnectTask; + + private readonly CancellationTokenSource _cts; - _timer.Change(0, Config.GetValue("HeartBeatInterval")); + public CancellationToken Token => _cts.Token; + + public ConnectionContext(ClientWebSocket webSocket, Task connectTask) + { + WebSocket = webSocket; + ConnectTask = connectTask; + _cts = new CancellationTokenSource(); + } + + public void Dispose() + { + _cts.Cancel(); + } } - public override Task StopAsync(CancellationToken cancellationToken) + public ReverseWSService(IOptions options, ILogger logger, BotContext context) { - _timer.Dispose(); - _socket.Dispose(); - - return Task.CompletedTask; + _options = options.Value; + _logger = logger; + _botCtx = context; } - public override async ValueTask SendJsonAsync(T json, CancellationToken cancellationToken = default) + public ValueTask SendJsonAsync(T payload, CancellationToken cancellationToken = default) { - if (_socket.State is WebSocketState.Closed or WebSocketState.None) + var connCtx = _connCtx ?? throw new InvalidOperationException("Reverse webSocket service was not running"); + var connTask = connCtx.ConnectTask; + if (!connTask.IsCompletedSuccessfully) { - Logger.LogWarning($"[{Tag}] Detected Disconnect, scheduling reconnect"); - await Reconnect(cancellationToken); + return SendJsonAsync(connCtx.WebSocket, connTask, payload, connCtx.Token); } + return SendJsonAsync(connCtx.WebSocket, payload, connCtx.Token); + } - string payload = JsonSerializer.Serialize(json); - - Logger.LogTrace($"[{Tag}] Send: {payload}"); - await _socket.SendAsync(Encoding.UTF8.GetBytes(payload).AsMemory(), WebSocketMessageType.Text, true, cancellationToken); + protected async ValueTask SendJsonAsync(ClientWebSocket ws, Task connectTask, T payload, CancellationToken token) + { + await connectTask; + await SendJsonAsync(ws, payload, token); } - private async Task ReceiveLoop(CancellationToken cancellationToken) + protected ValueTask SendJsonAsync(ClientWebSocket ws, T payload, CancellationToken token) { - try - { - await Task.CompletedTask.ForceAsync(); - var buffer = new byte[64 * 1024 * 1024]; - - while (true) - { - var result = await _socket.ReceiveAsync(buffer.AsMemory(), cancellationToken); - byte[] newBuffer = new byte[result.Count]; - Unsafe.CopyBlock(ref newBuffer[0], ref buffer[0], (uint)result.Count); + var json = JsonSerializer.Serialize(payload); + var buffer = Encoding.UTF8.GetBytes(json); + Log.LogSendingData(_logger, Tag, json); + return ws.SendAsync(buffer.AsMemory(), WebSocketMessageType.Text, true, token); + } - string text = Encoding.UTF8.GetString(newBuffer); - Logger.LogTrace($"[{Tag}] Receive: {text}"); - OnMessageReceived?.Invoke(this, new MsgRecvEventArgs(text)); - } - } - catch + protected ClientWebSocket CreateDefaultWebSocket() + { + var ws = new ClientWebSocket(); + ws.Options.SetRequestHeader("X-Client-Role", "Universal"); + ws.Options.SetRequestHeader("X-Self-ID", _botCtx.BotUin.ToString()); + ws.Options.SetRequestHeader("User-Agent", Constant.OneBotImpl); + if (_options.AccessToken != null) { - // + ws.Options.SetRequestHeader("Authorization", $"Bearer {_options.AccessToken}"); } + ws.Options.KeepAliveInterval = Timeout.InfiniteTimeSpan; + return ws; } - private async Task Reconnect(CancellationToken cancellationToken = default) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - if (_socket.State is WebSocketState.Open or WebSocketState.Connecting) return; - if (_socket.State == WebSocketState.Closed) + string urlstr = $"ws://{_options.Host}:{_options.Port}{_options.Suffix}"; + if (!Uri.TryCreate(urlstr, UriKind.Absolute, out var url)) { - _socket.Dispose(); - _socket = SetupSocket(); + Log.LogInvalidUrl(_logger, Tag, urlstr); + return; } - - try + while (true) { - await _socket.ConnectAsync(new Uri($"ws://{Config["Host"]}:{Config["Port"]}{Config["Suffix"]}"), cancellationToken); - Logger.LogInformation($"[{Tag}] Reconnected Successfully"); + try + { + using var ws = CreateDefaultWebSocket(); + var connTask = ws.ConnectAsync(url, stoppingToken); + using var connCtx = new ConnectionContext(ws, connTask); + _connCtx = connCtx; + await connTask; + + var lifecycle = new OneBotLifecycle(_botCtx.BotUin, "connect"); + await SendJsonAsync(ws, lifecycle, stoppingToken); + + var recvTask = ReceiveLoop(ws, stoppingToken); + if (_options.HeartBeatInterval > 0) + { + var heartbeatTask = HeartbeatLoop(ws, stoppingToken); + await Task.WhenAll([recvTask, heartbeatTask]); + } + else + { + await recvTask; + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + _connCtx = null; + break; + } + catch (Exception e) + { + Log.LogClientDisconnected(_logger, e, Tag); + } } - catch + } + + private async Task ReceiveLoop(ClientWebSocket ws, CancellationToken token) + { + var buffer = new byte[1024]; + while (true) { - Logger.LogWarning($"[{Tag}] Reconnected failed"); + int rcvd = 0; + while (true) + { + var result = await ws.ReceiveAsync(buffer.AsMemory(rcvd), token); + if (result.EndOfMessage) + { + break; + } + rcvd += result.Count; + if (rcvd == buffer.Length) + { + Array.Resize(ref buffer, rcvd + 1024); + } + } + string text = Encoding.UTF8.GetString(buffer); + Log.LogDataReceived(_logger, Tag, text); + OnMessageReceived?.Invoke(this, new MsgRecvEventArgs(text)); // Handle user handlers error? } } - private ClientWebSocket SetupSocket() + private async Task HeartbeatLoop(ClientWebSocket ws, CancellationToken token) { - var socket = new ClientWebSocket(); - - SetRequestHeader(socket, new Dictionary + var interval = TimeSpan.FromSeconds(_options.HeartBeatInterval); + while (true) { - { "X-Client-Role", "Universal" }, - { "X-Self-ID", Uin.ToString() }, - { "User-Agent", Constant.OneBotImpl } - }); - if (string.IsNullOrEmpty(Config["AccessToken"])) socket.Options.SetRequestHeader("Authorization", $"Bearer {Config["AccessToken"]}"); - socket.Options.KeepAliveInterval = Timeout.InfiniteTimeSpan; - - return socket; + var status = new OneBotStatus(true, true); + var heartBeat = new OneBotHeartBeat(_botCtx.BotUin, (int)_options.HeartBeatInterval, status); + await SendJsonAsync(ws, heartBeat, token); + await Task.Delay(interval, token); + } } - - private static void SetRequestHeader(ClientWebSocket webSocket, Dictionary headers) + + private static partial class Log { - foreach (var (key, value) in headers) webSocket.Options.SetRequestHeader(key, value); + [LoggerMessage(EventId = 1, Level = LogLevel.Trace, Message = "[{tag}] Send: {data}")] + public static partial void LogSendingData(ILogger logger, string tag, string data); + + [LoggerMessage(EventId = 2, Level = LogLevel.Trace, Message = "[{tag}] Receive: {data}")] + public static partial void LogDataReceived(ILogger logger, string tag, string data); + + [LoggerMessage(EventId = 3, Level = LogLevel.Warning, Message = "[{tag}] Client disconnected")] + public static partial void LogClientDisconnected(ILogger logger, Exception e, string tag); + + [LoggerMessage(EventId = 10, Level = LogLevel.Error, Message = "[{tag}] Invalid configuration was detected, url: {url}")] + public static partial void LogInvalidUrl(ILogger logger, string tag, string url); } } \ No newline at end of file diff --git a/Lagrange.OneBot/Core/Network/Service/ReverseWSServiceFactory.cs b/Lagrange.OneBot/Core/Network/Service/ReverseWSServiceFactory.cs new file mode 100644 index 000000000..4f4ec6760 --- /dev/null +++ b/Lagrange.OneBot/Core/Network/Service/ReverseWSServiceFactory.cs @@ -0,0 +1,23 @@ +using Lagrange.OneBot.Core.Network.Options; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Lagrange.OneBot.Core.Network.Service +{ + public sealed class ReverseWSServiceFactory : LagrangeWebServiceFactory, ILagrangeWebServiceFactory + { + public ReverseWSServiceFactory(IServiceProvider services) : base(services) + { + + } + + public override ILagrangeWebService? Create() + { + var config = _config ?? throw new InvalidOperationException("Configuration must be provided"); + var options = _services.GetRequiredService>(); + config.Bind(options.Value); + return _services.GetRequiredService(); + } + } +} diff --git a/Lagrange.OneBot/Core/Operation/OperationService.cs b/Lagrange.OneBot/Core/Operation/OperationService.cs index f323fe255..8d90ed532 100644 --- a/Lagrange.OneBot/Core/Operation/OperationService.cs +++ b/Lagrange.OneBot/Core/Operation/OperationService.cs @@ -13,14 +13,12 @@ public sealed class OperationService { private readonly BotContext _bot; private readonly ILogger _logger; - private readonly LagrangeWebSvcCollection _service; private readonly Dictionary _operations; public OperationService(BotContext bot, ILogger logger, LagrangeWebSvcCollection service) { _bot = bot; _logger = logger; - _service = service; _operations = new Dictionary(); foreach (var type in Assembly.GetExecutingAssembly().GetTypes()) @@ -29,38 +27,41 @@ public OperationService(BotContext bot, ILogger logger, LagrangeWeb if (attribute != null) _operations[attribute.Api] = (IOperation)type.CreateInstance(false); } - service.OnMessageReceived += async (_, e) => await HandleOperation(e); + service.OnMessageReceived += (s, e) => _ = HandleOperation(s, e); } - private async Task HandleOperation(MsgRecvEventArgs eventArgs) + private async Task HandleOperation(object? sender, MsgRecvEventArgs eventArgs) { + if (sender is not ILagrangeWebService webService) + { + _logger.LogWarning("Json Serialization failed for such action"); + return; + } if (JsonSerializer.Deserialize(eventArgs.Data) is { } action) { try { - bool supported = _operations.TryGetValue(action.Action, out var handler); - - if (supported && handler != null) + if (_operations.TryGetValue(action.Action, out var handler)) { var result = await handler.HandleOperation(_bot, action.Params); result.Echo = action.Echo; - await _service.SendJsonAsync(result, eventArgs.Identifier); + await webService.SendJsonAsync(result); } else { - await _service.SendJsonAsync(new OneBotResult(null, 404, "failed") { Echo = action.Echo }, eventArgs.Identifier); + await webService.SendJsonAsync(new OneBotResult(null, 404, "failed") { Echo = action.Echo }); } } catch (Exception e) { - _logger.LogWarning(e.ToString()); - await _service.SendJsonAsync(new OneBotResult(null, 200, "failed") { Echo = action.Echo }, eventArgs.Identifier); + _logger.LogWarning(e, "Unexpected error encountered while handling message."); + await webService.SendJsonAsync(new OneBotResult(null, 200, "failed") { Echo = action.Echo }); } } else { - _logger.LogWarning($"Json Serialization failed for such action"); + _logger.LogWarning("Json Serialization failed for such action"); } } } \ No newline at end of file diff --git a/Lagrange.OneBot/LagrangeAppBuilder.cs b/Lagrange.OneBot/LagrangeAppBuilder.cs index 1628ca089..79defaba8 100644 --- a/Lagrange.OneBot/LagrangeAppBuilder.cs +++ b/Lagrange.OneBot/LagrangeAppBuilder.cs @@ -4,6 +4,7 @@ using Lagrange.Core.Utility.Sign; using Lagrange.OneBot.Core.Message; using Lagrange.OneBot.Core.Network; +using Lagrange.OneBot.Core.Network.Service; using Lagrange.OneBot.Core.Notify; using Lagrange.OneBot.Core.Operation; using Lagrange.OneBot.Database; @@ -79,7 +80,15 @@ public LagrangeAppBuilder ConfigureBots() public LagrangeAppBuilder ConfigureOneBot() { Services.AddSingleton(); - + Services.AddOptions(); + + Services.AddScoped, ForwardWSServiceFactory>(); + Services.AddScoped(); + Services.AddScoped, ReverseWSServiceFactory>(); + Services.AddScoped(); + Services.AddScoped(); + Services.AddScoped(services => services.GetRequiredService().Create()!); + Services.AddSingleton(); Services.AddSingleton(); From 047bf09bab7b2881893989550a03303ec7f6a224 Mon Sep 17 00:00:00 2001 From: Executor Date: Wed, 22 Nov 2023 16:42:53 +0800 Subject: [PATCH 2/2] Use IOptionsSnapshot instead of IOptions to resolve options correctly --- Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs | 2 +- Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs b/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs index 1d9f1e09c..ebfd4adcd 100644 --- a/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs +++ b/Lagrange.OneBot/Core/Network/Service/ForwardWSService.cs @@ -31,7 +31,7 @@ public sealed class ForwardWSService : ILagrangeWebService private readonly string _accessToken; - public ForwardWSService(IOptions options, ILogger logger, BotContext context) + public ForwardWSService(IOptionsSnapshot options, ILogger logger, BotContext context) { _options = options.Value; _logger = logger; diff --git a/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs b/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs index fa1308b89..a04e6d065 100644 --- a/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs +++ b/Lagrange.OneBot/Core/Network/Service/ReverseWSService.cs @@ -47,7 +47,7 @@ public void Dispose() } } - public ReverseWSService(IOptions options, ILogger logger, BotContext context) + public ReverseWSService(IOptionsSnapshot options, ILogger logger, BotContext context) { _options = options.Value; _logger = logger;