diff --git a/banyand/measure/aggregate/aggregate_function.go b/banyand/measure/aggregate/aggregate_function.go index 69427c94f..6adbaa14d 100644 --- a/banyand/measure/aggregate/aggregate_function.go +++ b/banyand/measure/aggregate/aggregate_function.go @@ -62,8 +62,18 @@ func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[ switch kind { case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN: function = &Min[A, B, R]{minimum: maxValue[R]()} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX: + function = &Max[A, B, R]{maximum: minValue[R]()} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT: + function = &Count[A, B, R]{count: 0} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM: + function = &Sum[A, B, R]{summation: zeroValue[R]()} case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG: function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT: + function = &Percent[A, B, R]{total: 0, match: 0} + case modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE: + function = &Rate[A, B, R]{denominator: 0, numerator: 0} default: return nil, fmt.Errorf("MeasureAggregate unknown") } @@ -76,6 +86,20 @@ func zeroValue[R Output]() R { return r } +func minValue[R Output]() (r R) { + switch a := any(&r).(type) { + case *int64: + *a = math.MinInt64 + case *float64: + *a = -math.MaxFloat64 + case *string: + *a = "" + default: + panic("unreachable") + } + return +} + func maxValue[R Output]() (r R) { switch a := any(&r).(type) { case *int64: diff --git a/banyand/measure/aggregate/avg.go b/banyand/measure/aggregate/avg.go index c2fccf0cd..7abc686da 100644 --- a/banyand/measure/aggregate/avg.go +++ b/banyand/measure/aggregate/avg.go @@ -24,14 +24,14 @@ type Avg[A, B Input, R Output] struct { } // Combine takes elements to do the aggregation. -// Avg uses type parameter A and B. -func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error { +// Avg uses type parameter A. +func (f *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error { for _, arg0 := range arguments.arg0 { switch arg0 := any(arg0).(type) { case int64: - a.summation += R(arg0) + f.summation += R(arg0) case float64: - a.summation += R(arg0) + f.summation += R(arg0) default: return errFieldValueType } @@ -40,7 +40,7 @@ func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error { for _, arg1 := range arguments.arg1 { switch arg1 := any(arg1).(type) { case int64: - a.count += arg1 + f.count += arg1 default: return errFieldValueType } @@ -50,14 +50,14 @@ func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error { } // Result gives the result for the aggregation. -func (a *Avg[A, B, R]) Result() R { +func (f *Avg[A, B, R]) Result() R { // In unusual situations it returns the zero value. - if a.count == 0 { + if f.count == 0 { return zeroValue[R]() } // According to the semantics of GoLang, the division of one int by another int - // returns an int, instead of a float. - return a.summation / R(a.count) + // returns an int, instead of f float. + return f.summation / R(f.count) } // NewAvgArguments constructs arguments. diff --git a/banyand/measure/aggregate/count.go b/banyand/measure/aggregate/count.go new file mode 100644 index 000000000..0b2daccd4 --- /dev/null +++ b/banyand/measure/aggregate/count.go @@ -0,0 +1,50 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Count calculates the count value of elements. +type Count[A, B Input, R Output] struct { + count int64 +} + +// Combine takes elements to do the aggregation. +// Count uses none of type parameters. +func (f *Count[A, B, R]) Combine(arguments Arguments[A, B]) error { + for _, arg0 := range arguments.arg0 { + switch arg0 := any(arg0).(type) { + case int64: + f.count += arg0 + default: + return errFieldValueType + } + } + return nil +} + +// Result gives the result for the aggregation. +func (f *Count[A, B, R]) Result() R { + return R(f.count) +} + +// NewCountArguments constructs arguments. +func NewCountArguments(a []int64) Arguments[int64, Void] { + return Arguments[int64, Void]{ + arg0: a, + arg1: nil, + } +} diff --git a/banyand/measure/aggregate/count_test.go b/banyand/measure/aggregate/count_test.go new file mode 100644 index 000000000..db6a0b30c --- /dev/null +++ b/banyand/measure/aggregate/count_test.go @@ -0,0 +1,39 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestCount(t *testing.T) { + var err error + + // case1: input int64 values + countInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT) + err = countInt64.Combine(aggregate.NewCountArguments( + []int64{1, 2, 3}, // mock the "count" column + )) + assert.NoError(t, err) + assert.Equal(t, int64(6), countInt64.Result()) +} diff --git a/banyand/measure/aggregate/max.go b/banyand/measure/aggregate/max.go new file mode 100644 index 000000000..5879435af --- /dev/null +++ b/banyand/measure/aggregate/max.go @@ -0,0 +1,56 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Max calculates the maximum value of elements. +type Max[A, B Input, R Output] struct { + maximum R +} + +// Combine takes elements to do the aggregation. +// Max uses type parameter A. +func (f *Max[A, B, R]) Combine(arguments Arguments[A, B]) error { + for _, arg0 := range arguments.arg0 { + switch arg0 := any(arg0).(type) { + case int64: + if R(arg0) > f.maximum { + f.maximum = R(arg0) + } + case float64: + if R(arg0) > f.maximum { + f.maximum = R(arg0) + } + default: + return errFieldValueType + } + } + return nil +} + +// Result gives the result for the aggregation. +func (f *Max[A, B, R]) Result() R { + return f.maximum +} + +// NewMaxArguments constructs arguments. +func NewMaxArguments[A Input](a []A) Arguments[A, Void] { + return Arguments[A, Void]{ + arg0: a, + arg1: nil, + } +} diff --git a/banyand/measure/aggregate/max_test.go b/banyand/measure/aggregate/max_test.go new file mode 100644 index 000000000..361877174 --- /dev/null +++ b/banyand/measure/aggregate/max_test.go @@ -0,0 +1,47 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestMax(t *testing.T) { + var err error + + // case1: input int64 values + maxInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX) + err = maxInt64.Combine(aggregate.NewMaxArguments[int64]( + []int64{1, 2, 3}, // mock the "maximum" column + )) + assert.NoError(t, err) + assert.Equal(t, int64(3), maxInt64.Result()) + + // case2: input float64 values + maxFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX) + err = maxFloat64.Combine(aggregate.NewMaxArguments[float64]( + []float64{1.0, 2.0, 3.0}, // mock the "maximum" column + )) + assert.NoError(t, err) + assert.Equal(t, 3.0, maxFloat64.Result()) +} diff --git a/banyand/measure/aggregate/min.go b/banyand/measure/aggregate/min.go index 2caacd471..ab4576345 100644 --- a/banyand/measure/aggregate/min.go +++ b/banyand/measure/aggregate/min.go @@ -24,16 +24,16 @@ type Min[A, B Input, R Output] struct { // Combine takes elements to do the aggregation. // Min uses type parameter A. -func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error { +func (f *Min[A, B, R]) Combine(arguments Arguments[A, B]) error { for _, arg0 := range arguments.arg0 { switch arg0 := any(arg0).(type) { case int64: - if R(arg0) < m.minimum { - m.minimum = R(arg0) + if R(arg0) < f.minimum { + f.minimum = R(arg0) } case float64: - if R(arg0) < m.minimum { - m.minimum = R(arg0) + if R(arg0) < f.minimum { + f.minimum = R(arg0) } default: return errFieldValueType @@ -43,8 +43,8 @@ func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error { } // Result gives the result for the aggregation. -func (m *Min[A, B, R]) Result() R { - return m.minimum +func (f *Min[A, B, R]) Result() R { + return f.minimum } // NewMinArguments constructs arguments. diff --git a/banyand/measure/aggregate/percent.go b/banyand/measure/aggregate/percent.go new file mode 100644 index 000000000..4dd425937 --- /dev/null +++ b/banyand/measure/aggregate/percent.go @@ -0,0 +1,67 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Percent calculates the average value of elements. +type Percent[A, B Input, R Output] struct { + total int64 + match int64 +} + +// Combine takes elements to do the aggregation. +// Percent uses none of type parameters. +func (f *Percent[A, B, R]) Combine(arguments Arguments[A, B]) error { + for _, arg0 := range arguments.arg0 { + switch arg0 := any(arg0).(type) { + case int64: + f.total += arg0 + default: + return errFieldValueType + } + } + + for _, arg1 := range arguments.arg1 { + switch arg1 := any(arg1).(type) { + case int64: + f.match += arg1 + default: + return errFieldValueType + } + } + + return nil +} + +// Result gives the result for the aggregation. +func (f *Percent[A, B, R]) Result() R { + // In unusual situations it returns the zero value. + if f.total == 0 { + return zeroValue[R]() + } + // Factory 10000 is used to improve accuracy. This factory is same as OAP. + // For example, "10 percent" will return 1000. + return R(f.match) * 10000 / R(f.total) +} + +// NewPercentArguments constructs arguments. +func NewPercentArguments(a []int64, b []int64) Arguments[int64, int64] { + return Arguments[int64, int64]{ + arg0: a, + arg1: b, + } +} diff --git a/banyand/measure/aggregate/percent_test.go b/banyand/measure/aggregate/percent_test.go new file mode 100644 index 000000000..eb24a9b67 --- /dev/null +++ b/banyand/measure/aggregate/percent_test.go @@ -0,0 +1,40 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestPercent(t *testing.T) { + var err error + + // case1: input int64 values + PercentInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT) + err = PercentInt64.Combine(aggregate.NewPercentArguments( + []int64{10, 100, 1000}, // mock the "total" column + []int64{1, 10, 100}, // mock the "match" column + )) + assert.NoError(t, err) + assert.Equal(t, int64(1000), PercentInt64.Result()) +} diff --git a/banyand/measure/aggregate/rate.go b/banyand/measure/aggregate/rate.go new file mode 100644 index 000000000..a0a0dc9a3 --- /dev/null +++ b/banyand/measure/aggregate/rate.go @@ -0,0 +1,66 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Rate calculates the average value of elements. +type Rate[A, B Input, R Output] struct { + denominator int64 + numerator int64 +} + +// Combine takes elements to do the aggregation. +// Rate uses none of type parameters. +func (f *Rate[A, B, R]) Combine(arguments Arguments[A, B]) error { + for _, arg0 := range arguments.arg0 { + switch arg0 := any(arg0).(type) { + case int64: + f.denominator += arg0 + default: + return errFieldValueType + } + } + + for _, arg1 := range arguments.arg1 { + switch arg1 := any(arg1).(type) { + case int64: + f.numerator += arg1 + default: + return errFieldValueType + } + } + + return nil +} + +// Result gives the result for the aggregation. +func (f *Rate[A, B, R]) Result() R { + // In unusual situations it returns the zero value. + if f.denominator == 0 { + return zeroValue[R]() + } + // Factory 10000 is used to improve accuracy. This factory is same as OAP. + return R(f.numerator) * 10000 / R(f.denominator) +} + +// NewRateArguments constructs arguments. +func NewRateArguments(a []int64, b []int64) Arguments[int64, int64] { + return Arguments[int64, int64]{ + arg0: a, + arg1: b, + } +} diff --git a/banyand/measure/aggregate/rate_test.go b/banyand/measure/aggregate/rate_test.go new file mode 100644 index 000000000..e03989f88 --- /dev/null +++ b/banyand/measure/aggregate/rate_test.go @@ -0,0 +1,40 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestRate(t *testing.T) { + var err error + + // case1: input int64 values + rateInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE) + err = rateInt64.Combine(aggregate.NewRateArguments( + []int64{10, 100, 1000}, // mock the "denominator" column + []int64{1, 10, 100}, // mock the "numerator" column + )) + assert.NoError(t, err) + assert.Equal(t, int64(1000), rateInt64.Result()) +} diff --git a/banyand/measure/aggregate/sum.go b/banyand/measure/aggregate/sum.go new file mode 100644 index 000000000..96e418e95 --- /dev/null +++ b/banyand/measure/aggregate/sum.go @@ -0,0 +1,53 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate + +// Sum calculates the summation value of elements. +type Sum[A, B Input, R Output] struct { + summation R +} + +// Combine takes elements to do the aggregation. +// Sum uses type parameter A. +func (f *Sum[A, B, R]) Combine(arguments Arguments[A, B]) error { + for _, arg0 := range arguments.arg0 { + switch arg0 := any(arg0).(type) { + case int64: + f.summation += R(arg0) + case float64: + f.summation += R(arg0) + default: + return errFieldValueType + } + } + + return nil +} + +// Result gives the result for the aggregation. +func (f *Sum[A, B, R]) Result() R { + return f.summation +} + +// NewSumArguments constructs arguments. +func NewSumArguments[A Input](a []A) Arguments[A, Void] { + return Arguments[A, Void]{ + arg0: a, + arg1: nil, + } +} diff --git a/banyand/measure/aggregate/sum_test.go b/banyand/measure/aggregate/sum_test.go new file mode 100644 index 000000000..ff47e1443 --- /dev/null +++ b/banyand/measure/aggregate/sum_test.go @@ -0,0 +1,47 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package aggregate_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/measure/aggregate" +) + +func TestSum(t *testing.T) { + var err error + + // case1: input int64 values + sumInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM) + err = sumInt64.Combine(aggregate.NewSumArguments[int64]( + []int64{1, 2, 3}, // mock the "summation" column + )) + assert.NoError(t, err) + assert.Equal(t, int64(6), sumInt64.Result()) + + // case2: input float64 values + sumFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM) + err = sumFloat64.Combine(aggregate.NewSumArguments[float64]( + []float64{1.0, 2.0, 3.0}, // mock the "summation" column + )) + assert.NoError(t, err) + assert.Equal(t, 6.0, sumFloat64.Result()) +}