From 27977040daad138627da34c2b6b0453fc9f95538 Mon Sep 17 00:00:00 2001 From: illia-li Date: Mon, 10 Jul 2023 14:03:26 -0400 Subject: [PATCH] add(count) added counters --- pkg/count/counters.go | 108 +++++++++++++++++++++ pkg/count/counters_test.go | 143 ++++++++++++++++++++++++++++ pkg/count/groups.go | 133 ++++++++++++++++++++++++++ pkg/count/print.go | 134 ++++++++++++++++++++++++++ pkg/count/total_counters.go | 182 ++++++++++++++++++++++++++++++++++++ pkg/jobs/jobs.go | 17 +++- pkg/status/status.go | 2 + pkg/store/store.go | 9 ++ 8 files changed, 724 insertions(+), 4 deletions(-) create mode 100644 pkg/count/counters.go create mode 100644 pkg/count/counters_test.go create mode 100644 pkg/count/groups.go create mode 100644 pkg/count/print.go create mode 100644 pkg/count/total_counters.go diff --git a/pkg/count/counters.go b/pkg/count/counters.go new file mode 100644 index 00000000..b11fcca5 --- /dev/null +++ b/pkg/count/counters.go @@ -0,0 +1,108 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package count + +import ( + "fmt" + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" +) + +type SimpleCounters []*SimpleCounter + +func (l SimpleCounters) Add(idx, in int) { + l[idx].Add(in) +} + +func (l SimpleCounters) Inc(idx int) { + l[idx].Inc() +} + +func (l SimpleCounters) Get(idx int) uint64 { + return l[idx].Get() +} + +func (l SimpleCounters) GetCounter(idx int) *SimpleCounter { + return l[idx] +} + +func (l SimpleCounters) printFull() printRows { + out := make(printRows, 0, len(l)) + for idx := range l { + if l.Get(idx) == 0 { + continue + } + out = append(out, l[idx].printFull()) + } + out.alignRows() + return out +} + +type SimpleCounter struct { + group *Group + name string + prometheus prometheus.Counter + unit string + description string + val atomic.Uint64 + inPrometheus bool + _ noCopy +} + +func (c *SimpleCounter) Add(in int) { + if in > 0 { + c.val.Add(uint64(in)) + if c.inPrometheus { + c.prometheus.Add(float64(in)) + } + } + if in < 0 { + panic("add value should be >0") + } +} + +func (c *SimpleCounter) Inc() { + c.val.Add(1) + if c.inPrometheus { + c.prometheus.Inc() + } +} + +func (c *SimpleCounter) Get() uint64 { + return c.val.Load() +} + +func (c *SimpleCounter) printFull() printRow { + prometh := "prometheus:no " + if c.inPrometheus { + prometh = "prometheus:yes" + } + return printRow{ + "", + simpleCounterName, + c.name + ":", + fmt.Sprintf("%d", c.val.Load()), + separator + c.unit, + separator + prometh, + separator + "description:" + c.description, + } +} + +type noCopy struct{} + +func (*noCopy) Lock() {} + +func (*noCopy) Unlock() {} diff --git a/pkg/count/counters_test.go b/pkg/count/counters_test.go new file mode 100644 index 00000000..10e0fcbb --- /dev/null +++ b/pkg/count/counters_test.go @@ -0,0 +1,143 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package count_test + +import ( + "sync" + "testing" + "time" + + "golang.org/x/exp/rand" + + "github.com/scylladb/gemini/pkg/count" +) + +var rnd = rand.New(rand.NewSource(uint64(time.Now().Unix()))) + +func TestSimpleCounters(t *testing.T) { + t.Parallel() + countersInfo := []count.Info{{ + Name: "test simple counter 1", + Unit: "ms", + PrometheusIntegration: false, + Description: "test counts", + }, { + Name: "test simple counter 123123", + Unit: "ssm", + PrometheusIntegration: true, + Description: "test counts", + }, { + Name: "test simple counter 5656565", + Unit: "s", + PrometheusIntegration: true, + Description: "test counts", + }} + group := count.InitGroup("test group", "testing", true) + sCounters := group.AddSimpleCounters(countersInfo) + workers := 10 + adds := 100000 + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + scOperations(sCounters, adds) + wg.Done() + }() + } + wg.Wait() + sum := getSimpleCounterSum(sCounters) + if sum != workers*adds { + t.Errorf("wrong simple counters work. expected sum:%d, received sum:%d", workers*adds, sum) + } + count.PrintAllGroups() +} + +func TestTotalCounters(t *testing.T) { + t.Parallel() + countersNames := []string{ + "sub counter1", + "sub counter200", + "sub counter3", + "sub counter4", + "sub counter5000", + "sub counter6", + "sub counter7", + "sub counter800000", + "sub counter9", + "sub counter10", + } + group := count.InitGroup("test group", "testing", true) + group2 := group.AddGroup("test group222", "testing222", true) + tCounter1 := group.AddTotalCounter(count.Info{Name: "total counter 1", Unit: "qty", PrometheusIntegration: false, Description: "count qty"}, "count", countersNames) + tCounter2 := group2.AddTotalCounter(count.Info{Name: "total counter 2", Unit: "ps", PrometheusIntegration: true, Description: "count ps"}, "count", countersNames) + tCounters := count.TotalCounters{tCounter1, tCounter2} + workers := 10 + adds := 100000 + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + totalCounterOperations(tCounters, adds) + wg.Done() + }() + } + wg.Wait() + tSum, sum := getTotalCounterSum(tCounters) + if sum != workers*adds { + t.Errorf("wrong simple counters work. expected sum:%d, received sum:%d", workers*adds, sum) + } + if tSum != workers*adds { + t.Errorf("wrong simple counters work. expected sum:%d, received sum:%d", workers*adds, sum) + } + count.PrintAllGroups() +} + +func scOperations(counters count.SimpleCounters, adds int) { + cl := len(counters) + for c := 0; c < adds; c++ { + counters[rnd.Intn(cl)].Inc() + counters[rnd.Intn(cl)].Get() + } +} + +func getSimpleCounterSum(counters count.SimpleCounters) int { + sum := 0 + for idx := range counters { + sum += int(counters[idx].Get()) + } + return sum +} + +func totalCounterOperations(counters count.TotalCounters, adds int) { + cl := len(counters) + for c := 0; c < adds; c++ { + n := rnd.Intn(10) + counters[rnd.Intn(cl)].Inc(n) + counters[rnd.Intn(cl)].Get(n) + } +} + +func getTotalCounterSum(counters count.TotalCounters) (int, int) { + sumTotal := 0 + sum := 0 + for idx := range counters { + sumTotal += int(counters[idx].GetTotal()) + subCounters := counters[idx].GetSubCounters() + for _, sub := range subCounters { + sum += int(sub.Get()) + } + } + return sumTotal, sum +} diff --git a/pkg/count/groups.go b/pkg/count/groups.go new file mode 100644 index 00000000..4950c56a --- /dev/null +++ b/pkg/count/groups.go @@ -0,0 +1,133 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package count + +import ( + "sync" + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + simpleCounterName = "sc." + totalCounterName = "tc." + subCounterName = "uc." + groupName = "gr." +) + +var ( + allGroups = make(Groups, 0) + allGroupsMute sync.RWMutex +) + +var StmtsCounters = InitGroup("generated stmt`s", "count of all generated stmt`s", true) + +type Group struct { + parentGroup *Group + description string + name string + groups Groups + simpleCounters SimpleCounters + totalCounters TotalCounters + active bool + mut sync.RWMutex +} + +type Groups []*Group + +type Info struct { + Name string + Unit string + Description string + PrometheusIntegration bool +} + +func InitGroup(name, description string, active bool) *Group { + group := Group{ + parentGroup: nil, + name: name, + description: description, + active: active, + } + allGroupsMute.Lock() + allGroups = append(allGroups, &group) + allGroupsMute.Unlock() + return &group +} + +func (g *Group) AddGroup(name, description string, active bool) *Group { + group := Group{ + parentGroup: g, + name: name, + description: description, + active: active, + } + if !g.active { + group.active = false + } + g.mut.Lock() + g.groups = append(g.groups, &group) + g.mut.Unlock() + return &group +} + +func (g *Group) AddSimpleCounters(counters []Info) SimpleCounters { + sCounters := make(SimpleCounters, len(counters)) + for idx := range sCounters { + sCounters[idx] = g.initCounter(counters[idx]) + } + defer g.mut.Unlock() + g.mut.Lock() + g.simpleCounters = sCounters + return sCounters +} + +func (g *Group) AddSimpleCounter(counter Info) *SimpleCounter { + defer g.mut.Unlock() + g.mut.Lock() + + sCounter := g.initCounter(counter) + g.simpleCounters = append(g.simpleCounters, sCounter) + return sCounter +} + +func (g *Group) initCounter(counter Info) *SimpleCounter { + newCounter := &SimpleCounter{ + name: counter.Name, + unit: counter.Unit, + inPrometheus: counter.PrometheusIntegration, + description: counter.Description, + group: g, + val: atomic.Uint64{}, + } + if counter.PrometheusIntegration { + newCounter.prometheus = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: g.getParentGroupName(), + Subsystem: g.name, + Name: counter.Name, + Help: counter.Description, + }) + } + return newCounter +} + +func (g *Group) getParentGroupName() string { + pgName := "" + if g.parentGroup != nil { + pgName = g.parentGroup.name + } + return pgName +} diff --git a/pkg/count/print.go b/pkg/count/print.go new file mode 100644 index 00000000..2e02f1d8 --- /dev/null +++ b/pkg/count/print.go @@ -0,0 +1,134 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package count + +import ( + "fmt" + "strings" +) + +const separator = " " + +type printRows []printRow + +type printRow [7]string + +func (l printRows) alignRows() { + maxNameAndValLen := 0 + maxUnitLen := 0 + for r := range l { + nameAndValLen := len(l[r][2]) + len(l[r][3]) + if nameAndValLen > maxNameAndValLen { + maxNameAndValLen = nameAndValLen + } + unitLen := len(l[r][4]) + if unitLen > maxUnitLen { + maxUnitLen = unitLen + } + } + for r := range l { + nameAndValLen := len(l[r][2]) + len(l[r][3]) + if nameAndValLen < maxNameAndValLen { + l[r][3] = strings.Repeat(" ", maxNameAndValLen-nameAndValLen) + l[r][3] + } + unitLen := len(l[r][4]) + if unitLen < maxUnitLen { + l[r][4] += strings.Repeat(" ", maxUnitLen-unitLen) + } + } +} + +func (p printRow) getString() string { + out := "" + for i := range p { + out += p[i] + } + + return out +} + +func (l printRows) getStrings() []string { + out := make([]string, len(l)) + for i := range l { + out[i] = l[i].getString() + } + return out +} + +func getPlHolder(idx, lastIdx int) string { + plHolder := " ├" + if idx == lastIdx-1 { + plHolder = " └" + } + return plHolder +} + +func (g *Group) printFull() printRows { + out := make(printRows, 0, 20) + + active := "off" + if g.active { + active = "on " + } + + fistRow := printRow{ + "", + groupName, + g.name + ":", + "", + "", + separator + active, + separator + "description:" + g.description, + } + out = append(out, fistRow) + out = append(out, g.simpleCounters.printFull()...) + out = append(out, g.totalCounters.printFull()...) + if len(g.groups) != 0 { + for idx := range g.groups { + out = append(out, g.groups[idx].printFull()...) + } + } + lastElem := false + + // add pretty grouping marks + for idx := len(out) - 1; idx > 0; idx-- { + plHolder := " " + if lastElem { + plHolder = " │" + } + if out[idx][0] == "" { + if !lastElem { + lastElem = true + plHolder = " └" + } else { + plHolder = " ├" + } + } + out[idx][0] = plHolder + out[idx][0] + } + return out +} + +func (g *Group) Print() { + tmp := g.printFull() + out := tmp.getStrings() + fmt.Println(strings.Join(out, "\n")) +} + +func PrintAllGroups() { + for idx := range allGroups { + allGroups[idx].Print() + } +} diff --git a/pkg/count/total_counters.go b/pkg/count/total_counters.go new file mode 100644 index 00000000..3937bced --- /dev/null +++ b/pkg/count/total_counters.go @@ -0,0 +1,182 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package count + +import ( + "fmt" + "strings" + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" +) + +type TotalCounters []*TotalCounter + +func (l TotalCounters) printFull() printRows { + out := make(printRows, 0, len(l)*5) + for idx := range l { + out = append(out, l[idx].printFull()...) + } + return out +} + +type TotalCounter struct { + prometheus *prometheus.CounterVec + group *Group + name string + unit string + description string + subCounters SubCounters + val atomic.Uint64 + prometheusIntegration bool + _ noCopy +} + +type SubCounters []*SubCounter + +type SubCounter struct { + tc *TotalCounter + prometheus prometheus.Counter + name string + val atomic.Uint64 +} + +func (c *TotalCounter) Add(idx, in int) { + c.subCounters[idx].Add(in) +} + +func (c *TotalCounter) Inc(idx int) { + c.subCounters[idx].Inc() +} + +func (c *TotalCounter) Get(idx int) uint64 { + return c.subCounters[idx].val.Load() +} + +func (c *TotalCounter) GetTotal() uint64 { + return c.val.Load() +} + +func (c *TotalCounter) GetSubCounters() SubCounters { + return c.subCounters +} + +func (c *SubCounter) Add(in int) { + if in > 0 { + c.tc.val.Add(uint64(in)) + c.val.Add(uint64(in)) + if c.tc.prometheusIntegration { + c.prometheus.Add(float64(in)) + } + } + if in < 0 { + panic("add value should be >0") + } +} + +func (c *SubCounter) Inc() { + c.tc.val.Add(1) + c.val.Add(1) + if c.tc.prometheusIntegration { + c.prometheus.Inc() + } +} + +func (c *SubCounter) Get() uint64 { + return c.val.Load() +} + +func (c *TotalCounter) printFull() printRows { + out := make(printRows, 0, len(c.subCounters)+1) + prometh := "prometheus:no " + if c.prometheusIntegration { + prometh = "prometheus:yes" + } + + fistRow := printRow{ + "", + totalCounterName, + c.name + ":", + fmt.Sprintf("%d", c.val.Load()), + separator + c.unit, + separator + prometh, + separator + "description:" + c.description, + } + out = append(out, fistRow) + + subCountRows := make(printRows, 0, len(c.subCounters)) + for idx := range c.subCounters { + if c.subCounters[idx].Get() == 0 { + continue + } + percent := separator + fmt.Sprintf("%.3f", 100*float64(c.subCounters[idx].Get())/float64(c.val.Load())) + if len(percent) < 7 { + percent = strings.Repeat(" ", 7-len(percent)) + percent + } + subCountRows = append(subCountRows, printRow{ + "", + subCounterName, + c.subCounters[idx].name + ":", + fmt.Sprintf("%d", c.subCounters[idx].Get()), + separator + c.unit, + percent, + separator + "%", + }) + } + for idx := range subCountRows { + subCountRows[idx][0] = getPlHolder(idx, len(subCountRows)) + } + subCountRows.alignRows() + out = append(out, subCountRows...) + + return out +} + +func (g *Group) AddTotalCounter(counter Info, prometheusLabel string, subCounters []string) *TotalCounter { + tCounter := TotalCounter{ + group: g, + name: counter.Name, + unit: counter.Unit, + prometheusIntegration: counter.PrometheusIntegration, + description: counter.Description, + val: atomic.Uint64{}, + } + + sCounters := make(SubCounters, len(subCounters)) + if counter.PrometheusIntegration { + tCounter.prometheus = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: g.getParentGroupName(), + Subsystem: g.name, + Name: counter.Name, + Help: counter.Description, + }, []string{prometheusLabel}) + } + + for idx := range sCounters { + sCounters[idx] = &SubCounter{ + tc: &tCounter, + val: atomic.Uint64{}, + name: subCounters[idx], + } + if counter.PrometheusIntegration { + sCounters[idx].prometheus = tCounter.prometheus.WithLabelValues(sCounters[idx].name) + } + } + tCounter.subCounters = sCounters + defer g.mut.Unlock() + g.mut.Lock() + g.totalCounters = append(g.totalCounters, &tCounter) + return &tCounter +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index ad4778ac..e3a2bad8 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -21,13 +21,13 @@ import ( "fmt" "time" + "github.com/scylladb/gemini/pkg/count" "github.com/scylladb/gemini/pkg/generators" - "github.com/scylladb/gemini/pkg/store" - "github.com/scylladb/gemini/pkg/typedef" - "github.com/scylladb/gemini/pkg/joberror" "github.com/scylladb/gemini/pkg/status" "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" "go.uber.org/zap" "golang.org/x/exp/rand" @@ -53,6 +53,13 @@ var ( mutate = job{name: mutateName, function: mutationJob} ) +var stmtsCounter = count.StmtsCounters.AddTotalCounter(count.Info{ + Name: "total statement`s count", + Unit: "pc.", + Description: "count of all generated statement`s by its type", + PrometheusIntegration: false, +}, "", []string{"Select", "SelectRange", "SelectByIndex", "SelectFromMaterializedView", "Delete", "Insert", "InsertJSON", "Update", "AlterColumn", "DropColumn", "AddColumn"}) + type List struct { name string jobs []job @@ -325,6 +332,7 @@ func ddl( if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil { w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL())) } + stmtsCounter.Inc(int(ddlStmt.QueryType)) if err = s.Mutate(ctx, ddlStmt.Query); err != nil { if errors.Is(err, context.Canceled) { return nil @@ -374,6 +382,7 @@ func mutation( } mutateQuery := mutateStmt.Query mutateValues := mutateStmt.Values + stmtsCounter.Inc(int(mutateStmt.QueryType)) if mutateStmt.ValuesWithToken != nil { defer func() { g.GiveOld(mutateStmt.ValuesWithToken) @@ -426,7 +435,7 @@ func validation( } delay = sc.AsyncObjectStabilizationDelay } - + stmtsCounter.Inc(int(stmt.QueryType)) var lastErr, err error attempt := 1 for { diff --git a/pkg/status/status.go b/pkg/status/status.go index a81988b7..e396fe30 100644 --- a/pkg/status/status.go +++ b/pkg/status/status.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" + "github.com/scylladb/gemini/pkg/count" "github.com/scylladb/gemini/pkg/joberror" "github.com/scylladb/gemini/pkg/typedef" ) @@ -96,6 +97,7 @@ func (gs *GlobalStatus) PrintResult(w io.Writer, schema *typedef.Schema, version jsonSchema, _ := json.MarshalIndent(schema, "", " ") fmt.Printf("Schema: %v\n", string(jsonSchema)) } + count.PrintAllGroups() } func NewGlobalStatus(limit int32) *GlobalStatus { diff --git a/pkg/store/store.go b/pkg/store/store.go index 267c6d50..66ee7145 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -35,9 +35,17 @@ import ( "go.uber.org/multierr" "gopkg.in/inf.v0" + "github.com/scylladb/gemini/pkg/count" "github.com/scylladb/gemini/pkg/typedef" ) +var nilRowsValidate = count.StmtsCounters.AddSimpleCounter(count.Info{ + Name: "nil rows validate responses", + Unit: "pc.", + Description: "count validate responses with nil rows", + PrometheusIntegration: false, +}) + type loader interface { load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error) } @@ -189,6 +197,7 @@ func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, build return nil } if len(testRows) == 0 && len(oracleRows) == 0 { + nilRowsValidate.Inc() return nil } if len(testRows) != len(oracleRows) {