Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ospp] Supports measure aggregate function avg and min, and test cases. #521

Merged
merged 22 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/proto/banyandb/database/v1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ message FieldSpec {
// compression_method indicates how to compress data during writing
CompressionMethod compression_method = 4 [(validate.rules).enum.defined_only = true];
// aggregate_function indicates how to aggregate data
model.v1.AggregationFunction aggregate_function = 5;
model.v1.MeasureAggregate aggregate_function = 5;
}

// Measure intends to store data point
Expand Down
97 changes: 97 additions & 0 deletions banyand/measure/aggregate/aggregate_function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package aggregate for measure aggregate function.
package aggregate

import (
"fmt"
"math"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

// Void type contains nothing. It works as a placeholder for type parameter.
type Void struct{}
StLeoX marked this conversation as resolved.
Show resolved Hide resolved

// MAFInput synchronizes with `pbv1.ValueType`, excluding `ValueTypeUnknown`
// and `ValueTypeBinaryData`.
type MAFInput interface {
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
~string | ~int64 | ~float64 | ~[]string | ~[]int64 | Void
}

// MAFKeep represents the only two types of value hold by MAF.
type MAFKeep interface {
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
~int64 | ~float64
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
}

var errFieldValueType = fmt.Errorf("unsupported input value type on this field")

// MAFArgument represents a segment of elements as an argument.
type MAFArgument[T MAFInput] struct {
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
elements []T
}

// MAFArguments represents the argument array, with at most three arguments.
type MAFArguments[A, B, C MAFInput] struct {
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
arg0 MAFArgument[A]
arg1 MAFArgument[B]
arg2 MAFArgument[C]
}

// MAF describes two stages of aggregation.
type MAF[A, B, C MAFInput, K MAFKeep] interface {
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
// Combine takes elements to do the aggregation.
// It uses a two-dimensional array to represent the argument array.
Combine(arguments MAFArguments[A, B, C]) error

// Result gives the result for the aggregation.
// It uses "keep" value type to represent output value type.
Result() K
}

// NewMeasureAggregateFunction is the factory for MAF.
func NewMeasureAggregateFunction[A, B, C MAFInput, K MAFKeep](aggregate modelv1.MeasureAggregate) (MAF[A, B, C, K], error) {
var function MAF[A, B, C, K]
switch aggregate {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
function = &Min[A, B, C, K]{minimum: maxValue[K]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG:
function = &Avg[A, B, C, K]{summation: zeroValue[K](), count: 0}
default:
return nil, fmt.Errorf("MeasureAggregate unknown")
}

return function, nil
}

func zeroValue[K MAFKeep]() K {
var z K
return z
}

func maxValue[K MAFKeep]() (r K) {
switch x := any(&r).(type) {
case *int64:
*x = math.MaxInt64
case *float64:
*x = math.MaxFloat64
default:
panic("unreachable")
}
return
}
65 changes: 65 additions & 0 deletions banyand/measure/aggregate/avg.go
StLeoX marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Avg calculates the average value of elements.
type Avg[A, B, C MAFInput, K MAFKeep] struct {
summation K
count int64
}

// Combine takes elements to do the aggregation.
// Avg uses type parameter A and B.
func (a *Avg[A, B, C, K]) Combine(arguments MAFArguments[A, B, C]) error {
for _, arg0 := range arguments.arg0.elements {
switch arg0 := any(arg0).(type) {
case int64:
a.summation += K(arg0)
case float64:
a.summation += K(arg0)
case []int64:
for _, v := range arg0 {
a.summation += K(v)
}
default:
return errFieldValueType
}
}

for _, arg1 := range arguments.arg1.elements {
switch arg1 := any(arg1).(type) {
case int64:
a.count += arg1
default:
return errFieldValueType
}
}

return nil
}

// Result gives the result for the aggregation.
func (a *Avg[A, B, C, K]) Result() K {
// In unusual situations it returns the zero value.
if a.count == 0 {
return zeroValue[K]()
}
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of a float.
return a.summation / K(a.count)
}
39 changes: 39 additions & 0 deletions banyand/measure/aggregate/avg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate
StLeoX marked this conversation as resolved.
Show resolved Hide resolved

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

func TestAvg(t *testing.T) {
var err error

avgInt64, _ := NewMeasureAggregateFunction[int64, int64, Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG)
err = avgInt64.Combine(MAFArguments[int64, int64, Void]{
arg0: MAFArgument[int64]{[]int64{1, 3, 3}}, // mock the "summation" column
arg1: MAFArgument[int64]{[]int64{1, 1, 1}}, // mock the "count" column
arg2: MAFArgument[Void]{},
})
assert.NoError(t, err)
assert.Equal(t, int64(2), avgInt64.Result()) // note that 7/3 becomes 2 as int
}
54 changes: 54 additions & 0 deletions banyand/measure/aggregate/min.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Min calculates the minimum value of elements.
type Min[A, B, C MAFInput, K MAFKeep] struct {
minimum K
}

// Combine takes elements to do the aggregation.
// Min uses type parameter A.
func (m *Min[A, B, C, K]) Combine(arguments MAFArguments[A, B, C]) error {
for _, arg0 := range arguments.arg0.elements {
switch arg0 := any(arg0).(type) {
case int64:
if K(arg0) < m.minimum {
m.minimum = K(arg0)
}
case float64:
if K(arg0) < m.minimum {
m.minimum = K(arg0)
}
case []int64:
for _, v := range arg0 {
if K(v) < m.minimum {
m.minimum = K(v)
}
}
default:
return errFieldValueType
}
}
return nil
}

// Result gives the result for the aggregation.
func (m *Min[A, B, C, K]) Result() K {
return m.minimum
}
70 changes: 70 additions & 0 deletions banyand/measure/aggregate/min_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

func TestMin(t *testing.T) {
var err error

// case1: input int64 elements
minInt64, _ := NewMeasureAggregateFunction[int64, Void, Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN)
err = minInt64.Combine(MAFArguments[int64, Void, Void]{
arg0: MAFArgument[int64]{[]int64{1, 2, 3}},
arg1: MAFArgument[Void]{},
arg2: MAFArgument[Void]{},
})
assert.NoError(t, err)
assert.Equal(t, int64(1), minInt64.Result())

// case2: input float64 elements
minFloat64, _ := NewMeasureAggregateFunction[float64, Void, Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN)
err = minFloat64.Combine(MAFArguments[float64, Void, Void]{
arg0: MAFArgument[float64]{[]float64{1.0, 2.0, 3.0}},
arg1: MAFArgument[Void]{},
arg2: MAFArgument[Void]{},
})
assert.NoError(t, err)
assert.Equal(t, 1.0, minFloat64.Result())

// case3: input []int64 elements
minInt64Arr, _ := NewMeasureAggregateFunction[[]int64, Void, Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN)
err = minInt64Arr.Combine(MAFArguments[[]int64, Void, Void]{
arg0: MAFArgument[[]int64]{[][]int64{{1, 2}, {10, 20}}},
arg1: MAFArgument[Void]{},
arg2: MAFArgument[Void]{},
})
assert.NoError(t, err)
assert.Equal(t, int64(1), minInt64Arr.Result())

// case4: unexpected input type
minStr, _ := NewMeasureAggregateFunction[string, Void, Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN)
err = minStr.Combine(MAFArguments[string, Void, Void]{
// fixme If there is no element, can't recognize the wrong input type. It needs at least one variable.
arg0: MAFArgument[string]{[]string{"a"}},
arg1: MAFArgument[Void]{},
arg2: MAFArgument[Void]{},
})
assert.Errorf(t, err, errFieldValueType.Error())
}
2 changes: 1 addition & 1 deletion docs/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ FieldSpec is the specification of field
| field_type | [FieldType](#banyandb-database-v1-FieldType) | | field_type denotes the type of field value |
| encoding_method | [EncodingMethod](#banyandb-database-v1-EncodingMethod) | | encoding_method indicates how to encode data during writing |
| compression_method | [CompressionMethod](#banyandb-database-v1-CompressionMethod) | | compression_method indicates how to compress data during writing |
| aggregate_function | [banyandb.model.v1.AggregationFunction](#banyandb-model-v1-AggregationFunction) | | aggregate_function indicates how to aggregate data |
| aggregate_function | [banyandb.model.v1.MeasureAggregate](#banyandb-model-v1-MeasureAggregate) | | aggregate_function indicates how to aggregate data |



Expand Down