Table of Contents generated with DocToc
-
很多应用场景是需要系统保证幂等性的(如api服务或消息消费者),并发情况下或消息重复很容易造成系统重入,那么分布式锁是保障幂等的一个重要手段。
-
另一方面,很多抢单场景或者叫交易撮合场景,如dd司机抢单或唯一商品抢拍等都需要用一把“全局锁”来解决并发造成的问题。在防止并发情况下造成库存超卖的场景,也常用分布式锁来解决。
redis锁、zookeeper锁,etcd锁
各节点通过set key value nx ex即可,如果set执行成功,则表明加锁成功,否则失败,其中value为随机串,用来判断是否是当前应用实例加的锁;nx用来判断该key是否存在以实现排他特性,ex用来指定锁的过期时间,避免死锁。
向redis服务发送并执行一段lua脚本,脚本如下,也很好理解,如果是自己加的锁,那么安全释放,否则什么也不做。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
如果redis采用了主备的部署方式,存在一种场景,master上set成功后宕机,而set的key没有来得及同步到slave的话,会存在不一致的场景,可以通过redis持久化和fsync=always的方式来保持一致,但是有性能损耗。
设集群有N个redis节点,那么,redlock算法约定,任意应用实例在半数以上(N/2 + 1)的redis节点上执行set成功,就认为当前应用实例成功持有锁.
这里面有几个问题需要考虑:网络延迟、超时处理、节点宕机、新增节点
-
网络延迟 由于set时指定了ex参数,官方称为TTL,所以锁本身就是有生命周期的。而应用实例又需要与多个redis实例通信,网络io的耗时不能无视,官方给出的建议值是,如果ex参数设置为10s,那么请求单个实例的超时时间应在5-50ms以内,换算下来,就是5‰ - 0.5‰
-
超时处理 由于TTL中包含了网络传输耗时、各及节点的耗时差异,所以加锁成功后,应用实例有效的持有锁时长 = TTL - (最晚执行set成功的response时间 - 最早执行set成功的response时间) - Clock drift,讲真,这里clock drift我没理解,网上讲这是时钟频率的差异?或者可能是部署在不同时区时,服务之间的时区差值。
-
节点宕机 当一个应用实例持有锁时,如果一个持有key的redis实例宕机了,且没有配置主备同步策略,那么锁状态依然可能会出现不一致情形。官方有两个解决方案:一个是像单redis实例一样,对每个实例配置主备同步持久化,并采用fsync=always策略进行主从同步,这会带来性能损耗。另一个不依赖持久化策略,令宕机redis实例延迟启动,延迟启动的作用,就是使宕机节点已经持有的key超时掉,迫使这个节点变为一个未持有key的节点,但这引入一个风险,就是当大多数redis节点同时宕机时,会使分布式锁不可用。
-
新增节点 官方文档没有提及,但是这里有坑,我的理解是,用于实现分布式锁的redis集群,需要显式的配置节点地址,如果采用动态的redis服务发现策略,那么追加节点可能会导致锁状态的不一致。
结构体
// A Mutex is a distributed mutual exclusion lock.
type Mutex struct {
name string // 锁在redis上的key
expiry time.Duration // 超时时间
tries int // 重试次数
delayFunc DelayFunc // 延时函数,用于在每两次重试之间的休眠期,避免大量请求拥塞
factor float64 // 时钟偏移因子
quorum int // 成功获取锁需要set成功的最少redis节点数,N/2+1
genValueFunc func() (string, error) // 用于生成随机value的方法
value string // 锁在热地上的value值
until time.Time // 持有锁的deadline时间
pools []redis.Pool // redis连接池
}
方法
func (m *Mutex) Lock() error //
func (m *Mutex) Unlock() (bool, error)
func (m *Mutex) LockContext(ctx context.Context) error
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error)
func (m *Mutex) Extend() (bool, error)
func (m *Mutex) ExtendContext(ctx context.Context) (bool, error)
func (m *Mutex) Valid() (bool, error)
func (m *Mutex) ValidContext(ctx context.Context) (bool, error)
带有context的可以通过应用层控制获取或释放锁的过程。Extend簇函数用来重置key的超时时间,Valid用来验证当前节点是否持有锁。
redsync与redis集群通信时,采用了并发访问方式,并发过程在actOnPoolsAsync函数中,其参数传入的是与单个节点通信的实现函数地址
func (m *Mutex) actOnPoolsAsync(actFn func(redis.Pool) (bool, error)) (int, error) {
type result struct {
Status bool
Err error
}
// 创建用于收集所有redis节点返回值的chan
ch := make(chan result)
for _, pool := range m.pools {
// 并发请求所有redis节点,结果写入chan
go func(pool redis.Pool) {
r := result{}
r.Status, r.Err = actFn(pool)
ch <- r
}(pool)
}
// 校验所有redis节点的返回值,并返回成功节点数量
n := 0
var err error
// 特殊语法糖-省略
for range m.pools {
r := <-ch
if r.Status {
n++
} else if r.Err != nil {
err = multierror.Append(err, r.Err)
}
}
return n, err
}
func (m *Mutex) LockContext(ctx context.Context) error {
// 生成随机value
value, err := m.genValueFunc()
if err != nil {
return err
}
// 循环重试
for i := 0; i < m.tries; i++ {
if i != 0 {
time.Sleep(m.delayFunc(i))
}
start := time.Now()
// 并发在所有redis节点上获取锁
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.acquire(ctx, pool, value)
})
if n == 0 && err != nil {
return err
}
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.factor)))
// 如果成功在半数以上节点set成功,并且在锁的有效时间内,则说明加锁成功
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
// 加锁失败,清除所有set成功的节点上的key
_, _ = m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
}
return ErrFailed
}
func (m *Mutex) UnlockContext(ctx context.Context) (bool, error) {
// 并发执行delete lua脚本
n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, m.value)
})
// 执行成功的节点数小于约定的加锁成功节点数,则说明有节点删除失败了,那么释放锁就会失败
if n < m.quorum {
return false, err
}
return true, nil
}
需要注意的是,在分布式锁场景中,无论获取还是释放锁,与操作系统的锁相比,执行失败会是常态,所以一定要检查Lock、Unlock的返回值。
在actOnPoolsAsync方法中,在处理所有redis节点的返回时,引用了multierror库,这个库自定义了Error结构,用于保存多个error,当你的处理过程中在多个位置可能会返回不同error信息,但是返回值又只有一个error时,可以通过multierror.Append方法将这些error合成一个返回。内部创建了一个[]error来保存这些error,保留了层层弹栈返回时,各层的错误信息。代码很少但却很实用
ZooKeeper 也是一个典型的分布式元数据存储服务,它的分布式锁实现基于 ZooKeeper 的临时节点和顺序特性。
临时节点具备数据自动删除的功能。当 client 与 ZooKeeper 连接和 session 断掉时,相应的临时节点就会被删除。
ZooKeeper 也提供了 Watch 特性可监听 key 的数据变化
使用 Zookeeper 加锁的伪代码如下
Lock
1 n = create(l + “/lock-”, EPHEMERAL|SEQUENTIAL)
2 C = getChildren(l, false)
3 if n is lowest znode in C, exit
4 p = znode in C ordered just before n
5 if exists(p, true) wait for watch event
6 goto 2
Unlock
1 delete(n)
相比 Redis 基于主备异步复制导致锁的安全性问题,etcd 是基于 Raft 共识算法实现的,一个写请求需要经过集群多数节点确认。 因此一旦分布式锁申请返回给 client 成功后,它一定是持久化到了集群多数节点上,不会出现 Redis 主备异步复制可能导致丢数据的问题,具备更高的安全性。
-
Lease 机制:即租约机制(TTL,Time To Live),Etcd 可以为存储的 Key-Value 对设置租约,当租约到期,Key-Value 将失效删除;同时也支持续约续期(KeepAlive)
-
Revision 机制:每个 key 带有一个 Revision 属性值,Etcd 每进行一次事务对应的全局 Revision 值都会加一,因此每个 Key 对应的 Revision 属性值都是全局唯一的。通过比较 Revision 的大小就可以知道进行写操作的顺序。在实现分布式锁时,多个程序同时抢锁,根据 Revision 值大小依次获得锁,可以避免惊群效应,实现公平锁
-
Prefix 机制:即前缀机制(或目录机制)。可以根据前缀(目录)获取该目录下所有的 Key 及对应的属性(包括 Key、Value 以及 Revision 等)
-
Watch 机制:即监听机制,Watch 机制支持 Watch 某个固定的 Key,也支持 Watch 一个目录前缀(前缀机制),当被 Watch 的 Key 或目录发生变化,客户端将收到通知
etcd 社区提供了一个名为 concurrency 包帮助你更简单、正确地使用分布式锁、分布式选举。
-
首先通过 concurrency.NewSession 方法创建 Session,本质是创建了一个 TTL 为 10 的 Lease。
-
其次得到 session 对象后,通过 concurrency.NewMutex 创建了一个 mutex 对象,包含 Lease、key prefix 等信息。
-
然后通过 mutex 对象的 Lock 方法尝试获取锁。
-
最后使用结束,可通过 mutex 对象的 Unlock 方法释放锁。
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")
// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")
if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
在调用NewSession方法时候实际上是初始化了一个用户指定行为的租约(行为可以是指定ttl,或者复用其他的lease等),并异步进行keepalive。
const defaultSessionTTL = 60 //session 的默认 TTL 是 60s
// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
type Session struct {
client *v3.Client // 包含一个 clientv3 客户端
opts *sessionOptions
id v3.LeaseID //lease 租约
//s.Lease() 是一个 64 位的整数值,Etcd v3 引入了 lease(租约)的概念
//concurrency 包基于 lease 封装了 session,每一个客户端都有自己的 lease,也就是说每个客户端都有一个唯一的 64 位整形值
cancel context.CancelFunc //context
donec <-chan struct{} //
}
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
for _, opt := range opts {
opt(ops)
}
// 没有则生成租约Id
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = resp.ID
}
// 异步进行keepalive
ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
cancel()
return nil, err
}
donec := make(chan struct{})
s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
// keep the lease alive until client error or cancelled context
go func() {
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
}()
return s, nil
}
type Mutex struct {
s *Session //保存的租约相关的信息
pfx string //锁的名称,key的前缀
myKey string //锁完整的key,当前持有锁的客户端的 leaseid 值(完整 Key 的组成为 pfx+"/"+leaseid)
myRev int64 //revision,理解为当前持有锁的 Revision(修改数) 编号 或者是 CreateRevision
hdr *pb.ResponseHeader
}
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}
NewMutex实际上创建了一个锁的数据结构,该结构可以保存一些锁的信息,入参的“mutex-prefix”只是一个key的前缀,还有后续要创建的完整key,revision等信息。
上锁Lock
func (m *Mutex) Lock(ctx context.Context) error {
// 尝试获取锁
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
//ownerKey 获取当前实际拿到锁的KEY
ownerKey := resp.Responses[1].GetResponseRange().Kvs
//如果ownerKey的长度为0或者持有者的Revision与自己的Revision相同,说明自己持有锁,可以直接返回,并对共享资源进行操作
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// 走到这里代表没有获得锁,需要等待之前的锁被释放,即 CreateRevision 小于当前 CreateRevision 的 kv 被删除
// 阻塞等待 Owner 释放锁
client := m.s.Client()
_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
//确保session没有过期
gresp, werr := client.Get(ctx, m.myKey)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
if len(gresp.Kvs) == 0 {
return ErrSessionExpired
}
m.hdr = gresp.Header
return nil
}
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()
//完整key是前缀名称加租约ID,由于不同进程生成的不同租约,所以锁互不相同
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
//cmp通过比较createRevision是否为0判断当前的key是不是第一次创建
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
//put会把key绑定上租约并存储
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
//get会获取当前key的值
get := v3.OpGet(m.myKey)
//getOwner获取当前锁的真正持有者,是通过前缀来范围查找,WithFirstCreate()筛选出当前存在的最小revision对应的值
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
// Txn 事务,判断 cmp 的条件是否成立,成立执行 Then,不成立执行 Else,最终执行 Commit()
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
//将该事务的revision赋值到锁的myRev字段
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}
options
// WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }
// withTop gets the first key over the get's prefix given a sort order
func withTop(target SortTarget, order SortOrder) []OpOption {
return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
}
// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
func WithPrefix() OpOption {
// 返回所有满足 prefix 匹配的 key-value,和 etcdctl get key --prefix 功能一样
return func(op *Op) {
if len(op.key) == 0 {
op.key, op.end = []byte{0}, []byte{0}
return
}
op.end = getPrefix(op.key)
}
}
在获取锁的时候,通过事务操作来尝试加锁。
如果当前的key是第一次创建,则将key绑定租约并存储,否则获取当前的key详细信息。getOwner通过前缀来进行查找最小revision对应的值, 目的是获取当前锁的持有者(如果最小Revision的key释放锁,则该key会被删除,所以最小Revision的key就是当前锁的持有者)。
!resp.Succeeded代表key不是第一次创建,则之前执行的是get操作,获取该key创建时候的revision并赋值到锁的myRev字段。
waitDeletes 模拟了一种公平的先来后到的排队逻辑,等待所有当前比当前 key 的 revision 小的 key 被删除后,锁释放后才返回。
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wr v3.WatchResponse
//通过Revsion来watch key,也就是前一个锁
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
//监听Delete事件
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}
TryLock:TryLock比Lock,多调用了一个waitDeletes 函数,这个函数模拟了一种公平的先来后到的排队逻辑, 等待所有当前比当前 key 的 revision 小的 key 被删除后,锁释放后才返回。
UnLock:解锁操作会直接删除对应的kv,这会触发下一个锁的获取。
func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return nil
}