From 34a148c83de62258322228b43e63b59f0b2f1801 Mon Sep 17 00:00:00 2001 From: Mmx Date: Fri, 1 Nov 2024 20:58:53 +0800 Subject: [PATCH] feat(local): thumbnail token bucket smooth migration (#7425) * feat(local): allow to migrate static token buckets * improve(local): token bucket migration boundary handling --- drivers/local/driver.go | 2 +- drivers/local/token_bucket.go | 38 +++++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/drivers/local/driver.go b/drivers/local/driver.go index bf993e5d5f8..86980943ef5 100644 --- a/drivers/local/driver.go +++ b/drivers/local/driver.go @@ -76,7 +76,7 @@ func (d *Local) Init(ctx context.Context) error { if d.thumbConcurrency == 0 { d.thumbTokenBucket = NewNopTokenBucket() } else { - d.thumbTokenBucket = NewStaticTokenBucket(d.thumbConcurrency) + d.thumbTokenBucket = NewStaticTokenBucketWithMigration(d.thumbTokenBucket, d.thumbConcurrency) } return nil } diff --git a/drivers/local/token_bucket.go b/drivers/local/token_bucket.go index 38fbe73fc9b..23c6ebd63b7 100644 --- a/drivers/local/token_bucket.go +++ b/drivers/local/token_bucket.go @@ -23,6 +23,38 @@ func NewStaticTokenBucket(size int) StaticTokenBucket { return StaticTokenBucket{bucket: bucket} } +func NewStaticTokenBucketWithMigration(oldBucket TokenBucket, size int) StaticTokenBucket { + if oldBucket != nil { + oldStaticBucket, ok := oldBucket.(StaticTokenBucket) + if ok { + oldSize := cap(oldStaticBucket.bucket) + migrateSize := oldSize + if size < migrateSize { + migrateSize = size + } + + bucket := make(chan struct{}, size) + for range size - migrateSize { + bucket <- struct{}{} + } + + if migrateSize != 0 { + go func() { + for range migrateSize { + <-oldStaticBucket.bucket + bucket <- struct{}{} + } + close(oldStaticBucket.bucket) + }() + } + return StaticTokenBucket{bucket: bucket} + } + } + return NewStaticTokenBucket(size) +} + +// Take channel maybe closed when local driver is modified. +// don't call Put method after the channel is closed. func (b StaticTokenBucket) Take() <-chan struct{} { return b.bucket } @@ -35,8 +67,10 @@ func (b StaticTokenBucket) Do(ctx context.Context, f func() error) error { select { case <-ctx.Done(): return ctx.Err() - case <-b.bucket: - defer b.Put() + case _, ok := <-b.Take(): + if ok { + defer b.Put() + } } return f() }