From 45621afb5cbb2158f37c14d3ba170ef9343a0735 Mon Sep 17 00:00:00 2001 From: iguazio-deploy Date: Wed, 9 Jan 2019 08:58:41 +0000 Subject: [PATCH] Updated TSDB to v0.8.7 --- .../examples/nuclio/ingest/ingest_example.go | 2 +- .../examples/nuclio/query/query_example.go | 2 +- .../v3io/v3io-tsdb/pkg/pquerier/frames.go | 48 ++++---- .../v3io/v3io-tsdb/pkg/pquerier/querier.go | 6 +- .../pkg/pquerier/query_integration_test.go | 52 +++++---- .../v3io/v3io-tsdb/pkg/pquerier/select.go | 30 +++-- .../v3io/v3io-tsdb/pkg/pquerier/series.go | 5 +- .../v3io/v3io-tsdb/pkg/pquerier/sql_parser.go | 109 ++++++++++++++++++ .../v3io-tsdb/pkg/pquerier/sql_parser_test.go | 62 ++++++++++ .../v3io/v3io-tsdb/pkg/pquerier/types.go | 3 + .../v3io/v3io-tsdb/pkg/tsdbctl/query.go | 17 ++- .../examples/nuclio/ingest/ingest_example.go | 2 +- .../examples/nuclio/query/query_example.go | 2 +- .../v3io/v3io-tsdb/pkg/pquerier/frames.go | 48 ++++---- .../v3io/v3io-tsdb/pkg/pquerier/querier.go | 6 +- .../pkg/pquerier/query_integration_test.go | 52 +++++---- .../v3io/v3io-tsdb/pkg/pquerier/select.go | 30 +++-- .../v3io/v3io-tsdb/pkg/pquerier/series.go | 5 +- .../v3io/v3io-tsdb/pkg/pquerier/sql_parser.go | 109 ++++++++++++++++++ .../v3io-tsdb/pkg/pquerier/sql_parser_test.go | 62 ++++++++++ .../v3io/v3io-tsdb/pkg/pquerier/types.go | 3 + .../v3io/v3io-tsdb/pkg/tsdbctl/query.go | 17 ++- 22 files changed, 542 insertions(+), 130 deletions(-) create mode 100644 functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go create mode 100644 functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go create mode 100644 functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go create mode 100644 functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go index 2c1f896d..fb9bb1e3 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go @@ -178,7 +178,7 @@ func createTSDBAppender(context *nuclio.Context, path string) error { if containerName == "" { containerName = "bigdata" } - container, err := tsdb.NewContainer(v3ioUrl, numWorkers, username, password, containerName, context.Logger) + container, err := tsdb.NewContainer(v3ioUrl, numWorkers, "", username, password, containerName, context.Logger) if err != nil { return err } diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go index 34e5d00a..abccd980 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go @@ -132,7 +132,7 @@ func createV3ioQuerier(context *nuclio.Context, path string) error { if containerName == "" { containerName = "bigdata" } - container, err := tsdb.NewContainer(v3ioUrl, numWorkers, username, password, containerName, context.Logger) + container, err := tsdb.NewContainer(v3ioUrl, numWorkers, "", username, password, containerName, context.Logger) if err != nil { return err } diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go index 71488949..c9db07a3 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go @@ -235,12 +235,12 @@ func (d *dataFrame) addMetricIfNotExist(metricName string, columnSize int, useSe func (d *dataFrame) addMetricFromTemplate(metricName string, columnSize int, useServerAggregates bool) error { newColumns := make([]Column, len(d.columnsTemplates)) for i, col := range d.columnsTemplates { + col.metric = metricName newCol, err := createColumn(col, columnSize, useServerAggregates) if err != nil { return err } - newCol.setMetricName(metricName) newColumns[i] = newCol if aggregate.IsCountAggregate(col.function) { d.metricToCountColumn[metricName] = newCol @@ -347,7 +347,7 @@ func (d *dataFrame) TimeSeries(i int) (utils.Series, error) { // t2 v1 v3 // func (d *dataFrame) rawSeriesToColumns() { - var timeData []int64 + var timeData []time.Time columns := make([][]interface{}, len(d.rawColumns)) nonExhaustedIterators := len(d.rawColumns) @@ -365,7 +365,7 @@ func (d *dataFrame) rawSeriesToColumns() { for nonExhaustedIterators > 0 { currentTime = nextTime nextTime = int64(math.MaxInt64) - timeData = append(timeData, currentTime) + timeData = append(timeData, time.Unix(currentTime/1000, (currentTime%1000)*1e6)) for seriesIndex, rawSeries := range d.rawColumns { iter := rawSeries.Iterator() @@ -398,7 +398,7 @@ func (d *dataFrame) rawSeriesToColumns() { numberOfRows := len(timeData) colSpec := columnMeta{metric: "time"} - d.index = NewDataColumn("time", colSpec, numberOfRows, IntType) + d.index = NewDataColumn("time", colSpec, numberOfRows, TimeType) d.index.SetData(timeData, numberOfRows) d.columns = make([]Column, len(d.rawColumns)) @@ -420,13 +420,13 @@ func (d *dataFrame) shouldGenerateRawColumns() bool { return d.isRawSeries && !d // Column is a data column type Column interface { - Len() int // Number of elements - Name() string // Column name - DType() DType // Data type (e.g. IntType, FloatType ...) - FloatAt(i int) (float64, error) // Float value at index i - StringAt(i int) (string, error) // String value at index i - TimeAt(i int) (int64, error) // time value at index i - GetColumnSpec() columnMeta // Get the column's metadata + Len() int // Number of elements + Name() string // Column name + DType() DType // Data type (e.g. IntType, FloatType ...) + FloatAt(i int) (float64, error) // Float value at index i + StringAt(i int) (string, error) // String value at index i + TimeAt(i int) (time.Time, error) // time value at index i + GetColumnSpec() columnMeta // Get the column's metadata SetDataAt(i int, value interface{}) error SetData(d interface{}, size int) error GetInterpolationFunction() (InterpolationFunction, int64) @@ -552,21 +552,21 @@ func (dc *dataColumn) StringAt(i int) (string, error) { } // TimeAt returns time.Time value at index i -func (dc *dataColumn) TimeAt(i int) (int64, error) { +func (dc *dataColumn) TimeAt(i int) (time.Time, error) { if !dc.isValidIndex(i) { - return 0, fmt.Errorf("index %d out of bounds [0:%d]", i, dc.size) + return time.Unix(0, 0), fmt.Errorf("index %d out of bounds [0:%d]", i, dc.size) } - typedCol, ok := dc.data.([]int64) + typedCol, ok := dc.data.([]time.Time) if !ok { genericCol, ok := dc.data.([]interface{}) if ok { - i, ok := genericCol[i].(int64) + i, ok := genericCol[i].(time.Time) if ok { return i, nil } } - return 0, fmt.Errorf("wrong type (type is %s)", dc.DType()) + return time.Unix(0, 0), fmt.Errorf("wrong type (type is %s)", dc.DType()) } return typedCol[i], nil @@ -630,8 +630,8 @@ func (c *ConcreteColumn) FloatAt(i int) (float64, error) { func (c *ConcreteColumn) StringAt(i int) (string, error) { return "", errors.New("aggregated column does not support string type") } -func (c *ConcreteColumn) TimeAt(i int) (int64, error) { - return 0, errors.New("aggregated column does not support time type") +func (c *ConcreteColumn) TimeAt(i int) (time.Time, error) { + return time.Unix(0, 0), errors.New("aggregated column does not support time type") } func (c *ConcreteColumn) SetDataAt(i int, val interface{}) error { if !c.isValidIndex(i) { @@ -681,14 +681,6 @@ func (c *virtualColumn) StringAt(i int) (string, error) { } return value.(string), nil } -func (c *virtualColumn) TimeAt(i int) (int64, error) { - if !c.isValidIndex(i) { - return 0, fmt.Errorf("index %d out of bounds [0:%d]", i, c.size) - } - - value, err := c.function(c.dependantColumns, i) - if err != nil { - return 0, err - } - return value.(int64), nil +func (c *virtualColumn) TimeAt(i int) (time.Time, error) { + return time.Unix(0, 0), errors.New("aggregated column does not support time type") } diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go index 5061eec1..78f9e067 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go @@ -287,7 +287,7 @@ func (q *V3ioQuerier) GetLabelSets(metric string, filter string) ([]utils.Labels input := v3io.GetItemsInput{ Path: partitionPaths[0], Filter: filter, - AttributeNames: []string{config.LabelSetAttrName}, + AttributeNames: []string{config.LabelSetAttrName, config.MetricNameAttrName}, } iter, err := utils.NewAsyncItemsCursor(q.container, &input, q.cfg.QryWorkers, shardingKeys, q.logger) @@ -298,12 +298,14 @@ func (q *V3ioQuerier) GetLabelSets(metric string, filter string) ([]utils.Labels // Iterate over the results for iter.Next() { labelSet := iter.GetField(config.LabelSetAttrName).(string) - currLabels, err := utils.LabelsFromString(labelSet) if err != nil { return nil, err } + currLabels = append(utils.LabelsFromStringList(config.PrometheusMetricNameAttribute, + iter.GetField(config.MetricNameAttrName).(string)), currLabels...) + labelsMap[currLabels.Hash()] = currLabels } diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go index 5ca70d24..38f495cd 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go @@ -1424,7 +1424,8 @@ func (suite *testQuerySuite) TestDataFrameRawDataMultipleMetrics() { for i := 0; i < frame.Index().Len(); i++ { t, _ := in.TimeAt(i) - assert.Equal(suite.T(), expectedTimeColumn[i], t, "time column does not match at index %v", i) + timeMillis := t.UnixNano() / int64(time.Millisecond) + assert.Equal(suite.T(), expectedTimeColumn[i], timeMillis, "time column does not match at index %v", i) for _, column := range cols { v, _ := column.FloatAt(i) @@ -1555,7 +1556,8 @@ func (suite *testQuerySuite) TestVariantTypeQueryWithDataFrame() { for i := 0; i < frame.Index().Len(); i++ { t, _ := in.TimeAt(i) - assert.Equal(suite.T(), expectedTimeColumn[i], t, "time column does not match at index %v", i) + timeMillis := t.UnixNano() / int64(time.Millisecond) + assert.Equal(suite.T(), expectedTimeColumn[i], timeMillis, "time column does not match at index %v", i) for _, column := range cols { v, _ := column.StringAt(i) @@ -1922,7 +1924,7 @@ func (suite *getLabelSetsSuite) TestGetLabels() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -1935,18 +1937,21 @@ func (suite *getLabelSetsSuite) TestGetLabels() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "mac", "region", "europe", config.PrometheusMetricNameAttribute, "cpu")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -1963,7 +1968,7 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllMetrics() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -1976,18 +1981,21 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllMetrics() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "diskio", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "mac", "region", "europe", config.PrometheusMetricNameAttribute, "diskio")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -2004,7 +2012,7 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllSpecificMetric() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -2017,18 +2025,20 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllSpecificMetric() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "diskio", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -2038,14 +2048,14 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllSpecificMetric() { suite.T().Fatalf("failed to get label sets, err:%v\n", err) } - suite.ElementsMatch(expectedLabels[:2], labelsList, "actual label sets does not match expected") + suite.ElementsMatch(expectedLabels, labelsList, "actual label sets does not match expected") } func (suite *getLabelSetsSuite) TestGetLabelsWithFilter() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -2058,18 +2068,20 @@ func (suite *getLabelSetsSuite) TestGetLabelsWithFilter() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -2079,7 +2091,7 @@ func (suite *getLabelSetsSuite) TestGetLabelsWithFilter() { suite.T().Fatalf("failed to get label sets, err:%v\n", err) } - suite.ElementsMatch(expectedLabels[:2], labelsList, "actual label sets does not match expected") + suite.ElementsMatch(expectedLabels, labelsList, "actual label sets does not match expected") } func TestGetLabelSetsSuite(t *testing.T) { diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go index d33635ed..3d3f6d4e 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/nuclio/logger" "github.com/pkg/errors" @@ -45,12 +46,6 @@ type selectQueryContext struct { func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params *SelectParams) (*frameIterator, error) { queryCtx.dataFrames = make(map[uint64]*dataFrame) - // If step isn't passed (e.g., when using the console), the step is the - // difference between the end (maxt) and start (mint) times (e.g., 5 minutes) - if params.Functions != "" && params.Step == 0 { - params.Step = params.To - params.From - } - queryCtx.queryParams = params var err error queryCtx.columnsSpec, queryCtx.columnsSpecByMetric, err = queryCtx.createColumnSpecs() @@ -58,6 +53,12 @@ func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params * return nil, err } + // If step isn't passed (e.g., when using the console), the step is the + // difference between the end (maxt) and start (mint) times (e.g., 5 minutes) + if queryCtx.hasAtLeastOneFunction() && params.Step == 0 { + queryCtx.queryParams.Step = params.To - params.From + } + // We query every partition for every requested metric queries := make([]*partQuery, len(parts)*len(queryCtx.columnsSpecByMetric)) @@ -376,17 +377,28 @@ func (queryCtx *selectQueryContext) getOrCreateTimeColumn() Column { func (queryCtx *selectQueryContext) generateTimeColumn() Column { columnMeta := columnMeta{metric: "time"} - timeColumn := NewDataColumn("time", columnMeta, queryCtx.getResultBucketsSize(), IntType) + timeColumn := NewDataColumn("time", columnMeta, queryCtx.getResultBucketsSize(), TimeType) i := 0 for t := queryCtx.queryParams.From; t <= queryCtx.queryParams.To; t += queryCtx.queryParams.Step { - timeColumn.SetDataAt(i, t) + timeColumn.SetDataAt(i, time.Unix(t/1000, (t%1000)*1e6)) i++ } return timeColumn } func (queryCtx *selectQueryContext) isRawQuery() bool { - return (queryCtx.queryParams.Functions == "" && queryCtx.queryParams.Step == 0) || queryCtx.queryParams.disableClientAggr + return (!queryCtx.hasAtLeastOneFunction() && queryCtx.queryParams.Step == 0) || queryCtx.queryParams.disableClientAggr +} + +func (queryCtx *selectQueryContext) hasAtLeastOneFunction() bool { + atLeastOneFunction := false + for _, col := range queryCtx.columnsSpec { + if col.function != 0 { + atLeastOneFunction = true + break + } + } + return atLeastOneFunction } func (queryCtx *selectQueryContext) getResultBucketsSize() int { diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go index 78f8033b..b2ef2562 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go @@ -2,6 +2,7 @@ package pquerier import ( "math" + "time" "github.com/v3io/v3io-tsdb/pkg/aggregate" "github.com/v3io/v3io-tsdb/pkg/chunkenc" @@ -73,7 +74,7 @@ func (it *dataFrameColumnSeriesIterator) At() (int64, float64) { if err != nil { it.err = err } - return t, v + return t.UnixNano() / int64(time.Millisecond), v } func (it *dataFrameColumnSeriesIterator) AtString() (int64, string) { @@ -85,7 +86,7 @@ func (it *dataFrameColumnSeriesIterator) AtString() (int64, string) { if err != nil { it.err = err } - return t, v + return t.UnixNano() / int64(time.Millisecond), v } func (it *dataFrameColumnSeriesIterator) Next() bool { diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go new file mode 100644 index 00000000..ab28b21c --- /dev/null +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go @@ -0,0 +1,109 @@ +package pquerier + +import ( + "fmt" + "strings" + + "github.com/xwb1989/sqlparser" +) + +const emptyTableName = "dual" + +// ParseQuery Parses an sql query into `tsdb.selectParams` +// Currently supported syntax: +// select - selecting multiple metrics, aggregations, interpolation functions and aliasing +// from - only one table +// where - equality, and range operators. Not supporting regex,`IS NULL`, etc.. +// group by +func ParseQuery(sql string) (*SelectParams, string, error) { + stmt, err := sqlparser.Parse(sql) + if err != nil { + return nil, "", err + } + slct, ok := stmt.(*sqlparser.Select) + if !ok { + return nil, "", fmt.Errorf("not a SELECT statement") + } + + fromTable, err := getTableName(slct) + if err != nil { + return nil, "", err + } + + selectParams := &SelectParams{} + var columns []RequestedColumn + + for _, sexpr := range slct.SelectExprs { + currCol := RequestedColumn{} + switch col := sexpr.(type) { + case *sqlparser.AliasedExpr: + if !col.As.IsEmpty() { + currCol.Alias = col.As.String() + } + + switch expr := col.Expr.(type) { + case *sqlparser.FuncExpr: + switch firstExpr := expr.Exprs[0].(type) { + case *sqlparser.AliasedExpr: + cc := firstExpr.Expr.(*sqlparser.ColName) + currCol.Function = sqlparser.String(expr.Name) + currCol.Interpolator = removeBackticks(sqlparser.String(cc.Qualifier.Name)) // Some of the interpolators are parsed with a ` + currCol.Metric = sqlparser.String(cc.Name) + case *sqlparser.StarExpr: + // Appending column with empty metric name, meaning a column template with the given aggregate + currCol.Function = sqlparser.String(expr.Name) + } + case *sqlparser.ColName: + currCol.Metric = sqlparser.String(expr.Name) + currCol.Interpolator = removeBackticks(sqlparser.String(expr.Qualifier.Name)) // Some of the interpolators are parsed with a ` + default: + return nil, "", fmt.Errorf("unknown columns type - %T", col.Expr) + } + columns = append(columns, currCol) + case *sqlparser.StarExpr: + // Appending empty column, meaning a column template for raw data + columns = append(columns, currCol) + default: + return nil, "", fmt.Errorf("unknown SELECT column type - %T", sexpr) + } + } + if len(columns) == 0 { + return nil, "", fmt.Errorf("no columns") + } + selectParams.RequestedColumns = columns + + if slct.Where != nil { + selectParams.Filter, _ = parseFilter(strings.TrimPrefix(sqlparser.String(slct.Where), " where ")) + } + if slct.GroupBy != nil { + selectParams.GroupBy = strings.TrimPrefix(sqlparser.String(slct.GroupBy), " group by ") + } + + return selectParams, fromTable, nil +} + +func getTableName(slct *sqlparser.Select) (string, error) { + if nTables := len(slct.From); nTables != 1 { + return "", fmt.Errorf("select from multiple tables is not supported (got %d)", nTables) + } + aliased, ok := slct.From[0].(*sqlparser.AliasedTableExpr) + if !ok { + return "", fmt.Errorf("not a table select") + } + table, ok := aliased.Expr.(sqlparser.TableName) + if !ok { + return "", fmt.Errorf("not a table in FROM field") + } + + tableStr := sqlparser.String(table) + if tableStr == emptyTableName { + return "", nil + } + return tableStr, nil +} +func parseFilter(originalFilter string) (string, error) { + return strings.Replace(originalFilter, " = ", " == ", -1), nil +} +func removeBackticks(origin string) string { + return strings.Replace(origin, "`", "", -1) +} diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go new file mode 100644 index 00000000..8b106ed5 --- /dev/null +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go @@ -0,0 +1,62 @@ +// +build unit + +package pquerier + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseQuery(t *testing.T) { + testCases := []struct { + input string + output *SelectParams + outputTable string + }{ + {input: "select columnA, columnB", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA"}, {Metric: "columnB"}}}}, + + {input: "select linear.columnA", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Interpolator: "linear"}}}}, + + {input: "select max(prev.columnA), avg(columnB)", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Interpolator: "prev", Function: "max"}, + {Metric: "columnB", Function: "avg"}}}}, + + {input: "select columnA where columnB = 'tal' and columnC < 'Neiman'", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA"}}, Filter: "columnB == 'tal' and columnC < 'Neiman'"}}, + + {input: "select max(columnA) group by columnB", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "max"}}, GroupBy: "columnB"}}, + + {input: "select min(columnA) as bambi, max(linear.columnB) as bimba where columnB >= 123 group by columnB,columnC ", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "min", Alias: "bambi"}, + {Metric: "columnB", Function: "max", Interpolator: "linear", Alias: "bimba"}}, + Filter: "columnB >= 123", GroupBy: "columnB, columnC"}}, + + {input: "select min(columnA) from my_table where columnB >= 123", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "min"}}, + Filter: "columnB >= 123"}, + outputTable: "my_table"}, + + {input: "select * from my_table", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: ""}}}, + outputTable: "my_table"}, + + {input: "select max(*), avg(*) from my_table", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "", Function: "max"}, {Metric: "", Function: "avg"}}}, + outputTable: "my_table"}, + } + for _, test := range testCases { + t.Run(test.input, func(tt *testing.T) { + queryParams, table, err := ParseQuery(test.input) + if err != nil { + tt.Fatal(err) + } + + assert.Equal(tt, test.output, queryParams) + assert.Equal(tt, test.outputTable, table) + }) + } +} diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go index 01c99314..5b1ad9d7 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go @@ -66,6 +66,9 @@ func (c *columnMeta) isWildcard() bool { return c.metric == "*" } // Concrete Column = has real data behind it, Virtual column = described as a function on top of concrete columns func (c columnMeta) isConcrete() bool { return c.function == 0 || aggregate.IsRawAggregate(c.function) } func (c columnMeta) getColumnName() string { + if c.alias != "" { + return c.alias + } // If no aggregations are requested (raw down sampled data) if c.function == 0 { return c.metric diff --git a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go index c110fbcc..495e2f51 100644 --- a/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go +++ b/functions/ingest/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go @@ -113,6 +113,7 @@ Arguments: "Aggregation interval for applying the aggregation functions\n(if set - see the -a|--aggregates flag), of the format\n\"[0-9]+[mhd]\" (where 'm' = minutes, 'h' = hours, and\n'd' = days). Examples: \"1h\"; \"150m\". (default =\n - )") cmd.Flags().StringVar(&commandeer.groupBy, "groupBy", "", "Comma separated list of labels to group the result by") + cmd.Flags().BoolVarP(&commandeer.oldQuerier, "oldQuerier", "q", false, "use old querier") cmd.Flags().Lookup("oldQuerier").Hidden = true cmd.Flags().Lookup("windows").Hidden = true // hidden, because only supported in old querier. @@ -186,8 +187,20 @@ func (qc *queryCommandeer) newQuery(from, to, step int64) error { return errors.Wrap(err, "Failed to initialize the Querier object.") } - selectParams := &pquerier.SelectParams{Name: qc.name, Functions: qc.functions, - Step: step, Filter: qc.filter, From: from, To: to, GroupBy: qc.groupBy} + var selectParams *pquerier.SelectParams + + if strings.HasPrefix(qc.name, "select") { + selectParams, _, err = pquerier.ParseQuery(qc.name) + if err != nil { + return errors.Wrap(err, "failed to parse sql") + } + selectParams.Step = step + selectParams.From = from + selectParams.To = to + } else { + selectParams = &pquerier.SelectParams{Name: qc.name, Functions: qc.functions, + Step: step, Filter: qc.filter, From: from, To: to, GroupBy: qc.groupBy} + } set, err := qry.Select(selectParams) if err != nil { diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go index 2c1f896d..fb9bb1e3 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/ingest/ingest_example.go @@ -178,7 +178,7 @@ func createTSDBAppender(context *nuclio.Context, path string) error { if containerName == "" { containerName = "bigdata" } - container, err := tsdb.NewContainer(v3ioUrl, numWorkers, username, password, containerName, context.Logger) + container, err := tsdb.NewContainer(v3ioUrl, numWorkers, "", username, password, containerName, context.Logger) if err != nil { return err } diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go index 34e5d00a..abccd980 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/examples/nuclio/query/query_example.go @@ -132,7 +132,7 @@ func createV3ioQuerier(context *nuclio.Context, path string) error { if containerName == "" { containerName = "bigdata" } - container, err := tsdb.NewContainer(v3ioUrl, numWorkers, username, password, containerName, context.Logger) + container, err := tsdb.NewContainer(v3ioUrl, numWorkers, "", username, password, containerName, context.Logger) if err != nil { return err } diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go index 71488949..c9db07a3 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/frames.go @@ -235,12 +235,12 @@ func (d *dataFrame) addMetricIfNotExist(metricName string, columnSize int, useSe func (d *dataFrame) addMetricFromTemplate(metricName string, columnSize int, useServerAggregates bool) error { newColumns := make([]Column, len(d.columnsTemplates)) for i, col := range d.columnsTemplates { + col.metric = metricName newCol, err := createColumn(col, columnSize, useServerAggregates) if err != nil { return err } - newCol.setMetricName(metricName) newColumns[i] = newCol if aggregate.IsCountAggregate(col.function) { d.metricToCountColumn[metricName] = newCol @@ -347,7 +347,7 @@ func (d *dataFrame) TimeSeries(i int) (utils.Series, error) { // t2 v1 v3 // func (d *dataFrame) rawSeriesToColumns() { - var timeData []int64 + var timeData []time.Time columns := make([][]interface{}, len(d.rawColumns)) nonExhaustedIterators := len(d.rawColumns) @@ -365,7 +365,7 @@ func (d *dataFrame) rawSeriesToColumns() { for nonExhaustedIterators > 0 { currentTime = nextTime nextTime = int64(math.MaxInt64) - timeData = append(timeData, currentTime) + timeData = append(timeData, time.Unix(currentTime/1000, (currentTime%1000)*1e6)) for seriesIndex, rawSeries := range d.rawColumns { iter := rawSeries.Iterator() @@ -398,7 +398,7 @@ func (d *dataFrame) rawSeriesToColumns() { numberOfRows := len(timeData) colSpec := columnMeta{metric: "time"} - d.index = NewDataColumn("time", colSpec, numberOfRows, IntType) + d.index = NewDataColumn("time", colSpec, numberOfRows, TimeType) d.index.SetData(timeData, numberOfRows) d.columns = make([]Column, len(d.rawColumns)) @@ -420,13 +420,13 @@ func (d *dataFrame) shouldGenerateRawColumns() bool { return d.isRawSeries && !d // Column is a data column type Column interface { - Len() int // Number of elements - Name() string // Column name - DType() DType // Data type (e.g. IntType, FloatType ...) - FloatAt(i int) (float64, error) // Float value at index i - StringAt(i int) (string, error) // String value at index i - TimeAt(i int) (int64, error) // time value at index i - GetColumnSpec() columnMeta // Get the column's metadata + Len() int // Number of elements + Name() string // Column name + DType() DType // Data type (e.g. IntType, FloatType ...) + FloatAt(i int) (float64, error) // Float value at index i + StringAt(i int) (string, error) // String value at index i + TimeAt(i int) (time.Time, error) // time value at index i + GetColumnSpec() columnMeta // Get the column's metadata SetDataAt(i int, value interface{}) error SetData(d interface{}, size int) error GetInterpolationFunction() (InterpolationFunction, int64) @@ -552,21 +552,21 @@ func (dc *dataColumn) StringAt(i int) (string, error) { } // TimeAt returns time.Time value at index i -func (dc *dataColumn) TimeAt(i int) (int64, error) { +func (dc *dataColumn) TimeAt(i int) (time.Time, error) { if !dc.isValidIndex(i) { - return 0, fmt.Errorf("index %d out of bounds [0:%d]", i, dc.size) + return time.Unix(0, 0), fmt.Errorf("index %d out of bounds [0:%d]", i, dc.size) } - typedCol, ok := dc.data.([]int64) + typedCol, ok := dc.data.([]time.Time) if !ok { genericCol, ok := dc.data.([]interface{}) if ok { - i, ok := genericCol[i].(int64) + i, ok := genericCol[i].(time.Time) if ok { return i, nil } } - return 0, fmt.Errorf("wrong type (type is %s)", dc.DType()) + return time.Unix(0, 0), fmt.Errorf("wrong type (type is %s)", dc.DType()) } return typedCol[i], nil @@ -630,8 +630,8 @@ func (c *ConcreteColumn) FloatAt(i int) (float64, error) { func (c *ConcreteColumn) StringAt(i int) (string, error) { return "", errors.New("aggregated column does not support string type") } -func (c *ConcreteColumn) TimeAt(i int) (int64, error) { - return 0, errors.New("aggregated column does not support time type") +func (c *ConcreteColumn) TimeAt(i int) (time.Time, error) { + return time.Unix(0, 0), errors.New("aggregated column does not support time type") } func (c *ConcreteColumn) SetDataAt(i int, val interface{}) error { if !c.isValidIndex(i) { @@ -681,14 +681,6 @@ func (c *virtualColumn) StringAt(i int) (string, error) { } return value.(string), nil } -func (c *virtualColumn) TimeAt(i int) (int64, error) { - if !c.isValidIndex(i) { - return 0, fmt.Errorf("index %d out of bounds [0:%d]", i, c.size) - } - - value, err := c.function(c.dependantColumns, i) - if err != nil { - return 0, err - } - return value.(int64), nil +func (c *virtualColumn) TimeAt(i int) (time.Time, error) { + return time.Unix(0, 0), errors.New("aggregated column does not support time type") } diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go index 5061eec1..78f9e067 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/querier.go @@ -287,7 +287,7 @@ func (q *V3ioQuerier) GetLabelSets(metric string, filter string) ([]utils.Labels input := v3io.GetItemsInput{ Path: partitionPaths[0], Filter: filter, - AttributeNames: []string{config.LabelSetAttrName}, + AttributeNames: []string{config.LabelSetAttrName, config.MetricNameAttrName}, } iter, err := utils.NewAsyncItemsCursor(q.container, &input, q.cfg.QryWorkers, shardingKeys, q.logger) @@ -298,12 +298,14 @@ func (q *V3ioQuerier) GetLabelSets(metric string, filter string) ([]utils.Labels // Iterate over the results for iter.Next() { labelSet := iter.GetField(config.LabelSetAttrName).(string) - currLabels, err := utils.LabelsFromString(labelSet) if err != nil { return nil, err } + currLabels = append(utils.LabelsFromStringList(config.PrometheusMetricNameAttribute, + iter.GetField(config.MetricNameAttrName).(string)), currLabels...) + labelsMap[currLabels.Hash()] = currLabels } diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go index 5ca70d24..38f495cd 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/query_integration_test.go @@ -1424,7 +1424,8 @@ func (suite *testQuerySuite) TestDataFrameRawDataMultipleMetrics() { for i := 0; i < frame.Index().Len(); i++ { t, _ := in.TimeAt(i) - assert.Equal(suite.T(), expectedTimeColumn[i], t, "time column does not match at index %v", i) + timeMillis := t.UnixNano() / int64(time.Millisecond) + assert.Equal(suite.T(), expectedTimeColumn[i], timeMillis, "time column does not match at index %v", i) for _, column := range cols { v, _ := column.FloatAt(i) @@ -1555,7 +1556,8 @@ func (suite *testQuerySuite) TestVariantTypeQueryWithDataFrame() { for i := 0; i < frame.Index().Len(); i++ { t, _ := in.TimeAt(i) - assert.Equal(suite.T(), expectedTimeColumn[i], t, "time column does not match at index %v", i) + timeMillis := t.UnixNano() / int64(time.Millisecond) + assert.Equal(suite.T(), expectedTimeColumn[i], timeMillis, "time column does not match at index %v", i) for _, column := range cols { v, _ := column.StringAt(i) @@ -1922,7 +1924,7 @@ func (suite *getLabelSetsSuite) TestGetLabels() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -1935,18 +1937,21 @@ func (suite *getLabelSetsSuite) TestGetLabels() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "mac", "region", "europe", config.PrometheusMetricNameAttribute, "cpu")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -1963,7 +1968,7 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllMetrics() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -1976,18 +1981,21 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllMetrics() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "diskio", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "mac", "region", "europe", config.PrometheusMetricNameAttribute, "diskio")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -2004,7 +2012,7 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllSpecificMetric() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -2017,18 +2025,20 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllSpecificMetric() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "diskio", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -2038,14 +2048,14 @@ func (suite *getLabelSetsSuite) TestGetLabelsAllSpecificMetric() { suite.T().Fatalf("failed to get label sets, err:%v\n", err) } - suite.ElementsMatch(expectedLabels[:2], labelsList, "actual label sets does not match expected") + suite.ElementsMatch(expectedLabels, labelsList, "actual label sets does not match expected") } func (suite *getLabelSetsSuite) TestGetLabelsWithFilter() { adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil) suite.Require().NoError(err, "failed to create v3io adapter") - expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), + labels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe"), utils.LabelsFromStringList("os", "linux", "region", "asia"), utils.LabelsFromStringList("os", "mac", "region", "europe")} numberOfEvents := 10 @@ -2058,18 +2068,20 @@ func (suite *getLabelSetsSuite) TestGetLabelsWithFilter() { Key: tsdbtest.OptTimeSeries, Value: tsdbtest.TimeSeries{tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[0], + Labels: labels[0], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[1], + Labels: labels[1], Data: ingestedData}, tsdbtest.Metric{ Name: "cpu", - Labels: expectedLabels[2], + Labels: labels[2], Data: ingestedData}, }}) tsdbtest.InsertData(suite.T(), testParams) + expectedLabels := []utils.Labels{utils.LabelsFromStringList("os", "linux", "region", "europe", config.PrometheusMetricNameAttribute, "cpu"), + utils.LabelsFromStringList("os", "linux", "region", "asia", config.PrometheusMetricNameAttribute, "cpu")} querierV2, err := adapter.QuerierV2() suite.Require().NoError(err, "failed to create querier v2") @@ -2079,7 +2091,7 @@ func (suite *getLabelSetsSuite) TestGetLabelsWithFilter() { suite.T().Fatalf("failed to get label sets, err:%v\n", err) } - suite.ElementsMatch(expectedLabels[:2], labelsList, "actual label sets does not match expected") + suite.ElementsMatch(expectedLabels, labelsList, "actual label sets does not match expected") } func TestGetLabelSetsSuite(t *testing.T) { diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go index d33635ed..3d3f6d4e 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/select.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/nuclio/logger" "github.com/pkg/errors" @@ -45,12 +46,6 @@ type selectQueryContext struct { func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params *SelectParams) (*frameIterator, error) { queryCtx.dataFrames = make(map[uint64]*dataFrame) - // If step isn't passed (e.g., when using the console), the step is the - // difference between the end (maxt) and start (mint) times (e.g., 5 minutes) - if params.Functions != "" && params.Step == 0 { - params.Step = params.To - params.From - } - queryCtx.queryParams = params var err error queryCtx.columnsSpec, queryCtx.columnsSpecByMetric, err = queryCtx.createColumnSpecs() @@ -58,6 +53,12 @@ func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params * return nil, err } + // If step isn't passed (e.g., when using the console), the step is the + // difference between the end (maxt) and start (mint) times (e.g., 5 minutes) + if queryCtx.hasAtLeastOneFunction() && params.Step == 0 { + queryCtx.queryParams.Step = params.To - params.From + } + // We query every partition for every requested metric queries := make([]*partQuery, len(parts)*len(queryCtx.columnsSpecByMetric)) @@ -376,17 +377,28 @@ func (queryCtx *selectQueryContext) getOrCreateTimeColumn() Column { func (queryCtx *selectQueryContext) generateTimeColumn() Column { columnMeta := columnMeta{metric: "time"} - timeColumn := NewDataColumn("time", columnMeta, queryCtx.getResultBucketsSize(), IntType) + timeColumn := NewDataColumn("time", columnMeta, queryCtx.getResultBucketsSize(), TimeType) i := 0 for t := queryCtx.queryParams.From; t <= queryCtx.queryParams.To; t += queryCtx.queryParams.Step { - timeColumn.SetDataAt(i, t) + timeColumn.SetDataAt(i, time.Unix(t/1000, (t%1000)*1e6)) i++ } return timeColumn } func (queryCtx *selectQueryContext) isRawQuery() bool { - return (queryCtx.queryParams.Functions == "" && queryCtx.queryParams.Step == 0) || queryCtx.queryParams.disableClientAggr + return (!queryCtx.hasAtLeastOneFunction() && queryCtx.queryParams.Step == 0) || queryCtx.queryParams.disableClientAggr +} + +func (queryCtx *selectQueryContext) hasAtLeastOneFunction() bool { + atLeastOneFunction := false + for _, col := range queryCtx.columnsSpec { + if col.function != 0 { + atLeastOneFunction = true + break + } + } + return atLeastOneFunction } func (queryCtx *selectQueryContext) getResultBucketsSize() int { diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go index 78f8033b..b2ef2562 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/series.go @@ -2,6 +2,7 @@ package pquerier import ( "math" + "time" "github.com/v3io/v3io-tsdb/pkg/aggregate" "github.com/v3io/v3io-tsdb/pkg/chunkenc" @@ -73,7 +74,7 @@ func (it *dataFrameColumnSeriesIterator) At() (int64, float64) { if err != nil { it.err = err } - return t, v + return t.UnixNano() / int64(time.Millisecond), v } func (it *dataFrameColumnSeriesIterator) AtString() (int64, string) { @@ -85,7 +86,7 @@ func (it *dataFrameColumnSeriesIterator) AtString() (int64, string) { if err != nil { it.err = err } - return t, v + return t.UnixNano() / int64(time.Millisecond), v } func (it *dataFrameColumnSeriesIterator) Next() bool { diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go new file mode 100644 index 00000000..ab28b21c --- /dev/null +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser.go @@ -0,0 +1,109 @@ +package pquerier + +import ( + "fmt" + "strings" + + "github.com/xwb1989/sqlparser" +) + +const emptyTableName = "dual" + +// ParseQuery Parses an sql query into `tsdb.selectParams` +// Currently supported syntax: +// select - selecting multiple metrics, aggregations, interpolation functions and aliasing +// from - only one table +// where - equality, and range operators. Not supporting regex,`IS NULL`, etc.. +// group by +func ParseQuery(sql string) (*SelectParams, string, error) { + stmt, err := sqlparser.Parse(sql) + if err != nil { + return nil, "", err + } + slct, ok := stmt.(*sqlparser.Select) + if !ok { + return nil, "", fmt.Errorf("not a SELECT statement") + } + + fromTable, err := getTableName(slct) + if err != nil { + return nil, "", err + } + + selectParams := &SelectParams{} + var columns []RequestedColumn + + for _, sexpr := range slct.SelectExprs { + currCol := RequestedColumn{} + switch col := sexpr.(type) { + case *sqlparser.AliasedExpr: + if !col.As.IsEmpty() { + currCol.Alias = col.As.String() + } + + switch expr := col.Expr.(type) { + case *sqlparser.FuncExpr: + switch firstExpr := expr.Exprs[0].(type) { + case *sqlparser.AliasedExpr: + cc := firstExpr.Expr.(*sqlparser.ColName) + currCol.Function = sqlparser.String(expr.Name) + currCol.Interpolator = removeBackticks(sqlparser.String(cc.Qualifier.Name)) // Some of the interpolators are parsed with a ` + currCol.Metric = sqlparser.String(cc.Name) + case *sqlparser.StarExpr: + // Appending column with empty metric name, meaning a column template with the given aggregate + currCol.Function = sqlparser.String(expr.Name) + } + case *sqlparser.ColName: + currCol.Metric = sqlparser.String(expr.Name) + currCol.Interpolator = removeBackticks(sqlparser.String(expr.Qualifier.Name)) // Some of the interpolators are parsed with a ` + default: + return nil, "", fmt.Errorf("unknown columns type - %T", col.Expr) + } + columns = append(columns, currCol) + case *sqlparser.StarExpr: + // Appending empty column, meaning a column template for raw data + columns = append(columns, currCol) + default: + return nil, "", fmt.Errorf("unknown SELECT column type - %T", sexpr) + } + } + if len(columns) == 0 { + return nil, "", fmt.Errorf("no columns") + } + selectParams.RequestedColumns = columns + + if slct.Where != nil { + selectParams.Filter, _ = parseFilter(strings.TrimPrefix(sqlparser.String(slct.Where), " where ")) + } + if slct.GroupBy != nil { + selectParams.GroupBy = strings.TrimPrefix(sqlparser.String(slct.GroupBy), " group by ") + } + + return selectParams, fromTable, nil +} + +func getTableName(slct *sqlparser.Select) (string, error) { + if nTables := len(slct.From); nTables != 1 { + return "", fmt.Errorf("select from multiple tables is not supported (got %d)", nTables) + } + aliased, ok := slct.From[0].(*sqlparser.AliasedTableExpr) + if !ok { + return "", fmt.Errorf("not a table select") + } + table, ok := aliased.Expr.(sqlparser.TableName) + if !ok { + return "", fmt.Errorf("not a table in FROM field") + } + + tableStr := sqlparser.String(table) + if tableStr == emptyTableName { + return "", nil + } + return tableStr, nil +} +func parseFilter(originalFilter string) (string, error) { + return strings.Replace(originalFilter, " = ", " == ", -1), nil +} +func removeBackticks(origin string) string { + return strings.Replace(origin, "`", "", -1) +} diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go new file mode 100644 index 00000000..8b106ed5 --- /dev/null +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/sql_parser_test.go @@ -0,0 +1,62 @@ +// +build unit + +package pquerier + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseQuery(t *testing.T) { + testCases := []struct { + input string + output *SelectParams + outputTable string + }{ + {input: "select columnA, columnB", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA"}, {Metric: "columnB"}}}}, + + {input: "select linear.columnA", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Interpolator: "linear"}}}}, + + {input: "select max(prev.columnA), avg(columnB)", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Interpolator: "prev", Function: "max"}, + {Metric: "columnB", Function: "avg"}}}}, + + {input: "select columnA where columnB = 'tal' and columnC < 'Neiman'", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA"}}, Filter: "columnB == 'tal' and columnC < 'Neiman'"}}, + + {input: "select max(columnA) group by columnB", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "max"}}, GroupBy: "columnB"}}, + + {input: "select min(columnA) as bambi, max(linear.columnB) as bimba where columnB >= 123 group by columnB,columnC ", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "min", Alias: "bambi"}, + {Metric: "columnB", Function: "max", Interpolator: "linear", Alias: "bimba"}}, + Filter: "columnB >= 123", GroupBy: "columnB, columnC"}}, + + {input: "select min(columnA) from my_table where columnB >= 123", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "columnA", Function: "min"}}, + Filter: "columnB >= 123"}, + outputTable: "my_table"}, + + {input: "select * from my_table", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: ""}}}, + outputTable: "my_table"}, + + {input: "select max(*), avg(*) from my_table", + output: &SelectParams{RequestedColumns: []RequestedColumn{{Metric: "", Function: "max"}, {Metric: "", Function: "avg"}}}, + outputTable: "my_table"}, + } + for _, test := range testCases { + t.Run(test.input, func(tt *testing.T) { + queryParams, table, err := ParseQuery(test.input) + if err != nil { + tt.Fatal(err) + } + + assert.Equal(tt, test.output, queryParams) + assert.Equal(tt, test.outputTable, table) + }) + } +} diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go index 01c99314..5b1ad9d7 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/pquerier/types.go @@ -66,6 +66,9 @@ func (c *columnMeta) isWildcard() bool { return c.metric == "*" } // Concrete Column = has real data behind it, Virtual column = described as a function on top of concrete columns func (c columnMeta) isConcrete() bool { return c.function == 0 || aggregate.IsRawAggregate(c.function) } func (c columnMeta) getColumnName() string { + if c.alias != "" { + return c.alias + } // If no aggregations are requested (raw down sampled data) if c.function == 0 { return c.metric diff --git a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go index c110fbcc..495e2f51 100644 --- a/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go +++ b/functions/query/vendor/github.com/v3io/v3io-tsdb/pkg/tsdbctl/query.go @@ -113,6 +113,7 @@ Arguments: "Aggregation interval for applying the aggregation functions\n(if set - see the -a|--aggregates flag), of the format\n\"[0-9]+[mhd]\" (where 'm' = minutes, 'h' = hours, and\n'd' = days). Examples: \"1h\"; \"150m\". (default =\n - )") cmd.Flags().StringVar(&commandeer.groupBy, "groupBy", "", "Comma separated list of labels to group the result by") + cmd.Flags().BoolVarP(&commandeer.oldQuerier, "oldQuerier", "q", false, "use old querier") cmd.Flags().Lookup("oldQuerier").Hidden = true cmd.Flags().Lookup("windows").Hidden = true // hidden, because only supported in old querier. @@ -186,8 +187,20 @@ func (qc *queryCommandeer) newQuery(from, to, step int64) error { return errors.Wrap(err, "Failed to initialize the Querier object.") } - selectParams := &pquerier.SelectParams{Name: qc.name, Functions: qc.functions, - Step: step, Filter: qc.filter, From: from, To: to, GroupBy: qc.groupBy} + var selectParams *pquerier.SelectParams + + if strings.HasPrefix(qc.name, "select") { + selectParams, _, err = pquerier.ParseQuery(qc.name) + if err != nil { + return errors.Wrap(err, "failed to parse sql") + } + selectParams.Step = step + selectParams.From = from + selectParams.To = to + } else { + selectParams = &pquerier.SelectParams{Name: qc.name, Functions: qc.functions, + Step: step, Filter: qc.filter, From: from, To: to, GroupBy: qc.groupBy} + } set, err := qry.Select(selectParams) if err != nil {