internal/pool, baseClient: instrument with OpenCensus
odeke-em committed May 28, 2018
1 parent 0f9028a commit c749a89
Showing 11 changed files with 434 additions and 31 deletions.
64 changes: 58 additions & 6 deletions cluster.go
@@ -18,6 +18,8 @@ import (
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/singleflight"

"go.opencensus.io/trace"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@@ -61,6 +63,9 @@ type ClusterOptions struct {
IdleCheckFrequency time.Duration

TLSConfig *tls.Config

// Context is the base context used for tracing and stats instrumentation.
Context context.Context
}

func (opt *ClusterOptions) init() {
@@ -642,6 +647,7 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
c := &ClusterClient{
opt: opt,
nodes: newClusterNodes(opt),
ctx: opt.Context,
}
c.state = newClusterStateHolder(c.loadState)
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
@@ -864,14 +870,31 @@ func (c *ClusterClient) Process(cmd Cmder) error {
}

func (c *ClusterClient) defaultProcess(cmd Cmder) error {
_, span := trace.StartSpan(c.Context(), "redis.(*ClusterClient)."+cmd.Name())
defer span.End()
// TODO: (@odeke-em) record stats to tally the number of respective invocations e.g:
// * "LPOP" --> increment the count of LPOP invocations.
// * "HGET" --> increment the count of HGET invocations.

var node *clusterNode
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
span.Annotatef([]trace.Attribute{
trace.Int64Attribute("attempt", int64(attempt)),
trace.Int64Attribute("write_timeout_ns", c.opt.WriteTimeout.Nanoseconds()),
}, "Getting connection")

if attempt > 0 {
time.Sleep(c.retryBackoff(attempt))
td := c.retryBackoff(attempt)
span.Annotatef([]trace.Attribute{
trace.Int64Attribute("attempt", int64(attempt)),
trace.StringAttribute("sleep_duration", td.String()),
}, "Sleeping for exponential backoff")
time.Sleep(td)
}

if node == nil {
span.Annotatef(nil, "Creating new node")
var err error
_, node, err = c.cmdSlotAndNode(cmd)
if err != nil {
@@ -882,6 +905,7 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {

var err error
if ask {
span.Annotatef(nil, "Invoking command: ASKING")
pipe := node.Client.Pipeline()
_ = pipe.Process(NewCmd("ASKING"))
_ = pipe.Process(cmd)
@@ -899,11 +923,20 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {

// If slave is loading - read from master.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
span.Annotatef([]trace.Attribute{
trace.Int64Attribute("attempt", int64(attempt)),
trace.StringAttribute("err", err.Error()),
}, "Node is still loading")
node.MarkAsLoading()
continue
}

if internal.IsRetryableError(err, true) {
span.Annotatef([]trace.Attribute{
trace.Int64Attribute("attempt", int64(attempt)),
trace.StringAttribute("err", err.Error()),
}, "Retryable error so retrying")

// Firstly retry the same node.
if attempt == 0 {
continue
@@ -912,19 +945,34 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// Secondly try random node.
node, err = c.nodes.Random()
if err != nil {
span.Annotatef(nil, "Failed to pick a random node")
span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
break
}

span.Annotatef([]trace.Attribute{
trace.Int64Attribute("attempt", int64(attempt)),
}, "Retrying with a random node")
continue
}

var moved bool
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
span.Annotatef(nil, "Performing lazy reload")
c.state.LazyReload()
span.Annotatef(nil, "Finished lazy reload")

node, err = c.nodes.GetOrCreate(addr)
if err != nil {
span.Annotatef([]trace.Attribute{
trace.StringAttribute("addr", addr),
trace.Int64Attribute("attempt", int64(attempt)),
trace.StringAttribute("err", err.Error()),
}, "Failed to create or get a node")
span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
break
}
continue
@@ -938,7 +986,11 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
break
}

return cmd.Err()
eErr := cmd.Err()
if eErr != nil {
span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: eErr.Error()})
}
return eErr
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
@@ -1172,7 +1224,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)

for node, cmds := range cmdsMap {
cn, _, err := node.Client.getConn()
cn, _, err := node.Client.getConn(c.Context())
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
@@ -1235,7 +1287,7 @@ func (c *ClusterClient) pipelineProcessCmds(
) error {
_ = cn.SetWriteTimeout(c.opt.WriteTimeout)

err := writeCmd(cn, cmds...)
err := writeCmd(c.Context(), cn, cmds...)
if err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
@@ -1336,7 +1388,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)

for node, cmds := range cmdsMap {
cn, _, err := node.Client.getConn()
cn, _, err := node.Client.getConn(c.Context())
if err != nil {
if err == pool.ErrClosed {
c.remapCmds(cmds, failedCmds)
@@ -1377,7 +1429,7 @@ func (c *ClusterClient) txPipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := txPipelineWriteMulti(cn, cmds); err != nil {
if err := txPipelineWriteMulti(c.Context(), cn, cmds); err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
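For context, a minimal sketch of how an application could use the new ClusterOptions.Context field so that the spans started in defaultProcess (and in writeCmd below) become children of an application-level span. The main function, the Addrs values and the span name are illustrative, not part of this commit:

package main

import (
	"context"

	"github.com/go-redis/redis"
	"go.opencensus.io/trace"
)

func main() {
	// Start an application-level span; spans started from opt.Context will be
	// parented under it.
	ctx, span := trace.StartSpan(context.Background(), "example.cluster-work")
	defer span.End()

	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:   []string{":7000", ":7001"}, // illustrative cluster addresses
		Context: ctx,
	})
	defer client.Close()

	_ = client.Ping().Err()
}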
34 changes: 32 additions & 2 deletions command.go
@@ -2,15 +2,20 @@ package redis

import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-redis/redis/internal"
"github.com/go-redis/redis/internal/observability"
"github.com/go-redis/redis/internal/pool"
"github.com/go-redis/redis/internal/proto"
"github.com/go-redis/redis/internal/util"

"go.opencensus.io/stats"
"go.opencensus.io/trace"
)

type Cmder interface {
@@ -44,15 +49,40 @@ func firstCmdsErr(cmds []Cmder) error {
return nil
}

func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
func writeCmd(ctx context.Context, cn *pool.Conn, cmds ...Cmder) error {
// 1. Start the span first so that the context used below for recording stats
// (and propagated to any downstream work) already carries this span.
ctx, span := trace.StartSpan(ctx, "redis.writeCmd")
// 2. Record stats
ctx, _ = observability.TagKeyValuesIntoContext(ctx, observability.KeyCommandName, namesFromCommands(cmds)...)
cn.Wb.Reset()
stats.Record(ctx, observability.MWrites.M(1))
span.Annotatef([]trace.Attribute{
trace.Int64Attribute("n_cmds", int64(len(cmds))),
}, "Appending command args to the write buffer")

for _, cmd := range cmds {
if err := cn.Wb.Append(cmd.Args()); err != nil {
span.End()
stats.Record(ctx, observability.MWriteErrors.M(1))
return err
}
}

_, err := cn.Write(cn.Wb.Bytes())
out := cn.Wb.Bytes()
span.Annotatef([]trace.Attribute{
trace.Int64Attribute("len", int64(len(out))),
}, "Created bytes")
nWrote, err := cn.Write(out)
span.Annotatef([]trace.Attribute{
trace.Int64Attribute("len", int64(nWrote)),
}, "Wrote out bytes")
if err != nil {
span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
stats.Record(ctx, observability.MWriteErrors.M(1))
}
span.End()
// MWrites was already recorded above, so only the byte count is recorded here.
stats.Record(ctx, observability.MBytesWritten.M(int64(nWrote)))
return err
}

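The spans created in writeCmd (and in defaultProcess above) only become visible if the application registers an OpenCensus trace exporter and samples the spans; a rough sketch of that application-side wiring, with the exporter deliberately left as a placeholder since this commit does not prescribe one:

package main

import (
	"go.opencensus.io/trace"
)

func main() {
	// For demos, sample every span; production code would normally keep the
	// default probabilistic sampler.
	trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})

	// trace.RegisterExporter(exporter) would be called here with whichever
	// OpenCensus trace exporter (Zipkin, Jaeger, Stackdriver, ...) the
	// application uses.
}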
15 changes: 15 additions & 0 deletions internal/observability/helpers.go
@@ -0,0 +1,15 @@
package observability

import (
"context"

"go.opencensus.io/tag"
)

func TagKeyValuesIntoContext(ctx context.Context, key tag.Key, values ...string) (context.Context, error) {
insertions := make([]tag.Mutator, len(values))
for i, value := range values {
insertions[i] = tag.Insert(key, value)
}
return tag.New(ctx, insertions...)
}
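A rough usage sketch of this helper, mirroring the call made from writeCmd in command.go; the test wrapper and the "GET" value are illustrative:

package observability_test

import (
	"context"
	"testing"

	"github.com/go-redis/redis/internal/observability"

	"go.opencensus.io/stats"
)

func TestTagKeyValuesIntoContext(t *testing.T) {
	// Tag the context with the command name, then record measures against the
	// tagged context so the "cmd" tag shows up in the aggregated views.
	ctx, err := observability.TagKeyValuesIntoContext(
		context.Background(), observability.KeyCommandName, "GET")
	if err != nil {
		t.Fatal(err)
	}
	stats.Record(ctx, observability.MWrites.M(1))
}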
130 changes: 130 additions & 0 deletions internal/observability/observability.go
@@ -0,0 +1,130 @@
package observability

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

// Pool metrics:
// 1. Connections taken
// 2. Connections closed
// 3. Connection use time -- how long a connection is used before it is closed, discarded or returned
// 4. Connections reused
// 5. Connections stale
// 6. Dial errors

const dimensionless = "1"
const unitSeconds = "s"

var (
MBytesRead = stats.Int64("redis/bytes_read", "The number of bytes read from the server", stats.UnitBytes)
MBytesWritten = stats.Int64("redis/bytes_written", "The number of bytes written out to the server", stats.UnitBytes)
MDialErrors = stats.Int64("redis/dial_errors", "The number of dial errors", dimensionless)
MConnectionsTaken = stats.Int64("redis/connections_taken", "The number of connections taken", dimensionless)
MConnectionsClosed = stats.Int64("redis/connections_closed", "The number of connections closed", dimensionless)
MConnectionsReturned = stats.Int64("redis/connections_returned", "The number of connections returned to the pool", dimensionless)
MConnectionsReused = stats.Int64("redis/connections_reused", "The number of connections reused", dimensionless)
MConnectionsNew = stats.Int64("redis/connections_new", "The number of newly created connections", dimensionless)
MConnectionUseTime = stats.Float64("redis/connection_usetime", "The number of seconds for which a connection is used", unitSeconds)
MRoundtripLatency = stats.Float64("redis/roundtrip_latency", "The time from sending the first byte to the server until the last byte of the response is received", unitSeconds)
MWriteErrors = stats.Int64("redis/write_errors", "The number of errors encountered during write routines", dimensionless)
MWrites = stats.Int64("redis/writes", "The number of write invocations", dimensionless)
)

var KeyCommandName, _ = tag.NewKey("cmd")

var defaultSecondsDistribution = view.Distribution(
// [0ms, 0.01ms, 0.05ms, 0.1ms, 0.5ms, 1ms, 1.5ms, 2ms, 2.5ms, 5ms, 10ms, 25ms, 50ms, 100ms, 200ms, 400ms, 600ms, 800ms, 1s, 1.5s, 2.5s, 5s, 10s, 20s, 40s, 100s, 200s, 500s]
0, 0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.0015, 0.002, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1, 1.5, 2.5, 5, 10, 20, 40, 100, 200, 500,
)

var defaultBytesDistribution = view.Distribution(
// [0, 1KB, 2KB, 4KB, 16KB, 64KB, 256KB, 1MB, 4MB, 16MB, 64MB, 256MB, 1GB, 4GB]
0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296,
)

var Views = []*view.View{
{
Name: "redis/client/connection_usetime",
Description: "The duration in seconds for which a connection is used before being returned to the pool, closed or discarded",

Aggregation: defaultSecondsDistribution,
Measure: MConnectionUseTime,
},
{
Name: "redis/client/dial_errors",
Description: "The number of errors encountered after dialling",
Aggregation: view.Count(),
Measure: MDialErrors,
},
{
Name: "redis/client/bytes_written_cummulative",
Description: "The number of bytes written out to the server",
Aggregation: view.Count(),
Measure: MBytesWritten,
},
{
Name: "redis/client/bytes_written_distribution",
Description: "The number of distribution of bytes written out to the server",
Aggregation: defaultBytesDistribution,
Measure: MBytesWritten,
},
{
Name: "redis/client/bytes_read_cummulative",
Description: "The number of bytes read from a response from the server",
Aggregation: view.Count(),
Measure: MBytesRead,
},
{
Name: "redis/client/bytes_read_distribution",
Description: "The number of distribution of bytes read from the server",
Aggregation: defaultBytesDistribution,
Measure: MBytesRead,
},
{
Name: "redis/client/roundtrip_latency",
Description: "The distribution of seconds of the roundtrip latencies",
Aggregation: defaultSecondsDistribution,
Measure: MRoundtripLatency,
TagKeys: []tag.Key{KeyCommandName},
},
{
Name: "redis/client/write_errors",
Description: "The number of errors encountered during a write routine",
Aggregation: view.Count(),
Measure: MWriteErrors,
TagKeys: []tag.Key{KeyCommandName},
},
{
Name: "redis/client/writes",
Description: "The number of write invocations",
Aggregation: view.Count(),
Measure: MWrites,
TagKeys: []tag.Key{KeyCommandName},
},
{
Name: "redis/client/connections_taken",
Description: "The number of connections taken out the pool",
Aggregation: view.Count(),
Measure: MConnectionsTaken,
},
{
Name: "redis/client/connections_returned",
Description: "The number of connections returned the connection pool",
Aggregation: view.Count(),
Measure: MConnectionsReturned,
},
{
Name: "redis/client/connections_reused",
Description: "The number of connections reused",
Aggregation: view.Count(),
Measure: MConnectionsReused,
},
{
Name: "redis/client/connections_new",
Description: "The number of newly created connections",
Aggregation: view.Count(),
Measure: MConnectionsNew,
},
}
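These views only produce data once they are registered. Because the package lives under internal/ and cannot be imported by applications, registration (or a re-export of Views) would have to happen inside the redis package itself; a hypothetical helper, not part of this commit, sketching that wiring:

package redis

import (
	"go.opencensus.io/stats/view"

	"github.com/go-redis/redis/internal/observability"
)

// RegisterObservabilityViews is a hypothetical helper an application would
// call once at startup so that the measures recorded by writeCmd and the
// connection pool are aggregated and exposed to whatever stats exporter
// (Prometheus, Stackdriver, ...) the application has configured.
func RegisterObservabilityViews() error {
	return view.Register(observability.Views...)
}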
2 changes: 2 additions & 0 deletions internal/pool/conn.go
@@ -66,6 +66,7 @@ func (cn *Conn) SetWriteTimeout(timeout time.Duration) error {
}

func (cn *Conn) Write(b []byte) (int, error) {
// TODO: Stats: (@odeke-em) Record written bytes
return cn.netConn.Write(b)
}

@@ -74,5 +75,6 @@ func (cn *Conn) RemoteAddr() net.Addr {
}

func (cn *Conn) Close() error {
// TODO: Stats: (@odeke-em) Record Conn closes
return cn.netConn.Close()
}
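The TODOs above point at pool-level measures that internal/observability already defines; a rough sketch of what recording a connection close might eventually look like. The helper name and the use of context.Background() are assumptions, not part of this commit:

package pool

import (
	"context"
	"net"

	"github.com/go-redis/redis/internal/observability"

	"go.opencensus.io/stats"
)

// closeAndRecord sketches the TODO in Conn.Close: close the underlying
// net.Conn and bump the MConnectionsClosed measure.
func closeAndRecord(netConn net.Conn) error {
	stats.Record(context.Background(), observability.MConnectionsClosed.M(1))
	return netConn.Close()
}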