Skip to content

Commit

Permalink
query: add experiment Select() cache
Browse files Browse the repository at this point in the history
Add experimental support for caching Select() requests. Only tested
locally for now. The cache matches on identical request signatures. Data
is cached only when the stream ends with io.EOF; if any other error
occurs, the response is not cached.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Dec 14, 2022
1 parent acf3c68 commit 462632b
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 2 deletions.
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
Expand Down Expand Up @@ -79,6 +80,8 @@ func registerQuery(app *extkingpin.App) {
httpBindAddr, httpGracePeriod, httpTLSConfig := extkingpin.RegisterHTTPFlags(cmd)
grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA, grpcMaxConnAge := extkingpin.RegisterGRPCFlags(cmd)

selectCacheSize := cmd.Flag("grpc-select-cache-size", "How many bytes should be allocated for storing Select() results in memory for each StoreAPI").Default("0").Bytes()

secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool()
cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
Expand Down Expand Up @@ -319,6 +322,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
promqlEngineType(*promqlEngine),
uint64(*selectCacheSize),
)
})
}
Expand Down Expand Up @@ -395,6 +399,7 @@ func runQuery(
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
selectCacheSize uint64,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand All @@ -409,7 +414,7 @@ func runQuery(
Help: "The number of times a duplicated store addresses is detected from the different configs in query",
})

dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName)
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName, model.Bytes(selectCacheSize))
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func runReceive(
conf.rwClientKey,
conf.rwClientServerCA,
conf.rwClientServerName,
0,
)
if err != nil {
return err
Expand Down
10 changes: 9 additions & 1 deletion pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/thanos-io/thanos/pkg/extgrpc/grpccache"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string, maxCacheSize model.Bytes) ([]grpc.DialOption, error) {
grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}),
)

cacher, err := grpccache.NewSeriesRequestcachingInterceptor(reg, maxCacheSize, 128*1024*1024)
if err != nil {
return nil, err
}
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
Expand All @@ -42,6 +49,7 @@ func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer ope
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
cacher.GetInterceptor(),
),
),
}
Expand Down
202 changes: 202 additions & 0 deletions pkg/extgrpc/grpccache/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package grpccache

import (
"context"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/cespare/xxhash/v2"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/cache"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
)

// getResponses loads the cached response stream stored under reqSig.
//
// It reads the layout written by putResponses: the key "<reqSig>-num" holds
// the decimal number of responses and "<reqSig>-<i>" holds the i-th marshaled
// storepb.SeriesResponse. An error is returned when the count entry is missing
// or malformed, when the number of fetched entries differs from the count, or
// when an entry cannot be unmarshaled.
func getResponses(c *cache.InMemoryCache, reqSig string) ([]*storepb.SeriesResponse, error) {
	ctx := context.Background()

	numKeysKey := fmt.Sprintf("%s-num", reqSig)
	numKeysBytes, ok := c.Fetch(ctx, []string{numKeysKey})[numKeysKey]
	if !ok {
		return nil, fmt.Errorf("not found num key")
	}

	var numKeys int
	if n, err := fmt.Sscanf(string(numKeysBytes), "%d", &numKeys); n != 1 || err != nil {
		return nil, fmt.Errorf("parsing %s", string(numKeysBytes))
	}
	// "%d" happily parses a leading minus sign; a negative capacity would make
	// the make() calls below panic, so reject it as a corrupt entry instead.
	if numKeys < 0 {
		return nil, fmt.Errorf("parsing %s", string(numKeysBytes))
	}

	fetchKeys := make([]string, 0, numKeys)
	for i := 0; i < numKeys; i++ {
		fetchKeys = append(fetchKeys, fmt.Sprintf("%s-%d", reqSig, i))
	}
	fetchedData := c.Fetch(ctx, fetchKeys)

	// A partially evicted stream is useless: every chunk must still be present.
	if len(fetchedData) != numKeys {
		return nil, fmt.Errorf("got wrong number of keys: expected %d, got %d", numKeys, len(fetchedData))
	}

	ret := make([]*storepb.SeriesResponse, 0, numKeys)
	for _, key := range fetchKeys {
		var r storepb.SeriesResponse
		if err := r.Unmarshal(fetchedData[key]); err != nil {
			return nil, fmt.Errorf("unmarshaling data from cache: %w", err)
		}
		ret = append(ret, &r)
	}

	return ret, nil
}

// putResponses stores a complete response stream in the cache under reqSig.
//
// The key "<reqSig>-num" receives the decimal number of responses and
// "<reqSig>-<i>" receives the i-th marshaled response; every entry is stored
// with a 5 minute TTL. Caching is best effort: if any response fails to
// marshal, nothing is stored, so that a truncated or empty payload can never
// be served back as if it were the real response. The return value is always
// nil and is kept only for compatibility with existing callers.
func putResponses(c *cache.InMemoryCache, reqSig string, responses []*storepb.SeriesResponse) []byte {
	keys := make(map[string][]byte, len(responses)+1)
	keys[fmt.Sprintf("%s-num", reqSig)] = []byte(fmt.Sprintf("%d", len(responses)))

	for i, resp := range responses {
		m, err := resp.Marshal()
		if err != nil {
			// Skip caching this stream entirely rather than storing a nil
			// entry that would later unmarshal into an empty response.
			return nil
		}
		keys[fmt.Sprintf("%s-%d", reqSig, i)] = m
	}

	c.Store(context.Background(), keys, 5*time.Minute)
	return nil
}

// splitMethodName breaks a full gRPC method name of the form
// "/<service>/<method>" into its service and method parts. A single leading
// slash is dropped before splitting at the first remaining slash. When no
// slash is left to split on, both results are "unknown".
func splitMethodName(fullMethod string) (string, string) {
	trimmed := strings.TrimPrefix(fullMethod, "/")

	sep := strings.IndexByte(trimmed, '/')
	if sep < 0 {
		return "unknown", "unknown"
	}

	service, method := trimmed[:sep], trimmed[sep+1:]
	return service, method
}

// seriesInterceptor wraps a grpc.ClientStream and overrides SendMsg/RecvMsg to
// serve Series responses from an in-memory cache, or to record them so they
// can be cached once the stream finishes. All other stream methods come from
// the embedded ClientStream.
type seriesInterceptor struct {
	grpc.ClientStream

	// c is the cache that responses are read from and written to.
	c *cache.InMemoryCache

	// cachedCalls is incremented whenever a request is answered from the cache.
	cachedCalls prometheus.Counter

	// target is mixed into the request signature together with the request.
	target string

	// hashedReq is the signature of the request seen by SendMsg. While it is
	// empty, RecvMsg forwards straight to the underlying stream.
	hashedReq string

	// cachedResponsesAvailable is set when SendMsg found the request in the
	// cache; responses then holds the messages still to be replayed.
	// Otherwise responses accumulates the messages received from the stream.
	cachedResponsesAvailable bool

	responses []*storepb.SeriesResponse
}

// hashReqTarget computes the cache signature for a Series request sent to the
// given target: the xxhash digest of the target followed by the marshaled
// request, returned as a raw (non-printable) byte string.
//
// If the request cannot be marshaled an empty signature is returned. Hashing
// only the target in that case would make every such request to the same
// target share one cache entry; an empty signature instead makes RecvMsg
// bypass the cache for this stream.
func hashReqTarget(r *storepb.SeriesRequest, target string) string {
	m, err := r.Marshal()
	if err != nil {
		return ""
	}

	h := xxhash.New()
	// The write errors are discarded, as in the original code.
	_, _ = h.WriteString(target)
	_, _ = h.Write(m)

	return string(h.Sum(nil))
}

// RecvMsg delivers the next response message into m.
//
// There are three modes:
//   - No request signature recorded yet: the call is forwarded unchanged to
//     the underlying stream.
//   - Cache hit: the next cached response is copied into m, and io.EOF is
//     returned once all cached responses have been replayed. The underlying
//     stream is not read in this mode.
//   - Cache miss: the call is forwarded to the underlying stream and every
//     received *storepb.SeriesResponse is remembered. When the stream ends
//     with io.EOF and at least one response was seen, the collected responses
//     are written to the cache. Any other error leaves the cache untouched.
//
// An error is returned (instead of panicking) if m is not a
// *storepb.SeriesResponse while replaying cached responses.
func (i *seriesInterceptor) RecvMsg(m interface{}) error {
	if i.hashedReq == "" {
		return i.ClientStream.RecvMsg(m)
	}

	if i.cachedResponsesAvailable {
		if len(i.responses) == 0 {
			return io.EOF
		}
		resp, ok := m.(*storepb.SeriesResponse)
		if !ok {
			// A type mismatch is reported to the caller as an error instead of
			// panicking.
			return fmt.Errorf("should be a series response type, got %T", m)
		}
		*resp = *i.responses[0]
		i.responses = i.responses[1:]
		return nil
	}

	if err := i.ClientStream.RecvMsg(m); err != nil {
		// Only a cleanly finished stream is cached; partial results from a
		// failed stream must not be replayed later.
		if err == io.EOF && len(i.responses) > 0 {
			putResponses(i.c, i.hashedReq, i.responses)
		}
		return err
	}

	// NOTE(review): this keeps the caller's message pointer; it assumes the
	// caller passes a fresh message per call and does not mutate it before the
	// stream ends — confirm against the generated client code.
	if resp, ok := m.(*storepb.SeriesResponse); ok {
		i.responses = append(i.responses, resp)
	}
	return nil
}

// SendMsg forwards m to the underlying stream. When m is a
// *storepb.SeriesRequest it first records the request signature and looks the
// request up in the cache; on a hit the cached responses are staged for
// RecvMsg and the cached-calls counter is incremented.
//
// NOTE(review): the request is still sent upstream on a cache hit, so the
// remote side presumably still processes it — confirm whether that is intended.
func (i *seriesInterceptor) SendMsg(m interface{}) error {
	req, isSeriesReq := m.(*storepb.SeriesRequest)
	if !isSeriesReq {
		return i.ClientStream.SendMsg(m)
	}

	i.hashedReq = hashReqTarget(req, i.target)

	// Any lookup error is treated as a plain cache miss.
	if cached, err := getResponses(i.c, i.hashedReq); err == nil {
		i.responses = cached
		i.cachedResponsesAvailable = true
		i.cachedCalls.Inc()
	}

	return i.ClientStream.SendMsg(m)
}

// SeriesRequestCachingInterceptor builds gRPC stream client interceptors that
// cache the responses of Series calls in memory.
type SeriesRequestCachingInterceptor struct {
	// passthrough disables caching; streams are then returned unwrapped.
	passthrough bool

	// inmemoryCache is shared by every stream this interceptor wraps.
	inmemoryCache *cache.InMemoryCache

	// cachedCalls counts Series calls that were answered from the cache.
	cachedCalls prometheus.Counter
}

// NewSeriesRequestcachingInterceptor creates a caching interceptor backed by
// an in-memory cache of at most maxSize bytes, with individual entries limited
// to maxItemSize bytes.
//
// A maxSize of 0 puts the interceptor into passthrough mode, in which streams
// are never wrapped. maxItemSize is clamped to maxSize because a single item
// can never be larger than the whole cache. reg may be nil, in which case
// nothing is registered by this function.
//
// An error is returned if the underlying cache cannot be constructed.
func NewSeriesRequestcachingInterceptor(reg *prometheus.Registry, maxSize model.Bytes, maxItemSize model.Bytes) (*SeriesRequestCachingInterceptor, error) {
	// Generalizes the previous "maxSize == 0" special case: any item limit
	// above the total size is inconsistent (and presumably rejected by the
	// cache constructor), so cap it instead of failing.
	if maxItemSize > maxSize {
		maxItemSize = maxSize
	}

	// Converting a nil *prometheus.Registry directly to prometheus.Registerer
	// yields a non-nil interface wrapping a nil pointer, which promauto would
	// then try to register on. Only hand over the registry when it is set.
	var registerer prometheus.Registerer
	if reg != nil {
		registerer = reg
	}

	imc, err := cache.NewInMemoryCacheWithConfig(
		"cachinginterceptor",
		log.NewLogfmtLogger(os.Stderr),
		registerer,
		cache.InMemoryCacheConfig{
			MaxSize:     maxSize,
			MaxItemSize: maxItemSize,
		},
	)
	if err != nil {
		return nil, err
	}

	return &SeriesRequestCachingInterceptor{
		inmemoryCache: imc,
		passthrough:   maxSize == 0,
		cachedCalls: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
			Name: "thanos_grpc_cached_calls_total",
			Help: "How many Series() calls were fully cached",
		}),
	}, nil
}

// GetInterceptor returns a grpc.StreamClientInterceptor that always opens the
// underlying stream first and then, unless the interceptor is in passthrough
// mode, wraps streams of the "thanos.Store" service's "Series" method in a
// caching seriesInterceptor. Every other stream is returned untouched.
func (i *SeriesRequestCachingInterceptor) GetInterceptor() grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		stream, err := streamer(ctx, desc, cc, method, opts...)
		switch {
		case err != nil:
			return stream, err
		case i.passthrough:
			return stream, nil
		}

		service, rpc := splitMethodName(method)
		if service != "thanos.Store" || rpc != "Series" {
			return stream, nil
		}

		// The connection target becomes part of the cache key for this stream.
		wrapped := &seriesInterceptor{
			ClientStream: stream,
			target:       cc.Target(),
			c:            i.inmemoryCache,
			cachedCalls:  i.cachedCalls,
		}
		return wrapped, nil
	}
}

0 comments on commit 462632b

Please sign in to comment.