Skip to content

如何动态分配雪花算法的WorkerId

alpha.yu edited this page May 31, 2022 · 9 revisions

前言

    生成主键的方法有很多种,比如自增Id,Guid,Redis的incr,雪花算法等等。各种方法的优劣,网上有很多文章介绍,本文不做探讨。Adnc采用基于Yitter的雪花算法生成Id。

Yitter雪花算法介绍

   传统雪花算法id组成由1位符号位+41位时间戳+10位工作机器id+12位自增序号组成,总共64比特组成的long类型。传统雪花算法有一个不好的地方就是生成的Id太长,超过了js Number类型的最大值,js无法正确解析。 Yitter对传统雪花算法做了改进,可以自定义工作机器id自增序号的长度,默认配置都是6位。在都是6位的情况下,用50年都不会超过 js Number类型最大值。同时Yitter的雪花算法也解决了系统时间回拨的问题,同时也提供了详细的文档与测试用例。

Yitter算法特点

✔ 整形数字,随时间单调递增(不一定连续),长度更短,用50年都不会超过 js Number类型最大值。(默认配置)

✔ 速度更快,是传统雪花算法的2-5倍,0.1秒可生成50万个(基于8代低压i7)。

✔ 支持时间回拨处理。比如服务器时间回拨1秒,本算法能自动适应生成临界时间的唯一ID。

✔ 支持手工插入新ID。当业务需要在历史时间生成新ID时,用本算法的预留位能生成5000个每秒。

✔ 不依赖任何外部缓存和数据库。(k8s环境下自动注册 WorkerId 的动态库依赖 redis)

✔ 基础功能,开箱即用,无需配置文件、数据库连接等。

Yitter性能数据

(参数:10位自增序列,1000次漂移最大值)

连续请求量 5K 5W 50W
传统雪花算法 0.0045s 0.053s 0.556s
雪花漂移算法 0.0015s 0.012s 0.113s

💍 极致性能:500W/s~3000W/s。(所有测试数据均基于8代低压i7计算)

Yitter如何处理时间回拨

🔶 当发生系统时间回拨时,算法采用过去时序的预留序数生成新的ID。

🔶 回拨生成的ID序号,默认靠前,也可以调整为靠后。

🔶 允许时间回拨至本算法预设基数(参数可调)。

Yitter雪花算法的使用

Yitter算法有3个参数需要配置

参数 描述
WorkerIdBitLength 机器码位长,决定 WorkerId 的最大值,默认值6。长度6位表示支持64个实例
SeqBitLength 序列数位长,默认值6,决定每毫秒基础生成的ID个数。规则要求:WorkerIdBitLength + SeqBitLength 不超过 22。
WorkerId 机器Id,最重要参数,无默认值,必须 全局唯一,必须 程序设定

WorkerIdBitLength与SeqBitLength这两个参数我们可以在代码直接配置
参考代码:https://github.com/AlphaYu/Adnc/blob/master/src/ServerApi/Infrastructures/Adnc.Infra.IdGenerater/Yitter/IdGenerater.cs

public static class IdGenerater
{
    private static bool _isSet = false;
    private static readonly object _locker = new();

    public static byte WorkerIdBitLength => 6;
    public static byte SeqBitLength => 6;
    public static short MaxWorkerId => (short)(Math.Pow(2.0, WorkerIdBitLength) - 1);
    public static short CurrentWorkerId { get; private set; } = -1;
    
    //其它业务逻辑
}

如何获取WorkerId

如果是单体架构的系统,我们可以直接从配置文件获取WorkerId。但分布式或者微服务架构的系统需要在项目启动的时候动态获取workerid。Adnc是预先生成好所有workerid并保存在redis的zset里面,value = workerid,socre = 时间戳。实例启动时从通过lua脚本从zset获取socre最小的value(workerid),并同时更新socre为当前时间戳。正常获取workerid后,会有一个定时服务每隔1分钟刷新当前workerid的score。

namespace Adnc.Infra.IdGenerater.Yitter
{
    public class WorkerNodeHostedService : BackgroundService
    {
        public WorkerNodeHostedService(ILogger<WorkerNodeHostedService> logger
            , WorkerNode workerNode
            , string serviceName)
        {
                _serviceName = serviceName;
                _workerNode = workerNode;
                _logger = logger;
        }

        public async override Task StartAsync(CancellationToken cancellationToken)
        {
            //预先生成好所有workerid并保存到redis
            await _workerNode.InitWorkerNodesAsync(_serviceName);
            //获取workerid
            var workerId = await _workerNode.GetWorkerIdAsync(_serviceName);
            //将获取到的workerid赋值给YitterSnowFlake.CurrentWorkerId
            YitterSnowFlake.CurrentWorkerId = (short)workerId;
            await base.StartAsync(cancellationToken);
        }

        
        public async override Task StopAsync(CancellationToken cancellationToken)
        {
            await base.StopAsync(cancellationToken);

            var subtractionMilliseconds = 0 - (_millisecondsDelay * 1.5);
            var score = DateTime.Now.AddMilliseconds(subtractionMilliseconds).GetTotalMilliseconds();
            //实例停止时,回收当前workerid.
            await _workerNode.RefreshWorkerIdScoreAsync(_serviceName, YitterSnowFlake.CurrentWorkerId, score);
        }
        
        protected async override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(_millisecondsDelay, stoppingToken);

                    if (stoppingToken.IsCancellationRequested) break;

                    //定时刷新YitterSnowFlake.CurrentWorkerId的score。
                    await _workerNode.RefreshWorkerIdScoreAsync(_serviceName, YitterSnowFlake.CurrentWorkerId);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex.Message, ex);
                    await Task.Delay(_millisecondsDelay / 3, stoppingToken);
                }
            }
        }  
    }
}
namespace Adnc.Infra.IdGenerater.Yitter
{
    public class WorkerNode
    {
        //其他业务代码
        
        //实例启动时会调用该方法,将所有workerid保存到zset中。
        internal async Task InitWorkerNodesAsync(string serviceName)
        {
            var workerIdSortedSetCacheKey = string.Format(SharedCachingConsts.WorkerIdSortedSetCacheKey, serviceName);

            //如果已经存在,则不需要重复生成。
            if (!_redisProvider.KeyExists(workerIdSortedSetCacheKey))
            {
                var flag = await _distributedLocker.LockAsync(workerIdSortedSetCacheKey);
                if (!flag.Success)
                {
                    await Task.Delay(300);
                    await InitWorkerNodesAsync(serviceName);
                }
                long count = 0;
                try
                {
                    var set = new Dictionary<long, double>();
                    for (long index = 0; index <= YitterSnowFlake.MaxWorkerId; index++)
                    {
                        //index = workerid,score = 当前时间戳
                        set.Add(index, DateTime.Now.GetTotalMilliseconds());
                    }
                    //保存到zset中。
                    count = await _redisProvider.ZAddAsync(workerIdSortedSetCacheKey, set);
                }
                catch(Exception ex)
                {
                    throw new Exception(ex.Message, ex);
                }
                finally
                {
                    await _distributedLocker.SafedUnLockAsync(workerIdSortedSetCacheKey,flag.LockValue);
                }
            }
        }

        //获取workerid
        internal async Task<long> GetWorkerIdAsync(string serviceName)
        {
            var workerIdSortedSetCacheKey = string.Format(SharedCachingConsts.WorkerIdSortedSetCacheKey, serviceName);

            //通过lua脚本获取score最小的workerid,并且刷新score位当前时间戳。
            var scirpt = @"local workerids = redis.call('ZRANGE', @key, @start,@stop)
                                    redis.call('ZADD',@key,@score,workerids[1])
                                    return workerids[1]";

            var parameters = new { key = workerIdSortedSetCacheKey, start = 0, stop = 0, score = DateTime.Now.GetTotalMilliseconds() };
            var luaResult = (byte[]) await _redisProvider.ScriptEvaluateAsync(scirpt, parameters);
            var workerId = _redisProvider.Serializer.Deserialize<long>(luaResult);

            return workerId;
        }

        //定时刷新当前实例workerid的score值。
        internal async Task RefreshWorkerIdScoreAsync(string serviceName, long workerId, double? workerIdScore = null)
        {
            var workerIdSortedSetCacheKey = string.Format(SharedCachingConsts.WorkerIdSortedSetCacheKey, serviceName);

            var score = workerIdScore == null ? DateTime.Now.GetTotalMilliseconds() : workerIdScore.Value;
            //更新score
            await _redisProvider.ZAddAsync(workerIdSortedSetCacheKey, new Dictionary<long, double> { { workerId, score } });
        }
    }
}

如何调用Yitter

using Adnc.Infra.IdGenerater.Yitter;

namespace Adnc.XXX.Application.Services
{
    public class xxxAppService : AbstractAppService, IxxxAppService
    {
        var id = IdGenerater.GetNextId();
    }
}

WELL DONE
全文完