Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev measure aggregate function #528

Merged
merged 40 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
00b2bdb
[API] MeasureAggregateFunctionService Support
StLeoX Jun 28, 2024
38aec7f
fix check
StLeoX Jun 29, 2024
4c94a7c
Merge branch 'main' into dev_measure_aggregate_function
wu-sheng Jun 29, 2024
ad3b14a
use MeasureAggregate enum
StLeoX Jul 31, 2024
d456b4f
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Aug 1, 2024
acfe475
Merge branch 'main' into dev_measure_aggregate_function
hanahmily Aug 13, 2024
e05a97b
add PERCENTILE enum for MEASURE_AGGREGATE
StLeoX Aug 15, 2024
e1b7e98
add comments for MeasureAggregate enum
StLeoX Aug 16, 2024
b94f21b
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Aug 17, 2024
7c9d27f
fix enum name error in FieldSpec
StLeoX Aug 22, 2024
11ccee1
a
StLeoX Aug 22, 2024
cfc3811
fix wrong enum name in FieldSpec
StLeoX Aug 22, 2024
31763db
Supports measure aggregate function avg and min, and test cases.
StLeoX Aug 25, 2024
13c1180
Merge remote-tracking branch 'origin/dev_measure_aggregate_function' …
StLeoX Aug 25, 2024
98076ff
Revert "a"
StLeoX Aug 25, 2024
515b3f9
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Aug 25, 2024
deefc6e
fix Merge error
StLeoX Aug 25, 2024
420c4ef
remove struct MAFArgument[T]
StLeoX Aug 26, 2024
ac23ca7
rename Function
StLeoX Aug 29, 2024
2b5ca52
simplify Input and Arguments
StLeoX Aug 30, 2024
f6b6fe6
add NewArguments
StLeoX Aug 30, 2024
1098f25
supports `Max`
StLeoX Sep 3, 2024
aa722cd
supports `Count`
StLeoX Sep 3, 2024
337e2fc
supports `Sum`
StLeoX Sep 3, 2024
7de26d8
supports `Percent`
StLeoX Sep 3, 2024
fde584c
supports `Rate`
StLeoX Sep 3, 2024
792dc1b
add comments
StLeoX Sep 3, 2024
61eb634
Merge branch 'main' into dev_measure_aggregate_function
hanahmily Sep 3, 2024
6fa5573
Merge remote-tracking branch 'origin/dev_measure_aggregate_function' …
StLeoX Sep 5, 2024
806b6d9
fix unit test
StLeoX Sep 5, 2024
be89878
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Sep 5, 2024
b3127eb
removes dynamic type assertion
StLeoX Sep 9, 2024
ad6416f
add to Function interface: FirstCumulation and SecondCumulation
StLeoX Sep 10, 2024
07b537d
Revert "add to Function interface: FirstCumulation and SecondCumulation"
StLeoX Sep 12, 2024
93ca015
change `Result() R` to `Result() (A, B, R)`
StLeoX Sep 12, 2024
89f07d5
introduce step-4 loop unrolling for sum-like aggregate functions
StLeoX Sep 18, 2024
d38a113
Merge branch 'main' into dev_measure_aggregate_function
wu-sheng Sep 21, 2024
312f4d1
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Oct 10, 2024
9ab508b
Merge branch 'main' into dev_measure_aggregate_function
hanahmily Oct 12, 2024
645c4f3
fix lint
StLeoX Oct 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions banyand/measure/aggregate/aggregate_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,18 @@ func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[
switch kind {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
function = &Min[A, B, R]{minimum: maxValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX:
function = &Max[A, B, R]{maximum: minValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT:
function = &Count[A, B, R]{count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM:
function = &Sum[A, B, R]{summation: zeroValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG:
function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT:
function = &Percent[A, B, R]{total: 0, match: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE:
function = &Rate[A, B, R]{denominator: 0, numerator: 0}
default:
return nil, fmt.Errorf("MeasureAggregate unknown")
}
Expand All @@ -76,6 +86,20 @@ func zeroValue[R Output]() R {
return r
}

func minValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
*a = math.MinInt64
case *float64:
*a = -math.MaxFloat64
case *string:
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
*a = ""
default:
panic("unreachable")
}
return
}

func maxValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
Expand Down
18 changes: 9 additions & 9 deletions banyand/measure/aggregate/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ type Avg[A, B Input, R Output] struct {
}

// Combine takes elements to do the aggregation.
// Avg uses type parameter A and B.
func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
// Avg uses type parameter A.
func (f *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
case int64:
a.summation += R(arg0)
f.summation += R(arg0)
case float64:
a.summation += R(arg0)
f.summation += R(arg0)
default:
return errFieldValueType
}
Expand All @@ -40,7 +40,7 @@ func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
a.count += arg1
f.count += arg1
default:
return errFieldValueType
}
Expand All @@ -50,14 +50,14 @@ func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
}

// Result gives the result for the aggregation.
func (a *Avg[A, B, R]) Result() R {
func (f *Avg[A, B, R]) Result() R {
// In unusual situations it returns the zero value.
if a.count == 0 {
if f.count == 0 {
return zeroValue[R]()
}
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of a float.
return a.summation / R(a.count)
// returns an int, instead of f float.
return f.summation / R(f.count)
}

// NewAvgArguments constructs arguments.
Expand Down
50 changes: 50 additions & 0 deletions banyand/measure/aggregate/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Count calculates the count value of elements.
type Count[A, B Input, R Output] struct {
count int64
}

// Combine takes elements to do the aggregation.
// Count uses none of type parameters.
func (f *Count[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
f.count += arg0
default:
return errFieldValueType
}
}
return nil
}

// Result gives the result for the aggregation.
func (f *Count[A, B, R]) Result() R {
return R(f.count)
}

// NewCountArguments constructs arguments.
func NewCountArguments(a []int64) Arguments[int64, Void] {
return Arguments[int64, Void]{
arg0: a,
arg1: nil,
}
}
39 changes: 39 additions & 0 deletions banyand/measure/aggregate/count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestCount(t *testing.T) {
var err error

// case1: input int64 values
countInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT)
err = countInt64.Combine(aggregate.NewCountArguments(
[]int64{1, 2, 3}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, int64(6), countInt64.Result())
}
56 changes: 56 additions & 0 deletions banyand/measure/aggregate/max.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Max calculates the maximum value of elements.
type Max[A, B Input, R Output] struct {
maximum R
}

// Combine takes elements to do the aggregation.
// Max uses type parameter A.
func (f *Max[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
if R(arg0) > f.maximum {
f.maximum = R(arg0)
}
case float64:
if R(arg0) > f.maximum {
f.maximum = R(arg0)
}
default:
return errFieldValueType
}
}
return nil
}

// Result gives the result for the aggregation.
func (f *Max[A, B, R]) Result() R {
return f.maximum
}

// NewMaxArguments constructs arguments.
func NewMaxArguments[A Input](a []A) Arguments[A, Void] {
return Arguments[A, Void]{
arg0: a,
arg1: nil,
}
}
47 changes: 47 additions & 0 deletions banyand/measure/aggregate/max_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestMax(t *testing.T) {
var err error

// case1: input int64 values
maxInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxInt64.Combine(aggregate.NewMaxArguments[int64](
[]int64{1, 2, 3}, // mock the "maximum" column
))
assert.NoError(t, err)
assert.Equal(t, int64(3), maxInt64.Result())

// case2: input float64 values
maxFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxFloat64.Combine(aggregate.NewMaxArguments[float64](
[]float64{1.0, 2.0, 3.0}, // mock the "maximum" column
))
assert.NoError(t, err)
assert.Equal(t, 3.0, maxFloat64.Result())
}
14 changes: 7 additions & 7 deletions banyand/measure/aggregate/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ type Min[A, B Input, R Output] struct {

// Combine takes elements to do the aggregation.
// Min uses type parameter A.
func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error {
func (f *Min[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
if R(arg0) < m.minimum {
m.minimum = R(arg0)
if R(arg0) < f.minimum {
f.minimum = R(arg0)
}
case float64:
if R(arg0) < m.minimum {
m.minimum = R(arg0)
if R(arg0) < f.minimum {
f.minimum = R(arg0)
}
default:
return errFieldValueType
Expand All @@ -43,8 +43,8 @@ func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error {
}

// Result gives the result for the aggregation.
func (m *Min[A, B, R]) Result() R {
return m.minimum
func (f *Min[A, B, R]) Result() R {
return f.minimum
}

// NewMinArguments constructs arguments.
Expand Down
67 changes: 67 additions & 0 deletions banyand/measure/aggregate/percent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Percent calculates the average value of elements.
type Percent[A, B Input, R Output] struct {
total int64
match int64
}

// Combine takes elements to do the aggregation.
// Percent uses none of type parameters.
func (f *Percent[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
f.total += arg0
default:
return errFieldValueType
}
}

for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
f.match += arg1
default:
return errFieldValueType
}
}

return nil
}

// Result gives the result for the aggregation.
func (f *Percent[A, B, R]) Result() R {
// In unusual situations it returns the zero value.
if f.total == 0 {
return zeroValue[R]()
}
// Factory 10000 is used to improve accuracy. This factory is same as OAP.
// For example, "10 percent" will return 1000.
return R(f.match) * 10000 / R(f.total)
}

// NewPercentArguments constructs arguments.
func NewPercentArguments(a []int64, b []int64) Arguments[int64, int64] {
return Arguments[int64, int64]{
arg0: a,
arg1: b,
}
}
Loading