Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
SpringHgui committed Jul 7, 2024
1 parent a8dce7b commit b5e1ea8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
17 changes: 9 additions & 8 deletions src/OpenTask.Application/Core/DiscoveryFromDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private void discoveryTimerElapsed(object? sender, ElapsedEventArgs e)
[Obsolete]
private async Task updateSubscriber(IEnumerable<Domain.Servers.OpenTaskServer> latestNodes)
{
logger.LogDebug($"[updateSubscriber] 节点数:{latestNodes.Count()}");
foreach (Domain.Servers.OpenTaskServer item in latestNodes)
{
// 新增的节点
Expand Down Expand Up @@ -188,24 +189,24 @@ private async Task TryReslot()

logger.LogInformation($"当前监听节点列表:{String.Join(",", clusterSubscribers.Select(x => x.Key))}");

// 1分钟内最多只有一个节点执行一次
if (!lockerService.TryLock(LOCK_KEY, myMqttServer.Identifier, 60, out Locker? locker))
{
logger.LogDebug($"[获取锁失败]:{locker?.Version}@{locker?.LockedAt}");
return;
}

// 服务发现
IEnumerable<Domain.Servers.OpenTaskServer> latestNodes = FindAllOnlineServer();
await updateSubscriber(latestNodes);

if (IsWholeSlot(latestNodes))
// 1分钟内最多只有一个节点执行一次
if (!lockerService.TryLock(LOCK_KEY, myMqttServer.Identifier, 60, out Locker? locker))
{
logger.LogDebug($"[获取锁失败]:{locker?.Version}@{locker?.LockedAt}");
return;
}

try
{
if (IsWholeSlot(latestNodes))
{
return;
}

logger.LogInformation($"[集群节点分配不均] 开始重新分配slot,当前节点数:{latestNodes.Count()}");

// TODO: 存在通知失败的可能,要比对连接到当前server的其他的server,
Expand Down
2 changes: 2 additions & 0 deletions src/OpenTask.Application/Core/OpenTaskServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ public class OpenTaskServer : Disposable, ITaskServer

public string ExternalUrl => options.ExternalUrl ?? $"{options.Ip}:{options.Port}";

[Obsolete("通过数据库维护", true)]
public ConcurrentDictionary<string, ExecutorClient> CurrentNodeOnlineUsers { private set; get; } = new ConcurrentDictionary<string, ExecutorClient>();

[Obsolete("通过数据库维护", true)]
public ConcurrentDictionary<string, ExecutorClient> CurrentNodeOnlineServer { private set; get; } = new ConcurrentDictionary<string, ExecutorClient>();

public ConcurrentDictionary<string, IEnumerable<ExecutorClient>> OtherNodeOlineUsers { get; private set; } = new ConcurrentDictionary<string, IEnumerable<ExecutorClient>>();
Expand Down

0 comments on commit b5e1ea8

Please sign in to comment.