diff --git a/src/OpenTask.Application/Core/DiscoveryFromDb.cs b/src/OpenTask.Application/Core/DiscoveryFromDb.cs index 5e5eb4a..0d312f0 100644 --- a/src/OpenTask.Application/Core/DiscoveryFromDb.cs +++ b/src/OpenTask.Application/Core/DiscoveryFromDb.cs @@ -96,6 +96,7 @@ private void discoveryTimerElapsed(object? sender, ElapsedEventArgs e) [Obsolete] private async Task updateSubscriber(IEnumerable latestNodes) { + logger.LogDebug($"[updateSubscriber] 节点数:{latestNodes.Count()}"); foreach (Domain.Servers.OpenTaskServer item in latestNodes) { // 新增的节点 @@ -188,24 +189,24 @@ private async Task TryReslot() logger.LogInformation($"当前监听节点列表:{String.Join(",", clusterSubscribers.Select(x => x.Key))}"); - // 1分钟内最多只有一个节点执行一次 - if (!lockerService.TryLock(LOCK_KEY, myMqttServer.Identifier, 60, out Locker? locker)) - { - logger.LogDebug($"[获取锁失败]:{locker?.Version}@{locker?.LockedAt}"); - return; - } - // 服务发现 IEnumerable latestNodes = FindAllOnlineServer(); await updateSubscriber(latestNodes); - if (IsWholeSlot(latestNodes)) + // 1分钟内最多只有一个节点执行一次 + if (!lockerService.TryLock(LOCK_KEY, myMqttServer.Identifier, 60, out Locker? locker)) { + logger.LogDebug($"[获取锁失败]:{locker?.Version}@{locker?.LockedAt}"); return; } try { + if (IsWholeSlot(latestNodes)) + { + return; + } + logger.LogInformation($"[集群节点分配不均] 开始重新分配slot,当前节点数:{latestNodes.Count()}"); // TODO: 存在通知失败的可能,要比对连接到当前server的其他的server, diff --git a/src/OpenTask.Application/Core/OpenTaskServer.cs b/src/OpenTask.Application/Core/OpenTaskServer.cs index c0a34d2..b351e71 100644 --- a/src/OpenTask.Application/Core/OpenTaskServer.cs +++ b/src/OpenTask.Application/Core/OpenTaskServer.cs @@ -31,8 +31,10 @@ public class OpenTaskServer : Disposable, ITaskServer public string ExternalUrl => options.ExternalUrl ?? $"{options.Ip}:{options.Port}"; + [Obsolete("通过数据库维护", true)] public ConcurrentDictionary CurrentNodeOnlineUsers { private set; get; } = new ConcurrentDictionary(); + [Obsolete("通过数据库维护", true)] public ConcurrentDictionary CurrentNodeOnlineServer { private set; get; } = new ConcurrentDictionary(); public ConcurrentDictionary> OtherNodeOlineUsers { get; private set; } = new ConcurrentDictionary>();