From 6f223224285666b1e2e2702fdbdd85001bb996b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 14 Dec 2022 17:27:41 +0200 Subject: [PATCH] query: add experimental Select() cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add experiment support for caching select() requests. Only local tests for now. Cache works on matching identical request signatures. Data is cached only on io.EOF error; if something else occurs then the response is not cached. Signed-off-by: Giedrius Statkevičius --- cmd/thanos/query.go | 7 +- cmd/thanos/receive.go | 1 + pkg/extgrpc/client.go | 10 +- pkg/extgrpc/grpccache/interceptor.go | 202 +++++++++++++++++++++++++++ 4 files changed, 218 insertions(+), 2 deletions(-) create mode 100644 pkg/extgrpc/grpccache/interceptor.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4862440af1..9b2543a07d 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -45,6 +45,7 @@ import ( "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/metadata" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/rules" @@ -79,6 +80,8 @@ func registerQuery(app *extkingpin.App) { httpBindAddr, httpGracePeriod, httpTLSConfig := extkingpin.RegisterHTTPFlags(cmd) grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA, grpcMaxConnAge := extkingpin.RegisterGRPCFlags(cmd) + selectCacheSize := cmd.Flag("grpc-select-cache-size", "How many bytes should be allocated for storing Select() results in memory for each StoreAPI").Default("0").Bytes() + secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String() @@ -319,6 +322,7 @@ func registerQuery(app *extkingpin.App) { *queryTelemetrySamplesQuantiles, *queryTelemetrySeriesQuantiles, promqlEngineType(*promqlEngine), + uint64(*selectCacheSize), ) }) } @@ -395,6 +399,7 @@ func runQuery( queryTelemetrySamplesQuantiles []int64, queryTelemetrySeriesQuantiles []int64, promqlEngine promqlEngineType, + selectCacheSize uint64, ) error { if alertQueryURL == "" { lastColon := strings.LastIndex(httpBindAddr, ":") @@ -409,7 +414,7 @@ func runQuery( Help: "The number of times a duplicated store addresses is detected from the different configs in query", }) - dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName) + dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName, model.Bytes(selectCacheSize)) if err != nil { return errors.Wrap(err, "building gRPC client") } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index d86b560983..ab2eb62181 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -139,6 +139,7 @@ func runReceive( conf.rwClientKey, conf.rwClientServerCA, conf.rwClientServerName, + 0, ) if err != nil { return err diff --git a/pkg/extgrpc/client.go b/pkg/extgrpc/client.go index 7db0c8e570..fe35b2a6a8 100644 --- a/pkg/extgrpc/client.go +++ b/pkg/extgrpc/client.go @@ -16,16 +16,23 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "github.com/thanos-io/thanos/pkg/extgrpc/grpccache" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/tracing" ) // StoreClientGRPCOpts creates gRPC dial options for connecting to a store client. -func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) { +func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string, maxCacheSize model.Bytes) ([]grpc.DialOption, error) { grpcMets := grpc_prometheus.NewClientMetrics() grpcMets.EnableClientHandlingTimeHistogram( grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}), ) + + cacher, err := grpccache.NewSeriesRequestcachingInterceptor(reg, maxCacheSize, 128*1024*1024) + if err != nil { + return nil, err + } dialOpts := []grpc.DialOption{ // We want to make sure that we can receive huge gRPC messages from storeAPI. // On TCP level we can be fine, but the gRPC overhead for huge messages could be significant. @@ -42,6 +49,7 @@ func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer ope grpc_middleware.ChainStreamClient( grpcMets.StreamClientInterceptor(), tracing.StreamClientInterceptor(tracer), + cacher.GetInterceptor(), ), ), } diff --git a/pkg/extgrpc/grpccache/interceptor.go b/pkg/extgrpc/grpccache/interceptor.go new file mode 100644 index 0000000000..4aa9e75672 --- /dev/null +++ b/pkg/extgrpc/grpccache/interceptor.go @@ -0,0 +1,202 @@ +package grpccache + +import ( + "context" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc" +) + +func getResponses(c *cache.InMemoryCache, reqSig string) ([]*storepb.SeriesResponse, error) { + numKeysKey := fmt.Sprintf("%s-num", reqSig) + numKeysFetchRes := c.Fetch(context.Background(), []string{numKeysKey}) + + var numKeys int + if numKeysBytes, ok := numKeysFetchRes[numKeysKey]; !ok { + return nil, fmt.Errorf("not found num key") + } else { + if n, err := fmt.Sscanf(string(numKeysBytes), "%d", &numKeys); n != 1 || err != nil { + return nil, fmt.Errorf("parsing %s", string(numKeysBytes)) + } + } + + var fetchKeys = make([]string, 0, numKeys) + for i := 0; i < numKeys; i++ { + fetchKeys = append(fetchKeys, fmt.Sprintf("%s-%d", reqSig, i)) + } + fetchedData := c.Fetch(context.Background(), fetchKeys) + + if len(fetchedData) != numKeys { + return nil, fmt.Errorf("got wrong number of keys: expected %d, got %d", numKeys, len(fetchedData)) + } + ret := make([]*storepb.SeriesResponse, 0, numKeys) + + for i := 0; i < numKeys; i++ { + var r storepb.SeriesResponse + data := fetchedData[fmt.Sprintf("%s-%d", reqSig, i)] + + if err := r.Unmarshal(data); err != nil { + return nil, fmt.Errorf("unmarshaling data from cache: %w", err) + } + + ret = append(ret, &r) + } + + return ret, nil +} + +func putResponses(c *cache.InMemoryCache, reqSig string, responses []*storepb.SeriesResponse) []byte { + keys := map[string][]byte{ + fmt.Sprintf("%s-num", reqSig): []byte(fmt.Sprintf("%d", len(responses))), + } + + for i, resp := range responses { + m, _ := resp.Marshal() + keys[fmt.Sprintf("%s-%d", reqSig, i)] = m + } + + c.Store(context.Background(), keys, 5*time.Minute) + return nil +} + +func splitMethodName(fullMethod string) (string, string) { + fullMethod = strings.TrimPrefix(fullMethod, "/") // remove leading slash + if i := strings.Index(fullMethod, "/"); i >= 0 { + return fullMethod[:i], fullMethod[i+1:] + } + return "unknown", "unknown" +} + +type seriesInterceptor struct { + grpc.ClientStream + target string + c *cache.InMemoryCache + + hashedReq string + + responses []*storepb.SeriesResponse + cachedResponsesAvailable bool + + cachedCalls prometheus.Counter +} + +func hashReqTarget(r *storepb.SeriesRequest, target string) string { + h := xxhash.New() + _, _ = h.WriteString(target) + m, _ := r.Marshal() + _, _ = h.Write(m) + + return string(h.Sum(nil)) +} + +func (i *seriesInterceptor) RecvMsg(m interface{}) error { + if i.hashedReq == "" { + return i.ClientStream.RecvMsg(m) + } + if i.cachedResponsesAvailable { + if len(i.responses) == 0 { + return io.EOF + } + resp, ok := m.(*storepb.SeriesResponse) + if !ok { + panic("should be a series response type") + } + *resp = *i.responses[0] + i.responses = i.responses[1:] + return nil + } + + if err := i.ClientStream.RecvMsg(m); err != nil { + if err == io.EOF && len(i.responses) > 0 { + putResponses(i.c, i.hashedReq, i.responses) + } + return err + } + if resp, ok := m.(*storepb.SeriesResponse); ok { + i.responses = append(i.responses, resp) + } + return nil +} + +func (i *seriesInterceptor) SendMsg(m interface{}) error { + if req, ok := m.(*storepb.SeriesRequest); ok { + i.hashedReq = hashReqTarget(req, i.target) + + responses, err := getResponses(i.c, i.hashedReq) + if err == nil { + i.responses = responses + i.cachedResponsesAvailable = true + i.cachedCalls.Inc() + } + + } + return i.ClientStream.SendMsg(m) +} + +type SeriesRequestCachingInterceptor struct { + inmemoryCache *cache.InMemoryCache + passthrough bool + + cachedCalls prometheus.Counter +} + +func NewSeriesRequestcachingInterceptor(reg *prometheus.Registry, maxSize model.Bytes, maxItemSize model.Bytes) (*SeriesRequestCachingInterceptor, error) { + if maxSize == 0 { + maxItemSize = 0 + } + imc, err := cache.NewInMemoryCacheWithConfig( + "cachinginterceptor", + log.NewLogfmtLogger(os.Stderr), + reg, + cache.InMemoryCacheConfig{ + MaxSize: maxSize, + MaxItemSize: maxItemSize, + }, + ) + if err != nil { + return nil, err + } + return &SeriesRequestCachingInterceptor{ + inmemoryCache: imc, + passthrough: maxSize == 0, + cachedCalls: promauto.With(prometheus.Registerer(reg)).NewCounter(prometheus.CounterOpts{ + Name: "thanos_grpc_cached_calls_total", + Help: "How many Series() calls were fully cached", + }), + }, nil +} + +func (i *SeriesRequestCachingInterceptor) GetInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + cs, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return cs, err + } + + if i.passthrough { + return cs, nil + } + + if svc, actualMethod := splitMethodName(method); svc == "thanos.Store" && actualMethod == "Series" { + return &seriesInterceptor{ + ClientStream: cs, + target: cc.Target(), + c: i.inmemoryCache, + cachedCalls: i.cachedCalls, + }, nil + } + + return cs, nil + } +}