Skip to content

Commit

Permalink
feature: redis-space refact (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremylv1992 authored Nov 2, 2023
1 parent 159b1d3 commit cc3dfcd
Show file tree
Hide file tree
Showing 56 changed files with 6,165 additions and 819 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
数据链路为蓝鲸可观测平台提供通用统一的数据采集、转换和存查能力。



## 总览
- 设计理念:

Expand Down
4 changes: 2 additions & 2 deletions pkg/offline-data-archive/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ require (
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/philhofer/fwd v1.0.0 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.3.0 // indirect
Expand All @@ -119,7 +119,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/tinylib/msgp v1.0.2 // indirect
github.com/tinylib/msgp v1.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
Expand Down
3 changes: 3 additions & 0 deletions pkg/offline-data-archive/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZ
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -458,6 +459,7 @@ github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
Expand Down Expand Up @@ -756,6 +758,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
Expand Down
77 changes: 48 additions & 29 deletions pkg/unify-query/cmdb/v1beta1/v1beta1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ func mockData(ctx context.Context) *curl.TestCurl {

metadata.GetQueryRouter().MockSpaceUid(consul.VictoriaMetricsStorageType)

tsdb.SetStorage(consul.VictoriaMetricsStorageType, &tsdb.Storage{
vmStorageID := "1"
vmStorageIDInt := int64(1)
influxdbStorageID := "2"
influxdbStorageIDInt := int64(2)

tsdb.SetStorage(vmStorageID, &tsdb.Storage{
Type: consul.VictoriaMetricsStorageType,
Instance: &victoriaMetrics.Instance{
Ctx: ctx,
Expand All @@ -197,7 +202,7 @@ func mockData(ctx context.Context) *curl.TestCurl {
AuthenticationMethod: "token",
},
})
tsdb.SetStorage(consul.InfluxDBStorageType, &tsdb.Storage{
tsdb.SetStorage(influxdbStorageID, &tsdb.Storage{
Type: consul.InfluxDBStorageType,
Instance: tsdbInfluxdb.NewInstance(
context.TODO(),
Expand All @@ -213,33 +218,47 @@ func mockData(ctx context.Context) *curl.TestCurl {
),
})
mock.SetRedisClient(ctx, "test_model")
mock.SetSpaceAndProxyMockData(ctx, "v1beat1_test", "v1beat1_test", consul.InfluxDBStorageType, &redis.TsDB{
TableID: "db.measurement",
Field: []string{"node_with_system_relation", "node_with_pod_relation"},
MeasurementType: redis.BkSplitMeasurement,
Filters: []redis.Filter{},
SegmentedEnable: false,
DataLabel: "datalabel",
}, &ir.Proxy{
StorageID: consul.InfluxDBStorageType,
Db: "2_bkmonitor_time_series_1572864",
Measurement: "__default__",
})

mock.SetSpaceAndProxyMockData(ctx, "v1beat1_test", "v1beat1_test", consul.VictoriaMetricsStorageType, &redis.TsDB{
TableID: "db_vm.measurement",
Field: []string{"node_with_system_relation", "node_with_pod_relation"},
MeasurementType: redis.BkSplitMeasurement,
Filters: []redis.Filter{},
SegmentedEnable: false,
DataLabel: "datalabel",
}, &ir.Proxy{
StorageID: consul.VictoriaMetricsStorageType,
Db: "db_vm",
Measurement: "__default__",
VmRt: "2_bkmonitor_time_series_1572864_vm_rt",
})

mock.SetSpaceTsDbMockData(
ctx, "v1beat1_test", "v1beat1_test",
ir.SpaceInfo{
consul.InfluxDBStorageType: ir.Space{
"db.measurement": &ir.SpaceResultTable{
TableId: "db.measurement",
Filters: []map[string]string{},
},
},
consul.VictoriaMetricsStorageType: ir.Space{
"db_vm.measurement": &ir.SpaceResultTable{
TableId: "db_vm.measurement",
Filters: []map[string]string{},
},
},
},
ir.ResultTableDetailInfo{
"db.measurement": &ir.ResultTableDetail{
Fields: []string{"node_with_system_relation", "node_with_pod_relation"},
MeasurementType: redis.BkSplitMeasurement,
DataLabel: "datalabel",
StorageId: influxdbStorageIDInt,
DB: "2_bkmonitor_time_series_1572864",
Measurement: "__default__",
},
"db_vm.measurement": &ir.ResultTableDetail{
Fields: []string{"node_with_system_relation", "node_with_pod_relation"},
MeasurementType: redis.BkSplitMeasurement,
DataLabel: "datalabel",
StorageId: vmStorageIDInt,
DB: "db_vm",
Measurement: "__default__",
VmRt: "2_bkmonitor_time_series_1572864_vm_rt",
},
},
ir.FieldToResultTable{
"node_with_system_relation": ir.ResultTableList{"db.measurement", "db_vm.measurement"},
"node_with_pod_relation": ir.ResultTableList{"db.measurement", "db_vm.measurement"},
},
nil,
)
return mockCurl
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/unify-query/consul/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func ReloadRouterInfo() (map[string][]*PipelineConfig, error) {
if err != nil {
return nil, err
}
log.Infof(context.TODO(), "get meatadata path :%v", paths)
log.Debugf(context.TODO(), "get meatadata path :%v", paths)

pipelineConfMap := make(map[string][]*PipelineConfig, len(paths))

Expand Down
2 changes: 1 addition & 1 deletion pkg/unify-query/curl/curl.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *HttpCurl) Request(ctx context.Context, method string, opt Options) (*ht
Transport: otelhttp.NewTransport(http.DefaultTransport),
}

c.Log.Ctx(ctx).Info(fmt.Sprintf("[%s] %s", method, opt.UrlPath))
c.Log.Ctx(ctx).Debug(fmt.Sprintf("[%s] %s", method, opt.UrlPath))

req, err := http.NewRequestWithContext(ctx, method, opt.UrlPath, bytes.NewBuffer(opt.Body))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/unify-query/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Downsample(points []promql.Point, factor float64) []promql.Point {
threshold = int(math.Ceil(float64(len(points)) * factor))
downSamplePoints = lttbFunc(points, threshold)

log.Infof(context.TODO(), "downsample series done %s %d %s %d %s %d",
log.Debugf(context.TODO(), "downsample series done %s %d %s %d %s %d",
"threshold", threshold,
"rawPointCount", len(points),
"downsamplePointCount", len(downSamplePoints),
Expand Down
2 changes: 1 addition & 1 deletion pkg/unify-query/featureFlag/featureFlag.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func setEvent(ctx context.Context, events []exporter.FeatureEvent) error {
if err != nil {
return err
}
log.Infof(ctx, string(info))
log.Debugf(ctx, string(info))
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/unify-query/influxdb/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (e *endpointSet) Update(ctx context.Context) {
continue
}

log.Infof(ctx, "delete endpoint %s with address: %s, protocol: %s", addr, er.address, er.protocol)
log.Debugf(ctx, "delete endpoint %s with address: %s, protocol: %s", addr, er.address, er.protocol)
er.Close()
delete(endpoints, addr)
}
Expand All @@ -151,12 +151,12 @@ func (e *endpointSet) Update(ctx context.Context) {
continue
}

log.Infof(ctx, "connect endpoint %s with address: %s, protocol: %s", addr, er.address, er.protocol)
log.Debugf(ctx, "connect endpoint %s with address: %s, protocol: %s", addr, er.address, er.protocol)
endpoints[addr] = er
}

if len(endpoints) > 0 {
log.Infof(ctx, "old: %+v(%d) => new: %+v(%d)", endpoints, len(endpoints), activeEndpoints, len(activeEndpoints))
log.Debugf(ctx, "old: %+v(%d) => new: %+v(%d)", endpoints, len(endpoints), activeEndpoints, len(activeEndpoints))

e.endpointsMtx.Lock()
e.endpoints = endpoints
Expand Down
8 changes: 4 additions & 4 deletions pkg/unify-query/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func QueryInfosAsync(ctx context.Context, sqlInfos []SQLInfo, precision string,
trace.InsertIntIntoSpan("sql-nums", len(sqlInfos), span)
trace.InsertIntIntoSpan("query-max-goroutine", perQueryMaxGoroutine, span)

log.Infof(ctx, "query sql async length:%d", length)
log.Debugf(ctx, "query sql async length:%d", length)

go func() {
defer func() { recvDone <- struct{}{} }()
Expand Down Expand Up @@ -203,7 +203,7 @@ func QueryInfosAsync(ctx context.Context, sqlInfos []SQLInfo, precision string,
if tables != nil && tables.Length() > 0 {

trace.InsertIntIntoSpan(fmt.Sprintf("table_num_%d", i), len(tables.Tables), span)
log.Infof(ctx,
log.Debugf(ctx,
"influxdb query info async:db:[%s], sql:[%s], table:[%d]", db, sql, len(tables.Tables),
)

Expand Down Expand Up @@ -231,7 +231,7 @@ func QueryInfosAsync(ctx context.Context, sqlInfos []SQLInfo, precision string,

trace.InsertIntIntoSpan("total_table_num", totalTables.Length(), span)

log.Infof(ctx, "influxdb query info async:%v, query total cost:%s", sqlInfos, time.Since(start))
log.Debugf(ctx, "influxdb query info async:%v, query total cost:%s", sqlInfos, time.Since(start))

// return mergeTablesInfo(totalTables), nil
// totalTables 后续的处理中有做Fill调整格式,同时做了去重。这里暂时先不做去重
Expand Down Expand Up @@ -264,7 +264,7 @@ func QueryAsync(ctx context.Context, sqlInfos []SQLInfo, precision string) (*Tab
trace.InsertIntIntoSpan("sql-nums", len(sqlInfos), span)
trace.InsertIntIntoSpan("query-max-goroutine", perQueryMaxGoroutine, span)

log.Infof(ctx, "query sql async length:%d", length)
log.Debugf(ctx, "query sql async length:%d", length)

go func() {
defer func() { recvDone <- struct{}{} }()
Expand Down
33 changes: 0 additions & 33 deletions pkg/unify-query/influxdb/influxdb_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"encoding/json"
"fmt"
"math"
"strings"
"sync"

goRedis "github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -423,38 +422,6 @@ func (r *Router) loadRouter(ctx context.Context, key string) error {
return err
}

func (r *Router) GetProxyByTableID(tableId, field string, isProxy bool) (*influxdb.Proxy, error) {
r.lock.RLock()
defer r.lock.RUnlock()

route := strings.Split(tableId, ".")
if len(route) != 2 {
return nil, fmt.Errorf("tableid format is wrong %s", tableId)
}

var ckList []string
if isProxy {
// 判断是否需要路由
ckList = []string{
fmt.Sprintf("%s.%s", route[0], field),
tableId,
fmt.Sprintf("%s.__default__", route[0]),
}
} else {
ckList = []string{
tableId,
}
}

for _, ck := range ckList {
if v, ok := r.proxyInfo[ck]; ok {
return v, nil
}
}

return nil, fmt.Errorf("influxdb proxy router is empty, with talbeID: %s, field: %s", tableId, field)
}

func GetTagRouter(ctx context.Context, tagsKey []string, condition string) (string, error) {
if len(tagsKey) == 0 || condition == "" {
return "", nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/unify-query/influxdb/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (i *Instance) QueryInfos(ctx context.Context, metricName, db, stmt, precisi
trace.InsertStringIntoSpan("query-sql", stmt, span)
trace.InsertStringIntoSpan("query-cost", startAnaylize.Sub(startQuery).String(), span)

log.Infof(ctx,
log.Debugf(ctx,
fmt.Sprintf("influxdb query:[%s][%s], query cost:%s", db, stmt, startAnaylize.Sub(startQuery)),
)
if resp == nil {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (i *Instance) QueryInfos(ctx context.Context, metricName, db, stmt, precisi

trace.InsertStringIntoSpan("analyzer-cost", time.Since(startAnaylize).String(), span)

log.Infof(ctx,
log.Debugf(ctx,
"influxdb query:[%s][%s], result anaylize cost:%s", db, stmt, time.Since(startAnaylize),
)

Expand Down Expand Up @@ -290,7 +290,7 @@ func (i *Instance) Query(
startAnaylize = time.Now()

trace.InsertStringIntoSpan("query-cost", startAnaylize.Sub(startQuery).String(), span)
log.Infof(ctx, "influxdb query:%s, query cost:%s", stmt, startAnaylize.Sub(startQuery))
log.Debugf(ctx, "influxdb query:%s, query cost:%s", stmt, startAnaylize.Sub(startQuery))
if resp == nil {
log.Warnf(ctx, "query:%s get nil response", stmt)
return nil, errors.New("get nil response")
Expand Down Expand Up @@ -351,7 +351,7 @@ func (i *Instance) Query(

trace.InsertStringIntoSpan("analyzer_cost", time.Since(startAnaylize).String(), span)

log.Infof(ctx, fmt.Sprintf(
log.Debugf(ctx, fmt.Sprintf(
"influxdb query:%s, result anaylize cost:%s, result num: %d, series num: %d, point num: %d",
stmt, time.Since(startAnaylize), resultNum, seriesNum, pointNum,
))
Expand Down
Loading

0 comments on commit cc3dfcd

Please sign in to comment.