From 087036a8fa594caa71c5a3262f7c199af737c07b Mon Sep 17 00:00:00 2001 From: EquentR <62492107+EquentR@users.noreply.github.com> Date: Mon, 25 Nov 2024 13:48:23 +0800 Subject: [PATCH] feat: redis-writer add buffered sending to significantly improve speed (#886) --- internal/client/redis.go | 58 ++++++++++++++++++++++ internal/writer/redis_standalone_writer.go | 11 +++- shake.toml | 13 ++--- 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index ee0f9acb..15546e38 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -8,6 +8,8 @@ import ( "regexp" "strconv" "strings" + "sync" + "sync/atomic" "time" "RedisShake/internal/client/proto" @@ -20,6 +22,9 @@ type Redis struct { writer *bufio.Writer protoReader *proto.Reader protoWriter *proto.Writer + timer *time.Timer + sendCount uint64 + mu sync.Mutex } func NewSentinelMasterClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis { @@ -81,6 +86,9 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo r = NewRedisClient(ctx, replicaInfo.BestReplica, username, password, Tls, false) } + r.timer = time.NewTimer(time.Second) + go r.autoFlush(ctx) + return r } @@ -185,11 +193,54 @@ func (r *Redis) SendBytes(buf []byte) { r.flush() } +func (r *Redis) SendBytesBuff(buf []byte) { + r.mu.Lock() + defer r.mu.Unlock() + _, err := r.writer.Write(buf) + if err != nil { + log.Panicf(err.Error()) + } + r.flushBuff() +} + +func (r *Redis) flushBuff() { + if atomic.AddUint64(&r.sendCount, 1)%100 != 0 { + return + } + if !r.timer.Stop() { + select { + case <-r.timer.C: + default: + } + } + r.timer.Reset(time.Second) + r.flush() +} + func (r *Redis) flush() { err := r.writer.Flush() if err != nil { log.Panicf(err.Error()) } + atomic.StoreUint64(&r.sendCount, 0) +} + +func (r *Redis) autoFlush(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-r.timer.C: + if atomic.LoadUint64(&r.sendCount) > 0 { + r.mu.Lock() + err := r.writer.Flush() + r.mu.Unlock() + if err != nil { + log.Panicf(err.Error()) + } + } + } + } } func (r *Redis) Receive() (interface{}, error) { @@ -217,6 +268,13 @@ func (r *Redis) Close() { if err := r.conn.Close(); err != nil { log.Infof("close redis conn err: %s\n", err.Error()) } + // release the timer + if !r.timer.Stop() { + select { + case <-r.timer.C: + default: + } + } } /* Commands */ diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index ee9b4470..e0cb1b0e 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -26,6 +26,7 @@ type RedisWriterOptions struct { Password string `mapstructure:"password" default:""` Tls bool `mapstructure:"tls" default:"false"` OffReply bool `mapstructure:"off_reply" default:"false"` + BuffSend bool `mapstructure:"buff_send" default:"false"` } type redisStandaloneWriter struct { @@ -39,6 +40,8 @@ type redisStandaloneWriter struct { ch chan *entry.Entry chWg sync.WaitGroup + buffSend bool + stat struct { Name string `json:"name"` UnansweredBytes int64 `json:"unanswered_bytes"` @@ -52,6 +55,7 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false) rw.ch = make(chan *entry.Entry, 1024) + rw.buffSend = opts.BuffSend if opts.OffReply { log.Infof("turn off the reply of write") rw.offReply = true @@ -93,7 +97,12 @@ func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entr atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) atomic.AddInt64(&w.stat.UnansweredEntries, 1) } - w.client.SendBytes(bytes) + if w.buffSend { + w.client.SendBytesBuff(bytes) + } else { + w.client.SendBytes(bytes) + } + } w.chWg.Done() }() diff --git a/shake.toml b/shake.toml index 4c4b146d..12df92bc 100644 --- a/shake.toml +++ b/shake.toml @@ -36,6 +36,7 @@ username = "" # keep empty if not using ACL password = "" # keep empty if no authentication is required tls = false off_reply = false # turn off the server reply +buff_send = false # buffer send, default false. may be a sync delay when true, but it can greatly improve the speed [filter] # Allow keys with specific prefixes or suffixes @@ -70,8 +71,8 @@ block_db = [] # allow_command = ["GET", "SET"] # Only allow GET and SET commands # block_command = ["DEL", "FLUSHDB"] # Block DEL and FLUSHDB commands # Leave empty to allow all commands -allow_command = [] -block_command = [] +allow_command = [] +block_command = [] # Allow or block specific command groups # Available groups: @@ -82,8 +83,8 @@ block_command = [] # allow_command_group = ["STRING", "HASH"] # Only allow STRING and HASH commands # block_command_group = ["SCRIPTING", "PUBSUB"] # Block SCRIPTING and PUBSUB commands # Leave empty to allow all command groups -allow_command_group = [] -block_command_group = [] +allow_command_group = [] +block_command_group = [] # Function for custom data processing # For best practices and examples, visit: @@ -118,7 +119,7 @@ rdb_restore_command_behavior = "panic" # panic, rewrite or skip pipeline_count_limit = 1024 # This setting corresponds to the 'client-query-buffer-limit' in Redis configuration. -# The default value is typically 1GB. +# The default value is typically 1GB. # It's recommended not to modify this value unless absolutely necessary. target_redis_client_max_querybuf_len = 1073741824 # 1GB in bytes @@ -128,7 +129,7 @@ target_redis_client_max_querybuf_len = 1073741824 # 1GB in bytes # It's recommended not to modify this value unless absolutely necessary. target_redis_proto_max_bulk_len = 512_000_000 -# If the source is Elasticache, you can set this item. AWS ElastiCache has custom +# If the source is Elasticache, you can set this item. AWS ElastiCache has custom # psync command, which can be obtained through a ticket. aws_psync = "" # example: aws_psync = "10.0.0.1:6379@nmfu2sl5osync,10.0.0.1:6379@xhma21xfkssync"