From a7d9ccb1393226c1a6af98e08cc10a83f03d9346 Mon Sep 17 00:00:00 2001 From: arnikola Date: Wed, 19 Dec 2018 18:45:56 -0500 Subject: [PATCH] [query] Hide m3 iterator expansion behind a query flag (#1256) * Hide block iterator expansion behind a query flag until issues debugged * Linter * Pull prom query flag parsing into a function --- .../v1/handler/prometheus/native/common.go | 53 +++++++++++-------- src/query/executor/state.go | 6 ++- src/query/executor/transform/types.go | 5 +- src/query/functions/fetch.go | 24 ++++++--- src/query/models/params.go | 15 +++--- src/query/plan/physical.go | 14 ++--- src/query/storage/m3/storage.go | 9 ++++ src/query/storage/types.go | 3 +- 8 files changed, 81 insertions(+), 48 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/native/common.go b/src/query/api/v1/handler/prometheus/native/common.go index ad18e6c61e..c82e7b1b29 100644 --- a/src/query/api/v1/handler/prometheus/native/common.go +++ b/src/query/api/v1/handler/prometheus/native/common.go @@ -48,6 +48,7 @@ const ( queryParam = "query" stepParam = "step" debugParam = "debug" + useIterParam = "iters" endExclusiveParam = "end-exclusive" formatErrStr = "error parsing param: %s, error: %v" @@ -115,18 +116,9 @@ func parseParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) { if err != nil { return params, xhttp.NewParseError(fmt.Errorf(formatErrStr, queryParam, err), http.StatusBadRequest) } - params.Query = query - - // Skip debug if unable to parse debug param - debugVal := r.FormValue(debugParam) - if debugVal != "" { - debug, err := strconv.ParseBool(r.FormValue(debugParam)) - if err != nil { - logging.WithContext(r.Context()).Warn("unable to parse debug flag", zap.Any("error", err)) - } - params.Debug = debug - } + params.Query = query + params.Debug, params.UseIterators = parseDebugAndIterFlags(r) // Default to including end if unable to parse the flag endExclusiveVal := r.FormValue(endExclusiveParam) params.IncludeEnd = true @@ -146,6 +138,32 @@ func parseParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) { return params, nil } +func parseDebugAndIterFlags(r *http.Request) (bool, bool) { + var ( + debug, useIter bool + err error + ) + // Skip debug if unable to parse debug param + debugVal := r.FormValue(debugParam) + if debugVal != "" { + debug, err = strconv.ParseBool(r.FormValue(debugParam)) + if err != nil { + logging.WithContext(r.Context()).Warn("unable to parse debug flag", zap.Error(err)) + } + } + + // Skip useIterators if unable to parse useIterators param + useIterVal := r.FormValue(useIterParam) + if useIterVal != "" { + useIter, err = strconv.ParseBool(r.FormValue(useIterParam)) + if err != nil { + logging.WithContext(r.Context()).Warn("unable to parse useIter flag", zap.Error(err)) + } + } + + return debug, useIter +} + // parseInstantaneousParams parses all params from the GET request func parseInstantaneousParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) { params := models.RequestParams{ @@ -176,18 +194,7 @@ func parseInstantaneousParams(r *http.Request) (models.RequestParams, *xhttp.Par return params, xhttp.NewParseError(fmt.Errorf(formatErrStr, queryParam, err), http.StatusBadRequest) } params.Query = query - - // Skip debug if unable to parse debug param - debugVal := r.FormValue(debugParam) - if debugVal != "" { - debug, err := strconv.ParseBool(r.FormValue(debugParam)) - if err != nil { - logging.WithContext(r.Context()).Warn("unable to parse debug flag", zap.Any("error", err)) - } - - params.Debug = debug - } - + params.Debug, params.UseIterators = parseDebugAndIterFlags(r) return params, nil } diff --git a/src/query/executor/state.go b/src/query/executor/state.go index be6c9e133e..875e5d9c62 100644 --- a/src/query/executor/state.go +++ b/src/query/executor/state.go @@ -118,9 +118,11 @@ func GenerateExecutionState( } options := transform.Options{ - TimeSpec: pplan.TimeSpec, - Debug: pplan.Debug, + TimeSpec: pplan.TimeSpec, + Debug: pplan.Debug, + UseIterators: pplan.UseIterators, } + controller, err := state.createNode(step, options) if err != nil { return nil, err diff --git a/src/query/executor/transform/types.go b/src/query/executor/transform/types.go index 65e4b1234f..3ab5b4838a 100644 --- a/src/query/executor/transform/types.go +++ b/src/query/executor/transform/types.go @@ -30,8 +30,9 @@ import ( // Options to create transform nodes type Options struct { - TimeSpec TimeSpec - Debug bool + TimeSpec TimeSpec + Debug bool + UseIterators bool } // OpNode represents the execution node diff --git a/src/query/functions/fetch.go b/src/query/functions/fetch.go index 2716c1ab1d..ea0934ff99 100644 --- a/src/query/functions/fetch.go +++ b/src/query/functions/fetch.go @@ -49,11 +49,12 @@ type FetchOp struct { // FetchNode is the execution node // TODO: Make FetchNode private type FetchNode struct { - op FetchOp - controller *transform.Controller - storage storage.Storage - timespec transform.TimeSpec - debug bool + op FetchOp + controller *transform.Controller + storage storage.Storage + timespec transform.TimeSpec + debug bool + useIterators bool } // OpType for the operator @@ -76,7 +77,14 @@ func (o FetchOp) String() string { // Node creates an execution node func (o FetchOp) Node(controller *transform.Controller, storage storage.Storage, options transform.Options) parser.Source { - return &FetchNode{op: o, controller: controller, storage: storage, timespec: options.TimeSpec, debug: options.Debug} + return &FetchNode{ + op: o, + controller: controller, + storage: storage, + timespec: options.TimeSpec, + debug: options.Debug, + useIterators: options.UseIterators, + } } // Execute runs the fetch node operation @@ -90,7 +98,9 @@ func (n *FetchNode) Execute(ctx context.Context) error { End: endTime, TagMatchers: n.op.Matchers, Interval: timeSpec.Step, - }, &storage.FetchOptions{}) + }, &storage.FetchOptions{ + UseIterators: n.useIterators, + }) if err != nil { return err } diff --git a/src/query/models/params.go b/src/query/models/params.go index f06df5bf4c..5f91b3ca1e 100644 --- a/src/query/models/params.go +++ b/src/query/models/params.go @@ -44,13 +44,14 @@ type RequestParams struct { Start time.Time End time.Time // Now captures the current time and fixes it throughout the request, we may let people override it in the future - Now time.Time - Timeout time.Duration - Step time.Duration - Query string - Debug bool - IncludeEnd bool - FormatType FormatType + Now time.Time + Timeout time.Duration + Step time.Duration + Query string + Debug bool + UseIterators bool + IncludeEnd bool + FormatType FormatType } // ExclusiveEnd returns the end exclusive diff --git a/src/query/plan/physical.go b/src/query/plan/physical.go index 927961f8f2..c9302ebd50 100644 --- a/src/query/plan/physical.go +++ b/src/query/plan/physical.go @@ -32,11 +32,12 @@ import ( // PhysicalPlan represents the physical plan type PhysicalPlan struct { - steps map[parser.NodeID]LogicalStep - pipeline []parser.NodeID // Ordered list of steps to be performed - ResultStep ResultOp - TimeSpec transform.TimeSpec - Debug bool + steps map[parser.NodeID]LogicalStep + pipeline []parser.NodeID // Ordered list of steps to be performed + ResultStep ResultOp + TimeSpec transform.TimeSpec + Debug bool + UseIterators bool } // ResultOp is resonsible for delivering results to the clients @@ -59,7 +60,8 @@ func NewPhysicalPlan(lp LogicalPlan, storage storage.Storage, params models.Requ Now: params.Now, Step: params.Step, }, - Debug: params.Debug, + Debug: params.Debug, + UseIterators: params.UseIterators, } pl, err := p.createResultNode() diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index e1d88c91ea..8a67e00498 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -93,6 +93,15 @@ func (s *m3storage) FetchBlocks( query *storage.FetchQuery, options *storage.FetchOptions, ) (block.Result, error) { + if !options.UseIterators { + fetchResult, err := s.Fetch(ctx, query, options) + if err != nil { + return block.Result{}, err + } + + return storage.FetchResultToBlockResult(fetchResult, query) + } + raw, _, err := s.FetchCompressed(ctx, query, options) if err != nil { return block.Result{}, err diff --git a/src/query/storage/types.go b/src/query/storage/types.go index a1437db798..42f9eeb3f0 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -81,7 +81,8 @@ func (q *FetchQuery) String() string { // FetchOptions represents the options for fetch query. type FetchOptions struct { // Limit is the maximum number of series to return. - Limit int + Limit int + UseIterators bool } // NewFetchOptions creates a new fetch options.