From c749a89ae2180bcbf6a04936ebb90cb58de9afd3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sun, 27 May 2018 20:46:27 -0700 Subject: [PATCH] internal/pool, baseClient: instrument with OpenCensus --- cluster.go | 64 +++++++- command.go | 34 ++++- internal/observability/helpers.go | 15 ++ internal/observability/observability.go | 130 ++++++++++++++++ internal/pool/conn.go | 2 + internal/pool/pool.go | 7 + observability.go | 5 + options.go | 4 + pubsub.go | 7 +- redis.go | 195 +++++++++++++++++++++--- ring.go | 2 +- 11 files changed, 434 insertions(+), 31 deletions(-) create mode 100644 internal/observability/helpers.go create mode 100644 internal/observability/observability.go create mode 100644 observability.go diff --git a/cluster.go b/cluster.go index 6f0855eb7..530ff1c18 100644 --- a/cluster.go +++ b/cluster.go @@ -18,6 +18,8 @@ import ( "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" "github.com/go-redis/redis/internal/singleflight" + + "go.opencensus.io/trace" ) var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") @@ -61,6 +63,9 @@ type ClusterOptions struct { IdleCheckFrequency time.Duration TLSConfig *tls.Config + + // The initial context + Context context.Context } func (opt *ClusterOptions) init() { @@ -642,6 +647,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { c := &ClusterClient{ opt: opt, nodes: newClusterNodes(opt), + ctx: opt.Context, } c.state = newClusterStateHolder(c.loadState) c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo) @@ -864,14 +870,31 @@ func (c *ClusterClient) Process(cmd Cmder) error { } func (c *ClusterClient) defaultProcess(cmd Cmder) error { + _, span := trace.StartSpan(c.Context(), "redis.(*ClusterClient)."+cmd.Name()) + defer span.End() + // TODO: (@odeke-em) record stats to tally the number of respective invocations e.g: + // * "LPOP" --> increment the count of LPOP invocations. + // * "HGET" --> increment the count of HGET invocations. + var node *clusterNode var ask bool for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + span.Annotatef([]trace.Attribute{ + trace.Int64Attribute("attempt", int64(attempt)), + trace.Int64Attribute("write_timeout_ns", c.opt.WriteTimeout.Nanoseconds()), + }, "Getting connection") + if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + td := c.retryBackoff(attempt) + span.Annotatef([]trace.Attribute{ + trace.Int64Attribute("attempt", int64(attempt)), + trace.StringAttribute("sleep_duration", td.String()), + }, "Sleeping for exponential backoff") + time.Sleep(td) } if node == nil { + span.Annotatef(nil, "Creating new node") var err error _, node, err = c.cmdSlotAndNode(cmd) if err != nil { @@ -882,6 +905,7 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { var err error if ask { + span.Annotatef(nil, "Invoking command: ASKING") pipe := node.Client.Pipeline() _ = pipe.Process(NewCmd("ASKING")) _ = pipe.Process(cmd) @@ -899,11 +923,20 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error { // If slave is loading - read from master. if c.opt.ReadOnly && internal.IsLoadingError(err) { + span.Annotatef([]trace.Attribute{ + trace.Int64Attribute("attempt", int64(attempt)), + trace.StringAttribute("err", err.Error()), + }, "Node is still loading") node.MarkAsLoading() continue } if internal.IsRetryableError(err, true) { + span.Annotatef([]trace.Attribute{ + trace.Int64Attribute("attempt", int64(attempt)), + trace.StringAttribute("err", err.Error()), + }, "Retryable error so retrying") + // Firstly retry the same node. 
 		if attempt == 0 {
 			continue
@@ -912,8 +945,15 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
 			// Secondly try random node.
 			node, err = c.nodes.Random()
 			if err != nil {
+				span.Annotatef(nil, "Failed to pick a random node")
+				span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
 				break
 			}
+
+			span.Annotatef([]trace.Attribute{
+				trace.Int64Attribute("attempt", int64(attempt)),
+			}, "Retrying with a random node")
 			continue
 		}
@@ -921,10 +961,18 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
 		var addr string
 		moved, ask, addr = internal.IsMovedError(err)
 		if moved || ask {
+			span.Annotatef(nil, "Performing lazy reload")
 			c.state.LazyReload()
+			span.Annotatef(nil, "Finished lazy reload")
 			node, err = c.nodes.GetOrCreate(addr)
 			if err != nil {
+				span.Annotatef([]trace.Attribute{
+					trace.StringAttribute("addr", addr),
+					trace.Int64Attribute("attempt", int64(attempt)),
+					trace.StringAttribute("err", err.Error()),
+				}, "Failed to create or get a node")
+				span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
 				break
 			}
 			continue
@@ -938,7 +986,11 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
 		break
 	}

-	return cmd.Err()
+	eErr := cmd.Err()
+	if eErr != nil {
+		span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: eErr.Error()})
+	}
+	return eErr
 }

 // ForEachMaster concurrently calls the fn on each master node in the cluster.
@@ -1172,7 +1224,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
 	failedCmds := make(map[*clusterNode][]Cmder)

 	for node, cmds := range cmdsMap {
-		cn, _, err := node.Client.getConn()
+		cn, _, err := node.Client.getConn(c.Context())
 		if err != nil {
 			if err == pool.ErrClosed {
 				c.remapCmds(cmds, failedCmds)
@@ -1235,7 +1287,7 @@ func (c *ClusterClient) pipelineProcessCmds(
 ) error {
 	_ = cn.SetWriteTimeout(c.opt.WriteTimeout)

-	err := writeCmd(cn, cmds...)
+	err := writeCmd(c.Context(), cn, cmds...)
 	if err != nil {
 		setCmdsErr(cmds, err)
 		failedCmds[node] = cmds
@@ -1336,7 +1388,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
 	failedCmds := make(map[*clusterNode][]Cmder)

 	for node, cmds := range cmdsMap {
-		cn, _, err := node.Client.getConn()
+		cn, _, err := node.Client.getConn(c.Context())
 		if err != nil {
 			if err == pool.ErrClosed {
 				c.remapCmds(cmds, failedCmds)
@@ -1377,7 +1429,7 @@ func (c *ClusterClient) txPipelineProcessCmds(
 	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
 ) error {
 	cn.SetWriteTimeout(c.opt.WriteTimeout)
-	if err := txPipelineWriteMulti(cn, cmds); err != nil {
+	if err := txPipelineWriteMulti(c.Context(), cn, cmds); err != nil {
 		setCmdsErr(cmds, err)
 		failedCmds[node] = cmds
 		return err
diff --git a/command.go b/command.go
index 552c897bb..aff92df61 100644
--- a/command.go
+++ b/command.go
@@ -2,15 +2,20 @@ package redis

 import (
 	"bytes"
+	"context"
 	"fmt"
 	"strconv"
 	"strings"
 	"time"

 	"github.com/go-redis/redis/internal"
+	"github.com/go-redis/redis/internal/observability"
 	"github.com/go-redis/redis/internal/pool"
 	"github.com/go-redis/redis/internal/proto"
 	"github.com/go-redis/redis/internal/util"
+
+	"go.opencensus.io/stats"
+	"go.opencensus.io/trace"
 )

 type Cmder interface {
@@ -44,15 +49,40 @@ func firstCmdsErr(cmds []Cmder) error {
 	return nil
 }

-func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
+func writeCmd(ctx context.Context, cn *pool.Conn, cmds ...Cmder) error {
+	// 1. Start the span: always start the span before recording stats so that
+	// the returned context contains the parent span.
+	ctx, span := trace.StartSpan(ctx, "redis.writeCmd")
+	// 2. Record stats
+	ctx, _ = observability.TagKeyValuesIntoContext(ctx, observability.KeyCommandName, namesFromCommands(cmds)...)
 	cn.Wb.Reset()
+	stats.Record(ctx, observability.MWrites.M(1))
+	span.Annotatef([]trace.Attribute{
+		trace.Int64Attribute("n_cmds", int64(len(cmds))),
+	}, "Appending command arguments to the write buffer")
+
 	for _, cmd := range cmds {
 		if err := cn.Wb.Append(cmd.Args()); err != nil {
+			span.End()
+			stats.Record(ctx, observability.MWriteErrors.M(1))
 			return err
 		}
 	}

-	_, err := cn.Write(cn.Wb.Bytes())
+	out := cn.Wb.Bytes()
+	span.Annotatef([]trace.Attribute{
+		trace.Int64Attribute("len", int64(len(out))),
+	}, "Serialized command bytes")
+	nWrote, err := cn.Write(out)
+	span.Annotatef([]trace.Attribute{
+		trace.Int64Attribute("len", int64(nWrote)),
+	}, "Wrote bytes to the connection")
+	if err != nil {
+		span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
+		stats.Record(ctx, observability.MWriteErrors.M(1))
+	}
+	span.End()
+	stats.Record(ctx, observability.MBytesWritten.M(int64(nWrote)))
 	return err
 }
diff --git a/internal/observability/helpers.go b/internal/observability/helpers.go
new file mode 100644
index 000000000..7bfe90c29
--- /dev/null
+++ b/internal/observability/helpers.go
@@ -0,0 +1,15 @@
+package observability
+
+import (
+	"context"
+
+	"go.opencensus.io/tag"
+)
+
+func TagKeyValuesIntoContext(ctx context.Context, key tag.Key, values ...string) (context.Context, error) {
+	insertions := make([]tag.Mutator, len(values))
+	for i, value := range values {
+		insertions[i] = tag.Insert(key, value)
+	}
+	return tag.New(ctx, insertions...)
+}
diff --git a/internal/observability/observability.go b/internal/observability/observability.go
new file mode 100644
index 000000000..6e7f664e0
--- /dev/null
+++ b/internal/observability/observability.go
@@ -0,0 +1,130 @@
+package observability
+
+import (
+	"go.opencensus.io/stats"
+	"go.opencensus.io/stats/view"
+	"go.opencensus.io/tag"
+)
+
+// Pool metrics:
+// 1. Connections taken
+// 2. Connections closed
+// 3. Connections usetime -- how long a connection is used before it is closed, discarded or returned
+// 4. Connections reused
+// 5. Connections stale
+// 6. Dial errors
+
+const dimensionless = "1"
+const unitSeconds = "s"
+
+var (
+	MBytesRead           = stats.Int64("redis/bytes_read", "The number of bytes read from the server", stats.UnitBytes)
+	MBytesWritten        = stats.Int64("redis/bytes_written", "The number of bytes written out to the server", stats.UnitBytes)
+	MDialErrors          = stats.Int64("redis/dial_errors", "The number of dial errors", dimensionless)
+	MConnectionsTaken    = stats.Int64("redis/connections_taken", "The number of connections taken", dimensionless)
+	MConnectionsClosed   = stats.Int64("redis/connections_closed", "The number of connections closed", dimensionless)
+	MConnectionsReturned = stats.Int64("redis/connections_returned", "The number of connections returned to the pool", dimensionless)
+	MConnectionsReused   = stats.Int64("redis/connections_reused", "The number of connections reused", dimensionless)
+	MConnectionsNew      = stats.Int64("redis/connections_new", "The number of newly created connections", dimensionless)
+	MConnectionUseTime   = stats.Float64("redis/connection_usetime", "The number of seconds for which a connection is used", unitSeconds)
+	MRoundtripLatency    = stats.Float64("redis/roundtrip_latency", "The time from sending the first byte to the server until the last byte of the response is received", unitSeconds)
+	MWriteErrors         = stats.Int64("redis/write_errors", "The number of errors encountered during write routines", dimensionless)
+	MWrites              = stats.Int64("redis/writes", "The number of write invocations", dimensionless)
+)
+
+var KeyCommandName, _ = tag.NewKey("cmd")
+
+var defaultSecondsDistribution = view.Distribution(
+	// [0ms, 0.01ms, 0.05ms, 0.1ms, 0.5ms, 1ms, 1.5ms, 2ms, 2.5ms, 5ms, 10ms, 25ms, 50ms, 100ms, 200ms, 400ms, 600ms, 800ms, 1s, 1.5s, 2.5s, 5s, 10s, 20s, 40s, 100s, 200s, 500s]
+	0, 0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.0015, 0.002, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1, 1.5, 2.5, 5, 10, 20, 40, 100, 200, 500,
+)
+
+var defaultBytesDistribution = view.Distribution(
+	// [0, 1KB, 2KB, 4KB, 16KB, 64KB, 256KB, 1MB, 4MB, 16MB, 64MB, 256MB, 1GB, 4GB]
+	0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296,
+)
+
+var Views = []*view.View{
+	{
+		Name:        "redis/client/connection_usetime",
+		Description: "The duration in seconds for which a connection is used before being returned to the pool, closed or discarded",
+		Aggregation: defaultSecondsDistribution,
+		Measure:     MConnectionUseTime,
+	},
+	{
+		Name:        "redis/client/dial_errors",
+		Description: "The number of errors encountered after dialing",
+		Aggregation: view.Count(),
+		Measure:     MDialErrors,
+	},
+	{
+		Name:        "redis/client/bytes_written_cumulative",
+		Description: "The total number of bytes written out to the server",
+		Aggregation: view.Sum(),
+		Measure:     MBytesWritten,
+	},
+	{
+		Name:        "redis/client/bytes_written_distribution",
+		Description: "The distribution of bytes written out to the server",
+		Aggregation: defaultBytesDistribution,
+		Measure:     MBytesWritten,
+	},
+	{
+		Name:        "redis/client/bytes_read_cumulative",
+		Description: "The total number of bytes read from responses from the server",
+		Aggregation: view.Sum(),
+		Measure:     MBytesRead,
+	},
+	{
+		Name:        "redis/client/bytes_read_distribution",
+		Description: "The distribution of bytes read from the server",
+		Aggregation: defaultBytesDistribution,
+		Measure:     MBytesRead,
+	},
+	{
+		Name:        "redis/client/roundtrip_latency",
+		Description: "The distribution of roundtrip latencies in seconds",
Aggregation: defaultSecondsDistribution, + Measure: MRoundtripLatency, + TagKeys: []tag.Key{KeyCommandName}, + }, + { + Name: "redis/client/write_errors", + Description: "The number of errors encountered during a write routine", + Aggregation: view.Count(), + Measure: MWriteErrors, + TagKeys: []tag.Key{KeyCommandName}, + }, + { + Name: "redis/client/writes", + Description: "The number of write invocations", + Aggregation: view.Count(), + Measure: MWrites, + TagKeys: []tag.Key{KeyCommandName}, + }, + { + Name: "redis/client/connections_taken", + Description: "The number of connections taken out the pool", + Aggregation: view.Count(), + Measure: MConnectionsTaken, + }, + { + Name: "redis/client/connections_returned", + Description: "The number of connections returned the connection pool", + Aggregation: view.Count(), + Measure: MConnectionsReturned, + }, + { + Name: "redis/client/connections_reused", + Description: "The number of connections reused", + Aggregation: view.Count(), + Measure: MConnectionsReused, + }, + { + Name: "redis/client/connections_new", + Description: "The number of newly created connections", + Aggregation: view.Count(), + Measure: MConnectionsNew, + }, +} diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 8af51d9de..28f6fb610 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -66,6 +66,7 @@ func (cn *Conn) SetWriteTimeout(timeout time.Duration) error { } func (cn *Conn) Write(b []byte) (int, error) { + // TODO: Stats: (@odeke-em) Record written bytes return cn.netConn.Write(b) } @@ -74,5 +75,6 @@ func (cn *Conn) RemoteAddr() net.Addr { } func (cn *Conn) Close() error { + // TODO: Stats: (@odeke-em) Record Conn closes return cn.netConn.Close() } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index ae81905ea..68645b35e 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -133,6 +133,7 @@ func (p *ConnPool) tryDial() { continue } + // TODO: Stats: (@odeke-em) Record successful dial atomic.StoreUint32(&p.dialErrorsNum, 0) _ = conn.Close() return @@ -173,6 +174,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) { case <-timer.C: timers.Put(timer) atomic.AddUint32(&p.stats.Timeouts, 1) + // TODO: Stats: (@odeke-em) Record Pool timeouts return nil, false, ErrPoolTimeout } } @@ -187,10 +189,12 @@ func (p *ConnPool) Get() (*Conn, bool, error) { } if cn.IsStale(p.opt.IdleTimeout) { + // TODO: Stats: (@odeke-em) Record stale connection p.CloseConn(cn) continue } + // TODO: Stats: (@odeke-em) Record reused connection atomic.AddUint32(&p.stats.Hits, 1) return cn, false, nil } @@ -203,6 +207,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) { return nil, false, err } + // TODO: Stats: (@odeke-em) Record taken connection return newcn, true, nil } @@ -226,6 +231,7 @@ func (p *ConnPool) Put(cn *Conn) error { p.freeConns = append(p.freeConns, cn) p.freeConnsMu.Unlock() <-p.queue + // TODO: Stats: (@odeke-em) Record Conn Put return nil } @@ -252,6 +258,7 @@ func (p *ConnPool) closeConn(cn *Conn) error { if p.opt.OnClose != nil { _ = p.opt.OnClose(cn) } + // TODO: Stats: (@odeke-em) Record Conn Close return cn.Close() } diff --git a/observability.go b/observability.go new file mode 100644 index 000000000..54b439c38 --- /dev/null +++ b/observability.go @@ -0,0 +1,5 @@ +package redis + +import "github.com/go-redis/redis/internal/observability" + +var ObservabilityMetricViews = observability.Views diff --git a/options.go b/options.go index 75648053d..4146ee268 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,7 @@ package redis import 
( + "context" "crypto/tls" "errors" "fmt" @@ -77,6 +78,9 @@ type Options struct { // TLS Config to use. When set TLS will be negotiated. TLSConfig *tls.Config + + // The initial context + Context context.Context } func (opt *Options) init() { diff --git a/pubsub.go b/pubsub.go index b56728f3e..e36c14566 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1,6 +1,7 @@ package redis import ( + "context" "fmt" "net" "sync" @@ -100,7 +101,8 @@ func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) cmd := NewSliceCmd(args...) cn.SetWriteTimeout(c.opt.WriteTimeout) - return writeCmd(cn, cmd) + // TODO: (@odeke-em) use the propagated context + return writeCmd(context.Background(), cn, cmd) } func (c *PubSub) releaseConn(cn *pool.Conn, err error) { @@ -217,7 +219,8 @@ func (c *PubSub) Ping(payload ...string) error { } cn.SetWriteTimeout(c.opt.WriteTimeout) - err = writeCmd(cn, cmd) + // TODO: (@odeke-em) use the propagated context + err = writeCmd(context.Background(), cn, cmd) c.releaseConn(cn, err) return err } diff --git a/redis.go b/redis.go index 7a606b70e..e8ec18460 100644 --- a/redis.go +++ b/redis.go @@ -8,8 +8,12 @@ import ( "time" "github.com/go-redis/redis/internal" + "github.com/go-redis/redis/internal/observability" "github.com/go-redis/redis/internal/pool" "github.com/go-redis/redis/internal/proto" + + "go.opencensus.io/stats" + "go.opencensus.io/trace" ) // Nil reply Redis returns when key does not exist. @@ -32,6 +36,19 @@ type baseClient struct { processTxPipeline func([]Cmder) error onClose func() error // hook called when client is closed + + ctxFunc func() context.Context // Optional function invoked +} + +func (c *baseClient) context() context.Context { + var ctx context.Context + if c.ctxFunc != nil { + ctx = c.ctxFunc() + } + if ctx == nil { + ctx = context.Background() + } + return ctx } func (c *baseClient) init() { @@ -60,29 +77,41 @@ func (c *baseClient) newConn() (*pool.Conn, error) { return cn, nil } -func (c *baseClient) getConn() (*pool.Conn, bool, error) { +func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, bool, error) { + _, span := trace.StartSpan(ctx, "redis.(*baseClient).getConn") + defer span.End() + cn, isNew, err := c.connPool.Get() if err != nil { + span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()}) return nil, false, err } if !cn.Inited { + span.Annotatef(nil, "Initializing connection") if err := c.initConn(cn); err != nil { + span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()}) _ = c.connPool.Remove(cn) return nil, false, err } } + if isNew { + stats.Record(ctx, observability.MConnectionsNew.M(1), observability.MConnectionsTaken.M(1)) + } else { + stats.Record(ctx, observability.MConnectionsReused.M(1), observability.MConnectionsTaken.M(1)) + } return cn, isNew, nil } -func (c *baseClient) releaseConn(cn *pool.Conn, err error) bool { +func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) bool { if internal.IsBadConn(err, false) { _ = c.connPool.Remove(cn) return false } _ = c.connPool.Put(cn) + stats.Record(ctx, observability.MConnectionsReturned.M(1)) return true } @@ -132,41 +161,99 @@ func (c *baseClient) Process(cmd Cmder) error { } func (c *baseClient) defaultProcess(cmd Cmder) error { + ctx, span := trace.StartSpan(c.context(), "redis.(*baseClient)."+cmd.Name()) + ctx, _ = observability.TagKeyValuesIntoContext(ctx, observability.KeyCommandName, cmd.Name()) + + startTime := time.Now() + defer func() { + span.End() + // 
Finally record the roundtrip latency.
+		stats.Record(ctx, observability.MRoundtripLatency.M(time.Since(startTime).Seconds()))
+	}()
+
 	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+		span.Annotatef([]trace.Attribute{
+			trace.Int64Attribute("attempt", int64(attempt)),
+			trace.Int64Attribute("write_timeout_ns", c.opt.WriteTimeout.Nanoseconds()),
+		}, "Getting connection")
+
 		if attempt > 0 {
-			time.Sleep(c.retryBackoff(attempt))
+			td := c.retryBackoff(attempt)
+			span.Annotatef([]trace.Attribute{
+				trace.StringAttribute("sleep_duration", td.String()),
+				trace.Int64Attribute("attempt", int64(attempt)),
+			}, "Sleeping for exponential backoff")
+			time.Sleep(td)
 		}

-		cn, _, err := c.getConn()
+		cn, _, err := c.getConn(ctx)
 		if err != nil {
 			cmd.setErr(err)
 			if internal.IsRetryableError(err, true) {
+				span.Annotatef([]trace.Attribute{
+					trace.StringAttribute("err", err.Error()),
+					trace.Int64Attribute("attempt", int64(attempt)),
+				}, "Retryable error so retrying")
 				continue
 			}
+			span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
 			return err
 		}

 		cn.SetWriteTimeout(c.opt.WriteTimeout)
-		if err := writeCmd(cn, cmd); err != nil {
-			c.releaseConn(cn, err)
-			cmd.setErr(err)
-			if internal.IsRetryableError(err, true) {
+		_, wSpan := trace.StartSpan(ctx, "redis.writeCmd")
+		wErr := writeCmd(ctx, cn, cmd)
+		wSpan.End()
+		if wErr != nil {
+			releaseStart := time.Now()
+			span.Annotatef(nil, "Releasing connection")
+			reusedConn := c.releaseConn(ctx, cn, wErr)
+			status := "removed conn"
+			if reusedConn {
+				status = "reused conn"
+			}
+			span.Annotatef([]trace.Attribute{
+				trace.StringAttribute("time_spent", time.Since(releaseStart).String()),
+				trace.StringAttribute("status", status),
+			}, "Released connection")
+			cmd.setErr(wErr)
+			if internal.IsRetryableError(wErr, true) {
 				continue
 			}
+			span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: wErr.Error()})
-			return err
+			return wErr
 		}

+		_, rSpan := trace.StartSpan(ctx, "redis.(Cmder).readReply")
 		cn.SetReadTimeout(c.cmdTimeout(cmd))
 		err = cmd.readReply(cn)
-		c.releaseConn(cn, err)
-		if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
-			continue
+		rSpan.End()
+		span.Annotatef(nil, "Releasing connection")
+		reusedConn := c.releaseConn(ctx, cn, err)
+		status := "removed conn"
+		if reusedConn {
+			status = "reused conn"
+		}
+		span.Annotatef([]trace.Attribute{
+			trace.StringAttribute("status", status),
+		}, "Released connection")
+		// TODO: (@odeke-em) multiplex on the errors retrieved
+		if err != nil {
+			span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
+			if internal.IsRetryableError(err, cmd.readTimeout() == nil) {
+				continue
+			}
 		}

 		return err
 	}

-	return cmd.Err()
+	eErr := cmd.Err()
+	if eErr != nil {
+		// TODO: (@odeke-em) tally/categorize these errors and increment them
+		span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: eErr.Error()})
+	}
+	return eErr
 }

 func (c *baseClient) retryBackoff(attempt int) time.Duration {
@@ -220,13 +307,26 @@ func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
 type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)

 func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
+	ctx, span := trace.StartSpan(c.context(), "redis.(*baseClient).generalProcessPipeline")
+	defer span.End()
+
 	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+		span.Annotatef([]trace.Attribute{
+			trace.Int64Attribute("attempt", int64(attempt)),
+		}, "Getting connection")
+
if attempt > 0 { - time.Sleep(c.retryBackoff(attempt)) + td := c.retryBackoff(attempt) + span.Annotatef([]trace.Attribute{ + trace.StringAttribute("sleep_duration", td.String()), + trace.Int64Attribute("attempt", int64(attempt)), + }, "Sleeping for exponential backoff") + time.Sleep(td) } - cn, _, err := c.getConn() + cn, _, err := c.getConn(ctx) if err != nil { + span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()}) setCmdsErr(cmds, err) return err } @@ -234,28 +334,81 @@ func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) e canRetry, err := p(cn, cmds) if err == nil || internal.IsRedisError(err) { + span.Annotatef(nil, "Putting connection back into the pool") _ = c.connPool.Put(cn) break } + span.Annotatef(nil, "Removing connection from the pool") _ = c.connPool.Remove(cn) if !canRetry || !internal.IsRetryableError(err, true) { + span.Annotatef([]trace.Attribute{ + trace.StringAttribute("err", err.Error()), + trace.Int64Attribute("attempt", int64(attempt)), + }, "Not retrying hence exiting") break } + + // Otherwise, retrying with the next attempt. + span.Annotatef([]trace.Attribute{ + trace.StringAttribute("err", err.Error()), + trace.Int64Attribute("attempt", int64(attempt)), + }, "Retryable error so retrying") } - return firstCmdsErr(cmds) + eErr := firstCmdsErr(cmds) + if eErr != nil { + span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: eErr.Error()}) + } + return eErr +} + +func attributeNamesFromCommands(cmds []Cmder) []trace.Attribute { + if len(cmds) == 0 { + return nil + } + names := namesFromCommands(cmds) + attributes := make([]trace.Attribute, len(names)) + for i, name := range names { + attributes[i] = trace.StringAttribute(fmt.Sprintf("%d", i), name) + } + return attributes +} + +func namesFromCommands(cmds []Cmder) []string { + names := make([]string, len(cmds)) + for i, cmd := range cmds { + names[i] = cmd.Name() + } + return names } func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { + ctx, span := trace.StartSpan(c.context(), "redis.(*baseClient).pipelineProcessCmds") + defer span.End() + attributes := append(attributeNamesFromCommands(cmds), + trace.StringAttribute("read_timeout", c.opt.ReadTimeout.String()), + trace.StringAttribute("write_timeout", c.opt.WriteTimeout.String()), + ) + span.Annotatef(attributes, "With commands") + cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := writeCmd(cn, cmds...); err != nil { + _, wSpan := trace.StartSpan(ctx, "redis.writeCmd") + defer wSpan.End() + if err := writeCmd(ctx, cn, cmds...); err != nil { + wSpan.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()}) setCmdsErr(cmds, err) return true, err } // Set read timeout for all commands. 
cn.SetReadTimeout(c.opt.ReadTimeout) - return true, pipelineReadCmds(cn, cmds) + _, rSpan := trace.StartSpan(ctx, "redis.pipelineReadCmds") + err := pipelineReadCmds(cn, cmds) + if err != nil { + rSpan.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()}) + } + rSpan.End() + return true, err } func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { @@ -270,7 +423,7 @@ func pipelineReadCmds(cn *pool.Conn, cmds []Cmder) error { func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) { cn.SetWriteTimeout(c.opt.WriteTimeout) - if err := txPipelineWriteMulti(cn, cmds); err != nil { + if err := txPipelineWriteMulti(c.context(), cn, cmds); err != nil { setCmdsErr(cmds, err) return true, err } @@ -286,12 +439,12 @@ func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, e return false, pipelineReadCmds(cn, cmds) } -func txPipelineWriteMulti(cn *pool.Conn, cmds []Cmder) error { +func txPipelineWriteMulti(ctx context.Context, cn *pool.Conn, cmds []Cmder) error { multiExec := make([]Cmder, 0, len(cmds)+2) multiExec = append(multiExec, NewStatusCmd("MULTI")) multiExec = append(multiExec, cmds...) multiExec = append(multiExec, NewSliceCmd("EXEC")) - return writeCmd(cn, multiExec...) + return writeCmd(ctx, cn, multiExec...) } func (c *baseClient) txPipelineReadQueued(cn *pool.Conn, cmds []Cmder) error { @@ -351,7 +504,9 @@ func NewClient(opt *Options) *Client { opt: opt, connPool: newConnPool(opt), }, + ctx: opt.Context, } + c.baseClient.ctxFunc = c.Context c.baseClient.init() c.init() diff --git a/ring.go b/ring.go index 362bd0319..acf1bd05a 100644 --- a/ring.go +++ b/ring.go @@ -525,7 +525,7 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error { continue } - cn, _, err := shard.Client.getConn() + cn, _, err := shard.Client.getConn(c.Context()) if err != nil { setCmdsErr(cmds, err) continue
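
A minimal usage sketch, separate from the diff above, of how an application might consume this instrumentation. It assumes the vendored OpenCensus release exposes view.Register (older releases named this view.Subscribe), that a stats/trace exporter has already been installed, and that a Redis server is reachable at the illustrative address localhost:6379.

package main

import (
	"context"
	"log"

	"github.com/go-redis/redis"
	"go.opencensus.io/stats/view"
)

func main() {
	// Register the views exported by this patch so the recorded measures
	// (writes, bytes written, roundtrip latency, pool activity) are
	// aggregated and visible to whichever exporter is installed.
	if err := view.Register(redis.ObservabilityMetricViews...); err != nil {
		log.Fatalf("failed to register redis views: %v", err)
	}

	// Options.Context, added by this patch, becomes the parent context for
	// the spans started in defaultProcess and writeCmd.
	client := redis.NewClient(&redis.Options{
		Addr:    "localhost:6379",
		Context: context.Background(),
	})
	defer client.Close()

	if err := client.Ping().Err(); err != nil {
		log.Printf("ping failed: %v", err)
	}
}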