Dev measure aggregate function #528

Open
wants to merge 37 commits into
base: main
37 commits
00b2bdb
[API] MeasureAggregateFunctionService Support
StLeoX Jun 28, 2024
38aec7f
fix check
StLeoX Jun 29, 2024
4c94a7c
Merge branch 'main' into dev_measure_aggregate_function
wu-sheng Jun 29, 2024
ad3b14a
use MeasureAggregate enum
StLeoX Jul 31, 2024
d456b4f
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Aug 1, 2024
acfe475
Merge branch 'main' into dev_measure_aggregate_function
hanahmily Aug 13, 2024
e05a97b
add PERCENTILE enum for MEASURE_AGGREGATE
StLeoX Aug 15, 2024
e1b7e98
add comments for MeasureAggregate enum
StLeoX Aug 16, 2024
b94f21b
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Aug 17, 2024
7c9d27f
fix enum name error in FieldSpec
StLeoX Aug 22, 2024
11ccee1
a
StLeoX Aug 22, 2024
cfc3811
fix wrong enum name in FieldSpec
StLeoX Aug 22, 2024
31763db
Supports measure aggregate function avg and min, and test cases.
StLeoX Aug 25, 2024
13c1180
Merge remote-tracking branch 'origin/dev_measure_aggregate_function' …
StLeoX Aug 25, 2024
98076ff
Revert "a"
StLeoX Aug 25, 2024
515b3f9
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Aug 25, 2024
deefc6e
fix Merge error
StLeoX Aug 25, 2024
420c4ef
remove struct MAFArgument[T]
StLeoX Aug 26, 2024
ac23ca7
rename Function
StLeoX Aug 29, 2024
2b5ca52
simplify Input and Arguments
StLeoX Aug 30, 2024
f6b6fe6
add NewArguments
StLeoX Aug 30, 2024
1098f25
supports `Max`
StLeoX Sep 3, 2024
aa722cd
supports `Count`
StLeoX Sep 3, 2024
337e2fc
supports `Sum`
StLeoX Sep 3, 2024
7de26d8
supports `Percent`
StLeoX Sep 3, 2024
fde584c
supports `Rate`
StLeoX Sep 3, 2024
792dc1b
add comments
StLeoX Sep 3, 2024
61eb634
Merge branch 'main' into dev_measure_aggregate_function
hanahmily Sep 3, 2024
6fa5573
Merge remote-tracking branch 'origin/dev_measure_aggregate_function' …
StLeoX Sep 5, 2024
806b6d9
fix unit test
StLeoX Sep 5, 2024
be89878
Merge branch 'main' into dev_measure_aggregate_function
StLeoX Sep 5, 2024
b3127eb
removes dynamic type assertion
StLeoX Sep 9, 2024
ad6416f
add to Function interface: FirstCumulation and SecondCumulation
StLeoX Sep 10, 2024
07b537d
Revert "add to Function interface: FirstCumulation and SecondCumulation"
StLeoX Sep 12, 2024
93ca015
change `Result() R` to `Result() (A, B, R)`
StLeoX Sep 12, 2024
89f07d5
introduce step-4 loop unrolling for sum-like aggregate functions
StLeoX Sep 18, 2024
d38a113
Merge branch 'main' into dev_measure_aggregate_function
wu-sheng Sep 21, 2024
39 changes: 32 additions & 7 deletions banyand/measure/aggregate/aggregate_function.go
@@ -26,20 +26,20 @@ import (
)

// Void type contains nothing. It works as a placeholder for type parameters of `Arguments`.
type Void struct{}
// It's implemented as int64, but it won't be used as an int64.
type Void int64

// Input covers possible types of Function's arguments. It synchronizes with `FieldType` in schema.
// Input covers possible types of Function's arguments. It synchronizes with `FieldType`.
// It also covers Void type.
type Input interface {
Void | ~int64 | ~float64
~int64 | ~float64
}

// Output covers possible types of Function's return value.
type Output interface {
~int64 | ~float64
}

var errFieldValueType = fmt.Errorf("unsupported input value type on this field")

// Arguments represents the argument array, with one argument or two arguments.
type Arguments[A, B Input] struct {
arg0 []A
@@ -52,8 +52,9 @@ type Function[A, B Input, R Output] interface {
// It uses a two-dimensional array to represent the argument array.
Combine(arguments Arguments[A, B]) error

// Result gives the result for the aggregation.
Result() R
// Result gives the result for the aggregation. R is the aggregating result,
// A is the first aggregating state, and B is the second aggregating state.
Result() (A, B, R)
}
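
A note on the new `Result() (A, B, R)` signature: returning the two intermediate states next to the final value makes it possible to combine partial aggregations later. That matters for Avg, because averaging per-part averages is wrong when the counts differ. The PR does not state this motivation, so treat it as an assumption. The sketch below uses a hypothetical `partialAvg` struct and `mergeAvg` helper, neither of which exists in the PR.

```go
package main

import "fmt"

// partialAvg is a hypothetical stand-in for the first two values that
// Avg.Result returns: the running summation and the running count.
type partialAvg struct {
	sum   float64
	count int64
}

// mergeAvg adds up the partial states and derives the average once,
// from the merged summation and count.
func mergeAvg(parts []partialAvg) (sum float64, count int64, avg float64) {
	for _, p := range parts {
		sum += p.sum
		count += p.count
	}
	if count == 0 {
		// Nothing was aggregated; report zero instead of dividing by zero.
		return sum, count, 0
	}
	return sum, count, sum / float64(count)
}

func main() {
	parts := []partialAvg{{sum: 10, count: 4}, {sum: 2, count: 1}}
	s, c, a := mergeAvg(parts)
	fmt.Println(s, c, a) // 12 5 2.4
}
```

Averaging the two per-part averages (2.5 and 2) would give 2.25, while the merged state gives 2.4.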

// NewFunction constructs the aggregate function with given kind and parameter types.
@@ -62,8 +63,18 @@ func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[
switch kind {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
function = &Min[A, B, R]{minimum: maxValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX:
function = &Max[A, B, R]{maximum: minValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT:
function = &Count[A, B, R]{count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM:
function = &Sum[A, B, R]{summation: zeroValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG:
function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT:
function = &Percent[A, B, R]{total: 0, match: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE:
function = &Rate[A, B, R]{denominator: 0, numerator: 0}
default:
return nil, fmt.Errorf("MeasureAggregate unknown")
}
@@ -76,6 +87,20 @@ func zeroValue[R Output]() R {
return r
}

func minValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
*a = math.MinInt64
case *float64:
*a = -math.MaxFloat64
case *string:
*a = ""
default:
panic("unreachable")
}
return
}
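
`NewFunction` seeds Max with `minValue[R]()` and Min with `maxValue[R]()`, so the first real element always replaces the seed. A minimal sketch of that idea follows; `number`, `lowest` and `maxOf` are hypothetical names used only for illustration. Two things are worth keeping in mind: the type switch matches only the exact types `int64` and `float64`, so a named type with one of those underlying types would reach the `default` branch, and an empty input returns the seed itself.

```go
package main

import (
	"fmt"
	"math"
)

// number is a hypothetical constraint mirroring Output in the diff.
type number interface{ ~int64 | ~float64 }

// lowest returns the smallest representable value of R, used as the
// starting point for a running maximum.
func lowest[R number]() (r R) {
	switch p := any(&r).(type) {
	case *int64:
		*p = math.MinInt64
	case *float64:
		*p = -math.MaxFloat64
	default:
		// Reached for named types such as `type myInt int64`.
		panic("unsupported type")
	}
	return
}

// maxOf scans xs starting from the seed; any element beats the seed.
func maxOf[R number](xs []R) R {
	m := lowest[R]()
	for _, x := range xs {
		if x > m {
			m = x
		}
	}
	return m
}

func main() {
	fmt.Println(maxOf([]int64{-5, -2, -9})) // -2
	fmt.Println(maxOf([]float64{1.5, 3.25, 2}))
}
```

Seeding with zero instead would report 0 as the maximum of an all-negative input, which is why the extreme value is used.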

func maxValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
52 changes: 29 additions & 23 deletions banyand/measure/aggregate/avg.go
@@ -24,40 +24,46 @@ type Avg[A, B Input, R Output] struct {
}

// Combine takes elements to do the aggregation.
// Avg uses type parameter A and B.
func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
a.summation += R(arg0)
case float64:
a.summation += R(arg0)
default:
return errFieldValueType
}
// Avg uses type parameters A (the summation column) and B (the count column).
func (f *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
i := 0
n := len(arguments.arg0)
// step-4 aggregate
for ; i <= n-4; i += 4 {
f.summation += R(arguments.arg0[i]) + R(arguments.arg0[i+1]) +
R(arguments.arg0[i+2]) + R(arguments.arg0[i+3])
}
// tail aggregate
for ; i < n; i++ {
f.summation += R(arguments.arg0[i])
}

for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
a.count += arg1
default:
return errFieldValueType
}
i = 0
n = len(arguments.arg1)
// step-4 aggregate
for ; i <= n-4; i += 4 {
f.count += int64(arguments.arg1[i]) + int64(arguments.arg1[i+1]) +
int64(arguments.arg1[i+2]) + int64(arguments.arg1[i+3])
}
// tail aggregate
for ; i < n; i++ {
f.count += int64(arguments.arg1[i])
}

return nil
}

// Result gives the result for the aggregation.
func (a *Avg[A, B, R]) Result() R {
func (f *Avg[A, B, R]) Result() (A, B, R) {
var average R
// In unusual situations it returns the zero value.
if a.count == 0 {
return zeroValue[R]()
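if f.count == 0 {
// Return early: falling through would divide by zero, which panics for int64.
return A(f.summation), B(f.count), zeroValue[R]()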
}
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of a float.
return a.summation / R(a.count)
// returns an int, instead of a float.
average = f.summation / R(f.count)
return A(f.summation), B(f.count), average
}

// NewAvgArguments constructs arguments.
13 changes: 10 additions & 3 deletions banyand/measure/aggregate/avg_test.go
@@ -32,11 +32,14 @@ func TestAvg(t *testing.T) {
// case1: input int64 values
avgInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG)
err = avgInt64.Combine(aggregate.NewAvgArguments[int64](
[]int64{1, 2, 3}, // mock the "summation" column
[]int64{1, 3, 3}, // mock the "summation" column
[]int64{1, 1, 1}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, int64(2), avgInt64.Result()) // note that 7/3 becomes 2 as int
a1, b1, r1 := avgInt64.Result()
assert.Equal(t, int64(7), a1)
assert.Equal(t, int64(3), b1)
assert.Equal(t, int64(2), r1) // note that 7/3 becomes 2 as int

// case2: input float64 elements
avgFloat64, _ := aggregate.NewFunction[float64, int64, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG)
@@ -45,5 +48,9 @@ func TestAvg(t *testing.T) {
[]int64{1, 1, 1}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, 7.0/3, avgFloat64.Result())
a2, b2, r2 := avgFloat64.Result()
assert.Equal(t, 7.0, a2)
assert.Equal(t, int64(3), b2)
assert.Equal(t, 7.0/3, r2)

}
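
The "7/3 becomes 2 as int" comment in the test reflects Go's integer division, which truncates toward zero. The sketch below shows the same division for both result types; `number` and `average` are hypothetical names, and the zero-count guard is an assumption about the desired behaviour rather than something taken from the PR.

```go
package main

import "fmt"

// number is a hypothetical constraint mirroring Output in the diff.
type number interface{ ~int64 | ~float64 }

// average divides in the result type R, so R decides whether the
// quotient is truncated (int64) or fractional (float64).
func average[R number](sum R, count int64) R {
	if count == 0 {
		var zero R
		return zero
	}
	return sum / R(count)
}

func main() {
	fmt.Println(average[int64](7, 3))  // 2
	fmt.Println(average[int64](-7, 3)) // -2
	fmt.Println(average[float64](7, 3))
}
```

Truncation toward zero means a negative summation rounds up rather than down, which can matter when the aggregated field is allowed to be negative.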
53 changes: 53 additions & 0 deletions banyand/measure/aggregate/count.go
@@ -0,0 +1,53 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Count calculates the count value of elements.
type Count[A, B Input, R Output] struct {
count int64
}

// Combine takes elements to do the aggregation.
// Count uses type parameter A: arg0 carries the partial counts to add up.
func (f *Count[A, B, R]) Combine(arguments Arguments[A, B]) error {
i := 0
n := len(arguments.arg0)
// step-4 aggregate
for ; i <= n-4; i += 4 {
f.count += int64(arguments.arg0[i]) + int64(arguments.arg0[i+1]) +
int64(arguments.arg0[i+2]) + int64(arguments.arg0[i+3])
}
// tail aggregate
for ; i < n; i++ {
f.count += int64(arguments.arg0[i])
}
return nil
}

// Result gives the result for the aggregation.
func (f *Count[A, B, R]) Result() (A, B, R) {
return A(f.count), zeroValue[B](), R(f.count)
}

// NewCountArguments constructs arguments.
func NewCountArguments(a []int64) Arguments[int64, Void] {
return Arguments[int64, Void]{
arg0: a,
arg1: nil,
}
}
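
Note that `Count.Combine` adds up the values in `arg0` rather than counting how many elements there are, which matches the test's "count" column of `{1, 2, 3}` producing 6. The sketch below contrasts the two readings; `sumCounts` is a hypothetical helper, and the interpretation of `arg0` as pre-aggregated counts is an assumption based on the test.

```go
package main

import "fmt"

// sumCounts treats each element as an already-computed partial count
// and returns their total.
func sumCounts(partials []int64) int64 {
	var total int64
	for _, c := range partials {
		total += c
	}
	return total
}

func main() {
	partials := []int64{1, 2, 3}
	// Three rows, but six counted items in total.
	fmt.Println(len(partials), sumCounts(partials)) // 3 6
}
```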
40 changes: 40 additions & 0 deletions banyand/measure/aggregate/count_test.go
@@ -0,0 +1,40 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestCount(t *testing.T) {
var err error

// case1: input int64 values
countInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT)
err = countInt64.Combine(aggregate.NewCountArguments(
[]int64{1, 2, 3}, // mock the "count" column
))
assert.NoError(t, err)
_, _, r1 := countInt64.Result()
assert.Equal(t, int64(6), r1)
}
47 changes: 47 additions & 0 deletions banyand/measure/aggregate/max.go
@@ -0,0 +1,47 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Max calculates the maximum value of elements.
type Max[A, B Input, R Output] struct {
maximum R
}

// Combine takes elements to do the aggregation.
// Max uses type parameter A.
func (f *Max[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
if R(arg0) > f.maximum {
f.maximum = R(arg0)
}
}
return nil
}

// Result gives the result for the aggregation.
func (f *Max[A, B, R]) Result() (A, B, R) {
return A(f.maximum), zeroValue[B](), f.maximum
}

// NewMaxArguments constructs arguments.
func NewMaxArguments[A Input](a []A) Arguments[A, Void] {
return Arguments[A, Void]{
arg0: a,
arg1: nil,
}
}
49 changes: 49 additions & 0 deletions banyand/measure/aggregate/max_test.go
@@ -0,0 +1,49 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestMax(t *testing.T) {
var err error

// case1: input int64 values
maxInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxInt64.Combine(aggregate.NewMaxArguments[int64](
[]int64{1, 2, 3}, // mock the "maximum" column
))
assert.NoError(t, err)
_, _, r1 := maxInt64.Result()
assert.Equal(t, int64(3), r1)

// case2: input float64 values
maxFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxFloat64.Combine(aggregate.NewMaxArguments[float64](
[]float64{1.0, 2.0, 3.0}, // mock the "maximum" column
))
assert.NoError(t, err)
_, _, r2 := maxFloat64.Result()
assert.Equal(t, 3.0, r2)
}