Skip to content

Commit

Permalink
Merge pull request #1 from StLeoX/dev_measure_aggregate_function
Browse files Browse the repository at this point in the history
Dev measure aggregate function
  • Loading branch information
StLeoX committed Sep 5, 2024
2 parents 17dc8c0 + be89878 commit 5bfbb47
Show file tree
Hide file tree
Showing 13 changed files with 545 additions and 16 deletions.
24 changes: 24 additions & 0 deletions banyand/measure/aggregate/aggregate_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,18 @@ func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[
switch kind {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
function = &Min[A, B, R]{minimum: maxValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX:
function = &Max[A, B, R]{maximum: minValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT:
function = &Count[A, B, R]{count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM:
function = &Sum[A, B, R]{summation: zeroValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG:
function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT:
function = &Percent[A, B, R]{total: 0, match: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE:
function = &Rate[A, B, R]{denominator: 0, numerator: 0}
default:
return nil, fmt.Errorf("MeasureAggregate unknown")
}
Expand All @@ -76,6 +86,20 @@ func zeroValue[R Output]() R {
return r
}

func minValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
*a = math.MinInt64
case *float64:
*a = -math.MaxFloat64
case *string:
*a = ""
default:
panic("unreachable")
}
return
}

func maxValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
Expand Down
18 changes: 9 additions & 9 deletions banyand/measure/aggregate/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ type Avg[A, B Input, R Output] struct {
}

// Combine takes elements to do the aggregation.
// Avg uses type parameter A and B.
func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
// Avg uses type parameter A.
func (f *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
a.summation += R(arg0)
f.summation += R(arg0)
case float64:
a.summation += R(arg0)
f.summation += R(arg0)
default:
return errFieldValueType
}
Expand All @@ -40,7 +40,7 @@ func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
a.count += arg1
f.count += arg1
default:
return errFieldValueType
}
Expand All @@ -50,14 +50,14 @@ func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
}

// Result gives the result for the aggregation.
func (a *Avg[A, B, R]) Result() R {
func (f *Avg[A, B, R]) Result() R {
// In unusual situations it returns the zero value.
if a.count == 0 {
if f.count == 0 {
return zeroValue[R]()
}
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of a float.
return a.summation / R(a.count)
// returns an int, instead of f float.
return f.summation / R(f.count)
}

// NewAvgArguments constructs arguments.
Expand Down
50 changes: 50 additions & 0 deletions banyand/measure/aggregate/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Count calculates the count value of elements.
type Count[A, B Input, R Output] struct {
count int64
}

// Combine takes elements to do the aggregation.
// Count uses none of type parameters.
func (f *Count[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
f.count += arg0
default:
return errFieldValueType
}
}
return nil
}

// Result gives the result for the aggregation.
func (f *Count[A, B, R]) Result() R {
return R(f.count)
}

// NewCountArguments constructs arguments.
func NewCountArguments(a []int64) Arguments[int64, Void] {
return Arguments[int64, Void]{
arg0: a,
arg1: nil,
}
}
39 changes: 39 additions & 0 deletions banyand/measure/aggregate/count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestCount(t *testing.T) {
var err error

// case1: input int64 values
countInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT)
err = countInt64.Combine(aggregate.NewCountArguments(
[]int64{1, 2, 3}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, int64(6), countInt64.Result())
}
56 changes: 56 additions & 0 deletions banyand/measure/aggregate/max.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Max calculates the maximum value of elements.
type Max[A, B Input, R Output] struct {
maximum R
}

// Combine takes elements to do the aggregation.
// Max uses type parameter A.
func (f *Max[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
if R(arg0) > f.maximum {
f.maximum = R(arg0)
}
case float64:
if R(arg0) > f.maximum {
f.maximum = R(arg0)
}
default:
return errFieldValueType
}
}
return nil
}

// Result gives the result for the aggregation.
func (f *Max[A, B, R]) Result() R {
return f.maximum
}

// NewMaxArguments constructs arguments.
func NewMaxArguments[A Input](a []A) Arguments[A, Void] {
return Arguments[A, Void]{
arg0: a,
arg1: nil,
}
}
47 changes: 47 additions & 0 deletions banyand/measure/aggregate/max_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestMax(t *testing.T) {
var err error

// case1: input int64 values
maxInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxInt64.Combine(aggregate.NewMaxArguments[int64](
[]int64{1, 2, 3}, // mock the "maximum" column
))
assert.NoError(t, err)
assert.Equal(t, int64(3), maxInt64.Result())

// case2: input float64 values
maxFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxFloat64.Combine(aggregate.NewMaxArguments[float64](
[]float64{1.0, 2.0, 3.0}, // mock the "maximum" column
))
assert.NoError(t, err)
assert.Equal(t, 3.0, maxFloat64.Result())
}
14 changes: 7 additions & 7 deletions banyand/measure/aggregate/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ type Min[A, B Input, R Output] struct {

// Combine takes elements to do the aggregation.
// Min uses type parameter A.
func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error {
func (f *Min[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
if R(arg0) < m.minimum {
m.minimum = R(arg0)
if R(arg0) < f.minimum {
f.minimum = R(arg0)
}
case float64:
if R(arg0) < m.minimum {
m.minimum = R(arg0)
if R(arg0) < f.minimum {
f.minimum = R(arg0)
}
default:
return errFieldValueType
Expand All @@ -43,8 +43,8 @@ func (m *Min[A, B, R]) Combine(arguments Arguments[A, B]) error {
}

// Result gives the result for the aggregation.
func (m *Min[A, B, R]) Result() R {
return m.minimum
func (f *Min[A, B, R]) Result() R {
return f.minimum
}

// NewMinArguments constructs arguments.
Expand Down
67 changes: 67 additions & 0 deletions banyand/measure/aggregate/percent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Percent calculates the average value of elements.
type Percent[A, B Input, R Output] struct {
total int64
match int64
}

// Combine takes elements to do the aggregation.
// Percent uses none of type parameters.
func (f *Percent[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
f.total += arg0
default:
return errFieldValueType
}
}

for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
f.match += arg1
default:
return errFieldValueType
}
}

return nil
}

// Result gives the result for the aggregation.
func (f *Percent[A, B, R]) Result() R {
// In unusual situations it returns the zero value.
if f.total == 0 {
return zeroValue[R]()
}
// Factory 10000 is used to improve accuracy. This factory is same as OAP.
// For example, "10 percent" will return 1000.
return R(f.match) * 10000 / R(f.total)
}

// NewPercentArguments constructs arguments.
func NewPercentArguments(a []int64, b []int64) Arguments[int64, int64] {
return Arguments[int64, int64]{
arg0: a,
arg1: b,
}
}
Loading

0 comments on commit 5bfbb47

Please sign in to comment.