-
Notifications
You must be signed in to change notification settings - Fork 374
如何动态分配雪花算法的WorkerId
生成主键的方法有很多种,比如自增Id,Guid,Redis的incr,雪花算法等等。各种方法的优劣,网上有很多文章介绍,本文不做探讨。Adnc采用基于Yitter的雪花算法生成Id。
传统雪花算法id组成由1位符号位
+41位时间戳
+10位工作机器id
+12位自增序号
组成,总共64比特组成的long类型。传统雪花算法有一个不好的地方就是生成的Id太长,超过了js Number类型的最大值,js无法正确解析。
Yitter对传统雪花算法做了改进,可以自定义工作机器id
与自增序号
的长度,默认配置都是6位。在都是6位的情况下,用50年都不会超过 js Number类型最大值。同时Yitter的雪花算法也解决了系统时间回拨的问题,同时也提供了详细的文档与测试用例。
✔ 整形数字,随时间单调递增(不一定连续),长度更短,用50年都不会超过 js Number类型最大值。(默认配置)
✔ 速度更快,是传统雪花算法的2-5倍,0.1秒可生成50万个(基于8代低压i7)。
✔ 支持时间回拨处理。比如服务器时间回拨1秒,本算法能自动适应生成临界时间的唯一ID。
✔ 支持手工插入新ID。当业务需要在历史时间生成新ID时,用本算法的预留位能生成5000个每秒。
✔ 不依赖任何外部缓存和数据库。(k8s环境下自动注册 WorkerId 的动态库依赖 redis)
✔ 基础功能,开箱即用,无需配置文件、数据库连接等。
(参数:10位自增序列,1000次漂移最大值)
连续请求量 | 5K | 5W | 50W |
---|---|---|---|
传统雪花算法 | 0.0045s | 0.053s | 0.556s |
雪花漂移算法 | 0.0015s | 0.012s | 0.113s |
💍 极致性能:500W/s~3000W/s。(所有测试数据均基于8代低压i7计算)
🔶 当发生系统时间回拨时,算法采用过去时序的预留序数生成新的ID。
🔶 回拨生成的ID序号,默认靠前,也可以调整为靠后。
🔶 允许时间回拨至本算法预设基数(参数可调)。
Yitter算法有3个参数需要配置
参数 | 描述 |
---|---|
WorkerIdBitLength | 机器码位长,决定 WorkerId 的最大值,默认值6。长度6位表示支持64个实例 |
SeqBitLength | 序列数位长,默认值6,决定每毫秒基础生成的ID个数。规则要求:WorkerIdBitLength + SeqBitLength 不超过 22。 |
WorkerId | 机器Id,最重要参数,无默认值,必须 全局唯一,必须 程序设定。 |
WorkerIdBitLength与SeqBitLength这两个参数我们可以在代码直接配置
参考代码:https://github.com/AlphaYu/Adnc/blob/master/src/ServerApi/Infrastructures/Adnc.Infra.IdGenerater/Yitter/IdGenerater.cs
public static class IdGenerater
{
private static bool _isSet = false;
private static readonly object _locker = new();
public static byte WorkerIdBitLength => 6;
public static byte SeqBitLength => 6;
public static short MaxWorkerId => (short)(Math.Pow(2.0, WorkerIdBitLength) - 1);
public static short CurrentWorkerId { get; private set; } = -1;
//其它业务逻辑
}
如果是单体架构的系统,我们可以直接从配置文件获取WorkerId。但分布式或者微服务架构的系统需要在项目启动的时候动态获取workerid。Adnc是预先生成好所有workerid并保存在redis的zset里面,value = workerid,socre = 时间戳。实例启动时从通过lua脚本从zset获取socre最小的value(workerid),并同时更新socre为当前时间戳。正常获取workerid后,会有一个定时服务每隔1分钟刷新当前workerid的score。
namespace Adnc.Infra.IdGenerater.Yitter
{
public class WorkerNodeHostedService : BackgroundService
{
public WorkerNodeHostedService(ILogger<WorkerNodeHostedService> logger
, WorkerNode workerNode
, string serviceName)
{
_serviceName = serviceName;
_workerNode = workerNode;
_logger = logger;
}
public async override Task StartAsync(CancellationToken cancellationToken)
{
//预先生成好所有workerid并保存到redis
await _workerNode.InitWorkerNodesAsync(_serviceName);
//获取workerid
var workerId = await _workerNode.GetWorkerIdAsync(_serviceName);
//将获取到的workerid赋值给YitterSnowFlake.CurrentWorkerId
YitterSnowFlake.CurrentWorkerId = (short)workerId;
await base.StartAsync(cancellationToken);
}
public async override Task StopAsync(CancellationToken cancellationToken)
{
await base.StopAsync(cancellationToken);
var subtractionMilliseconds = 0 - (_millisecondsDelay * 1.5);
var score = DateTime.Now.AddMilliseconds(subtractionMilliseconds).GetTotalMilliseconds();
//实例停止时,回收当前workerid.
await _workerNode.RefreshWorkerIdScoreAsync(_serviceName, YitterSnowFlake.CurrentWorkerId, score);
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(_millisecondsDelay, stoppingToken);
if (stoppingToken.IsCancellationRequested) break;
//定时刷新YitterSnowFlake.CurrentWorkerId的score。
await _workerNode.RefreshWorkerIdScoreAsync(_serviceName, YitterSnowFlake.CurrentWorkerId);
}
catch (Exception ex)
{
_logger.LogError(ex.Message, ex);
await Task.Delay(_millisecondsDelay / 3, stoppingToken);
}
}
}
}
}
namespace Adnc.Infra.IdGenerater.Yitter
{
public class WorkerNode
{
//其他业务代码
//实例启动时会调用该方法,将所有workerid保存到zset中。
internal async Task InitWorkerNodesAsync(string serviceName)
{
var workerIdSortedSetCacheKey = string.Format(SharedCachingConsts.WorkerIdSortedSetCacheKey, serviceName);
//如果已经存在,则不需要重复生成。
if (!_redisProvider.KeyExists(workerIdSortedSetCacheKey))
{
var flag = await _distributedLocker.LockAsync(workerIdSortedSetCacheKey);
if (!flag.Success)
{
await Task.Delay(300);
await InitWorkerNodesAsync(serviceName);
}
long count = 0;
try
{
var set = new Dictionary<long, double>();
for (long index = 0; index <= YitterSnowFlake.MaxWorkerId; index++)
{
//index = workerid,score = 当前时间戳
set.Add(index, DateTime.Now.GetTotalMilliseconds());
}
//保存到zset中。
count = await _redisProvider.ZAddAsync(workerIdSortedSetCacheKey, set);
}
catch(Exception ex)
{
throw new Exception(ex.Message, ex);
}
finally
{
await _distributedLocker.SafedUnLockAsync(workerIdSortedSetCacheKey,flag.LockValue);
}
}
}
//获取workerid
internal async Task<long> GetWorkerIdAsync(string serviceName)
{
var workerIdSortedSetCacheKey = string.Format(SharedCachingConsts.WorkerIdSortedSetCacheKey, serviceName);
//通过lua脚本获取score最小的workerid,并且刷新score位当前时间戳。
var scirpt = @"local workerids = redis.call('ZRANGE', @key, @start,@stop)
redis.call('ZADD',@key,@score,workerids[1])
return workerids[1]";
var parameters = new { key = workerIdSortedSetCacheKey, start = 0, stop = 0, score = DateTime.Now.GetTotalMilliseconds() };
var luaResult = (byte[]) await _redisProvider.ScriptEvaluateAsync(scirpt, parameters);
var workerId = _redisProvider.Serializer.Deserialize<long>(luaResult);
return workerId;
}
//定时刷新当前实例workerid的score值。
internal async Task RefreshWorkerIdScoreAsync(string serviceName, long workerId, double? workerIdScore = null)
{
var workerIdSortedSetCacheKey = string.Format(SharedCachingConsts.WorkerIdSortedSetCacheKey, serviceName);
var score = workerIdScore == null ? DateTime.Now.GetTotalMilliseconds() : workerIdScore.Value;
//更新score
await _redisProvider.ZAddAsync(workerIdSortedSetCacheKey, new Dictionary<long, double> { { workerId, score } });
}
}
}
using Adnc.Infra.IdGenerater.Yitter;
namespace Adnc.XXX.Application.Services
{
public class xxxAppService : AbstractAppService, IxxxAppService
{
var id = IdGenerater.GetNextId();
}
}
WELL DONE
全文完
- 企 鹅 群:780634162
- github:https://github.com/alphayu/adnc
- 博 客:https://www.cnblogs.com/alphayu
- 官 网:https://aspdotnetcore.net
MIT
Free Software, Hell Yeah!