Skip to content

Commit

Permalink
fix: panic when explaining native query and add NaN, Inf format setti…
Browse files Browse the repository at this point in the history
…ngs (#24)
  • Loading branch information
hgiasac authored Oct 12, 2024
1 parent 7b6a7bd commit 7a19117
Show file tree
Hide file tree
Showing 19 changed files with 387 additions and 83 deletions.
27 changes: 23 additions & 4 deletions configuration/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ func (uc *updateCommand) SetMetadataMetric(key string, info metadata.MetricInfo)
uc.existedMetrics[key] = true
}

// MetricExists check if the metric exists
func (uc *updateCommand) MetricExists(key string) bool {
uc.lock.Lock()
defer uc.lock.Unlock()
_, ok := uc.existedMetrics[key]
return ok
}

// SetMetricExists marks if the metric as existent
func (uc *updateCommand) SetMetricExists(key string) {
uc.lock.Lock()
defer uc.lock.Unlock()
uc.existedMetrics[key] = true
}

func introspectSchema(ctx context.Context, args *UpdateArguments) error {
start := time.Now()
slog.Info("introspecting metadata", slog.String("dir", args.Dir))
Expand Down Expand Up @@ -184,13 +199,14 @@ func (uc *updateCommand) introspectMetric(ctx context.Context, key string, infos
continue
}

if _, ok := uc.existedMetrics[key]; ok {
if uc.MetricExists(key) {
slog.Warn(fmt.Sprintf("metric %s exists", key))
}

switch metricType {
case model.MetricTypeGauge, model.MetricTypeGaugeHistogram:
for _, suffix := range []string{"sum", "bucket", "count"} {
uc.existedMetrics[fmt.Sprintf("%s_%s", key, suffix)] = true
uc.SetMetricExists(fmt.Sprintf("%s_%s", key, suffix))
}
}

Expand Down Expand Up @@ -350,8 +366,11 @@ var defaultConfiguration = metadata.Configuration{
UnixTimeUnit: client.UnixTimeSecond,
ConcurrencyLimit: 5,
Format: metadata.RuntimeFormatSettings{
Timestamp: metadata.TimestampUnix,
Value: metadata.ValueFloat64,
Timestamp: metadata.TimestampUnix,
Value: metadata.ValueFloat64,
NaN: "NaN",
Inf: "+Inf",
NegativeInf: "-Inf",
},
},
}
Expand Down
3 changes: 3 additions & 0 deletions connector-definition/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ runtime:
format:
timestamp: unix
value: float64
nan: "NaN"
inf: "+Inf"
negative_inf: "-Inf"
47 changes: 39 additions & 8 deletions connector/api/decimal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"fmt"
"math"

"github.com/hasura/ndc-sdk-go/utils"
)
Expand All @@ -16,7 +17,7 @@ type Decimal struct {
raw *string
}

// NewDecimal creates a BigDecimal instance
// NewDecimal creates a Decimal instance
func NewDecimal[T comparable](value T) (Decimal, error) {
result := Decimal{}
if err := result.FromValue(value); err != nil {
Expand All @@ -25,25 +26,55 @@ func NewDecimal[T comparable](value T) (Decimal, error) {
return result, nil
}

// NewDecimalValue creates a Decimal instance from a number value
func NewDecimalValue[T int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | float32 | float64](value T) Decimal {
v := float64(value)
return Decimal{value: &v}
}

// ScalarName get the schema name of the scalar
func (bd Decimal) IsNil() bool {
return bd.raw == nil
}

// Value returns the decimal value
func (bd Decimal) Value() any {
if bd.value == nil {
if bd.raw != nil {
return *bd.raw
}
return nil
}

if math.IsNaN(*bd.value) {
return "NaN"
}
if *bd.value > 0 && math.IsInf(*bd.value, 1) {
return "+Inf"
}
if *bd.value < 0 && math.IsInf(*bd.value, -1) {
return "-Inf"
}

return *bd.value
}

// Stringer implements fmt.Stringer interface.
func (bd Decimal) String() string {
if bd.raw != nil {
return *bd.raw
}
if bd.value != nil {
return fmt.Sprint(*bd.value)
v := bd.Value()
if v == nil {
return "NaN"
}
return "Inf"
return fmt.Sprint(v)
}

// MarshalJSON implements json.Marshaler.
func (bi Decimal) MarshalJSON() ([]byte, error) {
return json.Marshal(bi.String())
v := bi.Value()
if v != nil {
v = fmt.Sprint(v)
}
return json.Marshal(v)
}

// UnmarshalJSON implements json.Unmarshaler.
Expand Down
5 changes: 2 additions & 3 deletions connector/api/decimal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import (
"encoding/json"
"testing"

"github.com/hasura/ndc-sdk-go/utils"
"gotest.tools/v3/assert"
)

func TestDecimal(t *testing.T) {
assert.Assert(t, Decimal{}.IsNil())
assert.Equal(t, Decimal{}.String(), "Inf")
assert.Equal(t, Decimal{value: utils.ToPtr(1.2)}.String(), "1.2")
assert.Equal(t, Decimal{}.String(), "NaN")
assert.Equal(t, NewDecimalValue(1.2).String(), "1.2")

_, err := NewDecimal("foo")
assert.ErrorContains(t, err, "failed to convert Float, got: foo")
Expand Down
13 changes: 11 additions & 2 deletions connector/internal/native_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hasura/ndc-sdk-go/schema"
"github.com/hasura/ndc-sdk-go/utils"
"github.com/prometheus/common/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -138,12 +139,16 @@ func (nqe *NativeQueryExecutor) queryInstant(ctx context.Context, queryString st
}

span := trace.SpanFromContext(ctx)
span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("expression", params.Expression)))
span.AddEvent("post_filter", trace.WithAttributes(
utils.JSONAttribute("expression", params.Expression),
attribute.Int("pre_filter_count", len(vector)),
))
vector, err = nqe.filterVectorResults(vector, params.Expression)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

span.AddEvent("post_filter_results", trace.WithAttributes(attribute.Int("post_filter_count", len(vector))))
sortVector(vector, params.OrderBy)
vector = paginateVector(vector, nqe.Request.Query)
results := createQueryResultsFromVector(vector, nqe.NativeQuery.Labels, nqe.Runtime, flat)
Expand All @@ -158,12 +163,16 @@ func (nqe *NativeQueryExecutor) queryRange(ctx context.Context, queryString stri
}

span := trace.SpanFromContext(ctx)
span.AddEvent("post_filter_results", trace.WithAttributes(utils.JSONAttribute("expression", params.Expression)))
span.AddEvent("post_filter", trace.WithAttributes(
utils.JSONAttribute("expression", params.Expression),
attribute.Int("pre_filter_count", len(matrix)),
))
matrix, err = nqe.filterMatrixResults(matrix, params)
if err != nil {
return nil, schema.UnprocessableContentError(err.Error(), nil)
}

span.AddEvent("post_filter_results", trace.WithAttributes(attribute.Int("post_filter_count", len(matrix))))
sortMatrix(matrix, params.OrderBy)
results := createQueryResultsFromMatrix(matrix, nqe.NativeQuery.Labels, nqe.Runtime, flat)

Expand Down
4 changes: 2 additions & 2 deletions connector/internal/native_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func TestFilterVectorResults(t *testing.T) {
schema.NewExpressionBinaryComparisonOperator(
*schema.NewComparisonTargetColumn("job", []string{}, []schema.PathElement{}),
"_in",
schema.NewComparisonValueScalar([]string{"foo"}),
schema.NewComparisonValueScalar([]string{"ndc-prometheus"}),
),
).Encode(),
Expected: model.Vector{},
Expected: vectorFixtures,
},
{
Name: "nin",
Expand Down
21 changes: 16 additions & 5 deletions connector/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"encoding/json"
"fmt"
"math"
"slices"
"time"

Expand All @@ -17,7 +18,7 @@ func createQueryResultsFromVector(vector model.Vector, labels map[string]metadat
results := make([]map[string]any, len(vector))
for i, item := range vector {
ts := formatTimestamp(item.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
value := formatValue(item.Value, runtime.Format.Value)
value := formatValue(item.Value, runtime.Format)
r := map[string]any{
metadata.TimestampKey: ts,
metadata.ValueKey: value,
Expand Down Expand Up @@ -65,7 +66,7 @@ func createGroupQueryResultsFromMatrix(matrix model.Matrix, labels map[string]me
values := make([]map[string]any, valuesLen)
for i, value := range item.Values {
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
v := formatValue(value.Value, runtime.Format.Value)
v := formatValue(value.Value, runtime.Format)
values[i] = map[string]any{
metadata.TimestampKey: ts,
metadata.ValueKey: v,
Expand All @@ -89,7 +90,7 @@ func createFlatQueryResultsFromMatrix(matrix model.Matrix, labels map[string]met
for _, item := range matrix {
for _, value := range item.Values {
ts := formatTimestamp(value.Timestamp, runtime.Format.Timestamp, runtime.UnixTimeUnit)
v := formatValue(value.Value, runtime.Format.Value)
v := formatValue(value.Value, runtime.Format)
r := map[string]any{
metadata.LabelsKey: item.Metric,
metadata.TimestampKey: ts,
Expand Down Expand Up @@ -117,9 +118,19 @@ func formatTimestamp(ts model.Time, format metadata.TimestampFormat, unixTime cl
}
}

func formatValue(value model.SampleValue, format metadata.ValueFormat) any {
switch format {
func formatValue(value model.SampleValue, format metadata.RuntimeFormatSettings) any {
switch format.Value {
case metadata.ValueFloat64:
if math.IsNaN(float64(value)) {
return format.NaN
}
if value > 0 && math.IsInf(float64(value), 1) {
return format.Inf
}
if value < 0 && math.IsInf(float64(value), -1) {
return format.NegativeInf
}

return float64(value)
default:
return value.String()
Expand Down
6 changes: 6 additions & 0 deletions connector/metadata/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ type RuntimeFormatSettings struct {
Timestamp TimestampFormat `json:"timestamp" yaml:"timestamp" jsonschema:"enum=rfc3339,enum=unix,default=unix"`
// The serialization format for value
Value ValueFormat `json:"value" yaml:"value" jsonschema:"enum=string,enum=float64,default=string"`
// The serialization format for not-a-number values
NaN any `json:"nan" yaml:"nan" jsonschema:"oneof_type=string;number"`
// The serialization format for infinite values
Inf any `json:"inf" yaml:"inf" jsonschema:"oneof_type=string;number"`
// The serialization format for negative infinite values
NegativeInf any `json:"negative_inf" yaml:"negative_inf" jsonschema:"oneof_type=string;number"`
}

// RuntimeSettings contain settings for the runtime engine
Expand Down
13 changes: 12 additions & 1 deletion connector/metadata/native_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metadata
import (
"fmt"
"regexp"
"slices"
"strings"

"github.com/hasura/ndc-sdk-go/schema"
Expand All @@ -12,6 +13,7 @@ import (

// The variable syntax for native queries is ${<name>} which is compatible with Grafana
var promQLVariableRegex = regexp.MustCompile(`\${(\w+)}`)
var allowedNativeQueryScalars = []ScalarName{ScalarString, ScalarDuration, ScalarInt64, ScalarFloat64}

// NativeOperations the list of native query and mutation definitions
type NativeOperations struct {
Expand Down Expand Up @@ -59,9 +61,18 @@ func (scb *connectorSchemaBuilder) buildNativeQuery(name string, query *NativeQu
if _, ok := arguments[key]; ok {
return fmt.Errorf("argument `%s` is already used by the function", key)
}
scalarName := arg.Type
if arg.Type != "" {
if !slices.Contains(allowedNativeQueryScalars, ScalarName(arg.Type)) {
return fmt.Errorf("%s: unsupported native query argument type %s; argument: %s ", name, scalarName, key)
}
} else {
scalarName = string(ScalarString)
}

arguments[key] = schema.ArgumentInfo{
Description: arg.Description,
Type: schema.NewNamedType(string(ScalarString)).Encode(),
Type: schema.NewNamedType(scalarName).Encode(),
}
}

Expand Down
3 changes: 3 additions & 0 deletions connector/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C
Client: state.Client,
Request: request,
Arguments: arguments,
Runtime: c.runtime,
}
_, queryString, err := executor.Explain(ctx)
if err != nil {
Expand All @@ -215,6 +216,7 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C
Request: request,
NativeQuery: &nativeQuery,
Arguments: arguments,
Runtime: c.runtime,
}
_, queryString, err := executor.Explain(ctx)
if err != nil {
Expand Down Expand Up @@ -245,6 +247,7 @@ func (c *PrometheusConnector) QueryExplain(ctx context.Context, conf *metadata.C
Metric: collection,
Variables: requestVars[0],
Arguments: arguments,
Runtime: c.runtime,
}

_, queryString, _, err := executor.Explain(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
"collection": "process_cpu_seconds_total",
"query": {
"fields": {
"job": { "type": "column", "column": "job", "fields": null },
"value": { "type": "column", "column": "value", "fields": null },
"timestamp": { "type": "column", "column": "timestamp", "fields": null }
"job": { "type": "column", "column": "job", "fields": null }
},
"limit": 1,
"order_by": {
Expand Down
16 changes: 16 additions & 0 deletions connector/testdata/query/prometheus_alertmanagers/expected.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"rows": [
{
"__value": {
"activeAlertManagers": [
{
"url": "http://alertmanager:9093/api/v2/alerts"
}
],
"droppedAlertManagers": []
}
}
]
}
]
6 changes: 1 addition & 5 deletions connector/testdata/query/prometheus_alerts/request.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
"state": {
"column": "state",
"type": "column"
},
"value": {
"column": "value",
"type": "column"
}
},
"type": "object"
Expand All @@ -38,4 +34,4 @@
}
}
}
}
}
Loading

0 comments on commit 7a19117

Please sign in to comment.