Skip to content

Commit

Permalink
[query] Hide m3 iterator expansion behind a query flag (#1256)
Browse files Browse the repository at this point in the history
* Hide block iterator expansion behind a query flag until issues debugged

* Linter

* Pull prom query flag parsing into a function
  • Loading branch information
arnikola authored Dec 19, 2018
1 parent c5015ef commit a7d9ccb
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 48 deletions.
53 changes: 30 additions & 23 deletions src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
queryParam = "query"
stepParam = "step"
debugParam = "debug"
useIterParam = "iters"
endExclusiveParam = "end-exclusive"

formatErrStr = "error parsing param: %s, error: %v"
Expand Down Expand Up @@ -115,18 +116,9 @@ func parseParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) {
if err != nil {
return params, xhttp.NewParseError(fmt.Errorf(formatErrStr, queryParam, err), http.StatusBadRequest)
}
params.Query = query

// Skip debug if unable to parse debug param
debugVal := r.FormValue(debugParam)
if debugVal != "" {
debug, err := strconv.ParseBool(r.FormValue(debugParam))
if err != nil {
logging.WithContext(r.Context()).Warn("unable to parse debug flag", zap.Any("error", err))
}
params.Debug = debug
}

params.Query = query
params.Debug, params.UseIterators = parseDebugAndIterFlags(r)
// Default to including end if unable to parse the flag
endExclusiveVal := r.FormValue(endExclusiveParam)
params.IncludeEnd = true
Expand All @@ -146,6 +138,32 @@ func parseParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) {
return params, nil
}

func parseDebugAndIterFlags(r *http.Request) (bool, bool) {
var (
debug, useIter bool
err error
)
// Skip debug if unable to parse debug param
debugVal := r.FormValue(debugParam)
if debugVal != "" {
debug, err = strconv.ParseBool(r.FormValue(debugParam))
if err != nil {
logging.WithContext(r.Context()).Warn("unable to parse debug flag", zap.Error(err))
}
}

// Skip useIterators if unable to parse useIterators param
useIterVal := r.FormValue(useIterParam)
if useIterVal != "" {
useIter, err = strconv.ParseBool(r.FormValue(useIterParam))
if err != nil {
logging.WithContext(r.Context()).Warn("unable to parse useIter flag", zap.Error(err))
}
}

return debug, useIter
}

// parseInstantaneousParams parses all params from the GET request
func parseInstantaneousParams(r *http.Request) (models.RequestParams, *xhttp.ParseError) {
params := models.RequestParams{
Expand Down Expand Up @@ -176,18 +194,7 @@ func parseInstantaneousParams(r *http.Request) (models.RequestParams, *xhttp.Par
return params, xhttp.NewParseError(fmt.Errorf(formatErrStr, queryParam, err), http.StatusBadRequest)
}
params.Query = query

// Skip debug if unable to parse debug param
debugVal := r.FormValue(debugParam)
if debugVal != "" {
debug, err := strconv.ParseBool(r.FormValue(debugParam))
if err != nil {
logging.WithContext(r.Context()).Warn("unable to parse debug flag", zap.Any("error", err))
}

params.Debug = debug
}

params.Debug, params.UseIterators = parseDebugAndIterFlags(r)
return params, nil
}

Expand Down
6 changes: 4 additions & 2 deletions src/query/executor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ func GenerateExecutionState(
}

options := transform.Options{
TimeSpec: pplan.TimeSpec,
Debug: pplan.Debug,
TimeSpec: pplan.TimeSpec,
Debug: pplan.Debug,
UseIterators: pplan.UseIterators,
}

controller, err := state.createNode(step, options)
if err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions src/query/executor/transform/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ import (

// Options to create transform nodes
type Options struct {
TimeSpec TimeSpec
Debug bool
TimeSpec TimeSpec
Debug bool
UseIterators bool
}

// OpNode represents the execution node
Expand Down
24 changes: 17 additions & 7 deletions src/query/functions/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ type FetchOp struct {
// FetchNode is the execution node
// TODO: Make FetchNode private
type FetchNode struct {
op FetchOp
controller *transform.Controller
storage storage.Storage
timespec transform.TimeSpec
debug bool
op FetchOp
controller *transform.Controller
storage storage.Storage
timespec transform.TimeSpec
debug bool
useIterators bool
}

// OpType for the operator
Expand All @@ -76,7 +77,14 @@ func (o FetchOp) String() string {

// Node creates an execution node
func (o FetchOp) Node(controller *transform.Controller, storage storage.Storage, options transform.Options) parser.Source {
return &FetchNode{op: o, controller: controller, storage: storage, timespec: options.TimeSpec, debug: options.Debug}
return &FetchNode{
op: o,
controller: controller,
storage: storage,
timespec: options.TimeSpec,
debug: options.Debug,
useIterators: options.UseIterators,
}
}

// Execute runs the fetch node operation
Expand All @@ -90,7 +98,9 @@ func (n *FetchNode) Execute(ctx context.Context) error {
End: endTime,
TagMatchers: n.op.Matchers,
Interval: timeSpec.Step,
}, &storage.FetchOptions{})
}, &storage.FetchOptions{
UseIterators: n.useIterators,
})
if err != nil {
return err
}
Expand Down
15 changes: 8 additions & 7 deletions src/query/models/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ type RequestParams struct {
Start time.Time
End time.Time
// Now captures the current time and fixes it throughout the request, we may let people override it in the future
Now time.Time
Timeout time.Duration
Step time.Duration
Query string
Debug bool
IncludeEnd bool
FormatType FormatType
Now time.Time
Timeout time.Duration
Step time.Duration
Query string
Debug bool
UseIterators bool
IncludeEnd bool
FormatType FormatType
}

// ExclusiveEnd returns the end exclusive
Expand Down
14 changes: 8 additions & 6 deletions src/query/plan/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import (

// PhysicalPlan represents the physical plan
type PhysicalPlan struct {
steps map[parser.NodeID]LogicalStep
pipeline []parser.NodeID // Ordered list of steps to be performed
ResultStep ResultOp
TimeSpec transform.TimeSpec
Debug bool
steps map[parser.NodeID]LogicalStep
pipeline []parser.NodeID // Ordered list of steps to be performed
ResultStep ResultOp
TimeSpec transform.TimeSpec
Debug bool
UseIterators bool
}

// ResultOp is resonsible for delivering results to the clients
Expand All @@ -59,7 +60,8 @@ func NewPhysicalPlan(lp LogicalPlan, storage storage.Storage, params models.Requ
Now: params.Now,
Step: params.Step,
},
Debug: params.Debug,
Debug: params.Debug,
UseIterators: params.UseIterators,
}

pl, err := p.createResultNode()
Expand Down
9 changes: 9 additions & 0 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ func (s *m3storage) FetchBlocks(
query *storage.FetchQuery,
options *storage.FetchOptions,
) (block.Result, error) {
if !options.UseIterators {
fetchResult, err := s.Fetch(ctx, query, options)
if err != nil {
return block.Result{}, err
}

return storage.FetchResultToBlockResult(fetchResult, query)
}

raw, _, err := s.FetchCompressed(ctx, query, options)
if err != nil {
return block.Result{}, err
Expand Down
3 changes: 2 additions & 1 deletion src/query/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (q *FetchQuery) String() string {
// FetchOptions represents the options for fetch query.
type FetchOptions struct {
// Limit is the maximum number of series to return.
Limit int
Limit int
UseIterators bool
}

// NewFetchOptions creates a new fetch options.
Expand Down

0 comments on commit a7d9ccb

Please sign in to comment.