Skip to content

Commit

Permalink
perf: opt query metric
Browse files Browse the repository at this point in the history
  • Loading branch information
juexiaolin(林觉霄) authored and juexiaolin committed Dec 17, 2020
1 parent 681a20a commit 2ee85f9
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 81 deletions.
16 changes: 11 additions & 5 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
DefaultDelaySeconds = 300
DefaultRelodIntervalMinutes = 60
DefaultRateLimit = 15
DefaultQueryMetricBatchSize = 50

EnvAccessKey = "TENCENTCLOUD_SECRET_ID"
EnvSecretKey = "TENCENTCLOUD_SECRET_KEY"
Expand Down Expand Up @@ -103,11 +104,12 @@ func (p *TencentProduct) IsReloadEnable() bool {
}

// TencentConfig is the top-level exporter configuration, decoded from YAML.
//
// Each field is declared exactly once; declaring a field twice is a compile
// error in Go.
type TencentConfig struct {
	// Credential is read from the "credential" key.
	Credential TencentCredential `yaml:"credential"`
	// Metrics is read from the "metrics" key; fillDefault fills in
	// per-metric defaults such as PeriodSeconds.
	Metrics []TencentMetric `yaml:"metrics"`
	// Products is read from the "products" key.
	Products []TencentProduct `yaml:"products"`
	// RateLimit is read from the "rate_limit" key; fillDefault may replace
	// it with DefaultRateLimit.
	RateLimit float64 `yaml:"rate_limit"`
	// MetricQueryBatchSize is read from the "metric_query_batch_size" key.
	// fillDefault resets it to DefaultQueryMetricBatchSize when it is
	// <= 0 or > 100.
	MetricQueryBatchSize int `yaml:"metric_query_batch_size"`
	// Filename is read from the "filename" key.
	Filename string `yaml:"filename"`
}

func NewConfig() *TencentConfig {
Expand Down Expand Up @@ -193,6 +195,10 @@ func (c *TencentConfig) fillDefault() {
c.RateLimit = DefaultRateLimit
}

if c.MetricQueryBatchSize <= 0 || c.MetricQueryBatchSize > 100 {
c.MetricQueryBatchSize = DefaultQueryMetricBatchSize
}

for index, metric := range c.Metrics {
if metric.PeriodSeconds == 0 {
c.Metrics[index].PeriodSeconds = DefaultPeriodSeconds
Expand Down
6 changes: 6 additions & 0 deletions pkg/constant/cache.go
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
package constant

import "time"

// DefaultReloadInterval is one hour (60 minutes).
// NOTE(review): presumably the default period between cache reloads; the
// consumer is not visible here, so confirm against callers.
const DefaultReloadInterval = time.Hour
5 changes: 5 additions & 0 deletions pkg/constant/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package constant

// DefaultQueryMetricBatchSize is 50.
// NOTE(review): pkg/config declares a constant with the same name and value
// and uses that one as the fallback batch size; confirm whether this copy is
// referenced anywhere or whether the two should be unified.
const DefaultQueryMetricBatchSize = 50
4 changes: 0 additions & 4 deletions pkg/instance/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import (
"github.com/go-kit/kit/log/level"
)

// DefaultReloadInterval is one hour (60 minutes).
// NOTE(review): presumably the default reload period for the instance cache
// declared below; its use is not visible here, so confirm against callers.
const DefaultReloadInterval = time.Hour

// 可用于产品的实例的缓存, TcInstanceRepository
type TcInstanceCache struct {
Raw TcInstanceRepository
Expand Down
177 changes: 105 additions & 72 deletions pkg/metric/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"golang.org/x/time/rate"

monitor "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/monitor/v20180724"
"github.com/tencentyun/tencentcloud-exporter/pkg/client"
"github.com/tencentyun/tencentcloud-exporter/pkg/config"
"golang.org/x/time/rate"
)

var (
Expand All @@ -33,7 +34,10 @@ type TcmMetricRepositoryImpl struct {
monitorClient *monitor.Client
limiter *rate.Limiter // 限速
ctx context.Context
logger log.Logger

queryMetricBatchSize int

logger log.Logger
}

func (repo *TcmMetricRepositoryImpl) GetMeta(namespace string, name string) (meta *TcmMeta, err error) {
Expand Down Expand Up @@ -136,83 +140,111 @@ func (repo *TcmMetricRepositoryImpl) GetSamples(s *TcmSeries, st int64, et int64
return
}

func (repo *TcmMetricRepositoryImpl) ListSamples(m *TcmMetric, st int64, et int64) (samplesList []*TcmSamples, err error) {
for _, seriesList := range m.GetSeriesSplitByBatch(10) {
ctx, cancel := context.WithCancel(repo.ctx)
err = repo.limiter.Wait(ctx)
// ListSamples collects the samples of every series of metric m.
//
// The series are split into batches of repo.queryMetricBatchSize and each
// batch is fetched through listSampleByBatch, with st and et passed through
// unchanged. Fetching is best-effort: a batch that fails is logged and
// skipped, so the result may be partial and the returned error is always nil.
func (repo *TcmMetricRepositoryImpl) ListSamples(m *TcmMetric, st int64, et int64) ([]*TcmSamples, error) {
	var samplesList []*TcmSamples
	for _, seriesList := range m.GetSeriesSplitByBatch(repo.queryMetricBatchSize) {
		sl, err := repo.listSampleByBatch(m, seriesList, st, et)
		if err != nil {
			// Keep going: one failed batch must not discard the others.
			level.Error(repo.logger).Log(
				"msg", err.Error(),
				"metric", m.Meta.MetricName)
			continue
		}
		samplesList = append(samplesList, sl...)
	}
	return samplesList, nil
}

request := monitor.NewGetMonitorDataRequest()
request.Namespace = &m.Meta.Namespace
request.MetricName = &m.Meta.MetricName

period := uint64(m.Conf.StatPeriodSeconds)
request.Period = &period

for _, series := range seriesList {
ifilters := &monitor.Instance{
Dimensions: []*monitor.Dimension{},
}
for k, v := range series.QueryLabels {
tk := k
tv := v
ifilters.Dimensions = append(ifilters.Dimensions, &monitor.Dimension{Name: &tk, Value: &tv})
}
request.Instances = append(request.Instances, ifilters)
}
func (repo *TcmMetricRepositoryImpl) listSampleByBatch(
m *TcmMetric,
seriesList []*TcmSeries,
st int64,
et int64,
) ([]*TcmSamples, error) {
var samplesList []*TcmSamples

stStr := time.Unix(st, 0).Format(timeStampFormat)
request.StartTime = &stStr
if et != 0 {
etStr := time.Unix(et, 0).Format(timeStampFormat)
request.StartTime = &etStr
}
ctx, cancel := context.WithCancel(repo.ctx)
defer cancel()

response, err := repo.monitorClient.GetMonitorData(request)
if err != nil {
return nil, err
err := repo.limiter.Wait(ctx)
if err != nil {
return nil, err
}

request := repo.buildGetMonitorDataRequest(m, seriesList, st, et)
response, err := repo.monitorClient.GetMonitorData(request)
if err != nil {
return nil, err
}

for _, points := range response.Response.DataPoints {
samples, ql, e := repo.buildSamples(m, points)
if e != nil {
level.Debug(repo.logger).Log(
"msg", e.Error(),
"metric", m.Meta.MetricName,
"dimension", fmt.Sprintf("%v", ql))
continue
}
samplesList = append(samplesList, samples)
}
return samplesList, nil
}

for _, points := range response.Response.DataPoints {
ql := map[string]string{}
for _, dimension := range points.Dimensions {
if *dimension.Value != "" {
ql[*dimension.Name] = *dimension.Value
}
}
sid, e := GetTcmSeriesId(m, ql)
if e != nil {
level.Warn(repo.logger).Log(
"msg", "Get series id fail",
"metric", m.Meta.MetricName,
"dimension", fmt.Sprintf("%v", ql))
continue
}
s, ok := m.Series[sid]
if !ok {
level.Warn(repo.logger).Log(
"msg", "Response data point not match series",
"metric", m.Meta.MetricName,
"dimension", fmt.Sprintf("%v", ql))
continue
}
samples, e := NewTcmSamples(s, points)
if e != nil {
level.Debug(repo.logger).Log(
"msg", "The instance has no metric data and may not have traffic",
"metric", m.Meta.MetricName,
"dimension", fmt.Sprintf("%v", ql))
} else {
samplesList = append(samplesList, samples)
}
// buildGetMonitorDataRequest assembles the GetMonitorData request for one
// batch of series of metric m.
//
// Namespace, metric name and period come from m. Each series contributes one
// Instance whose dimensions are its QueryLabels. st and et are Unix seconds,
// formatted with timeStampFormat. EndTime is only set when et is non-zero.
func (repo *TcmMetricRepositoryImpl) buildGetMonitorDataRequest(
	m *TcmMetric,
	seriesList []*TcmSeries,
	st int64, et int64,
) *monitor.GetMonitorDataRequest {
	request := monitor.NewGetMonitorDataRequest()
	request.Namespace = &m.Meta.Namespace
	request.MetricName = &m.Meta.MetricName

	period := uint64(m.Conf.StatPeriodSeconds)
	request.Period = &period

	for _, series := range seriesList {
		ifilters := &monitor.Instance{
			Dimensions: []*monitor.Dimension{},
		}
		for k, v := range series.QueryLabels {
			// Copy the loop variables: their addresses are stored in the request.
			tk, tv := k, v
			ifilters.Dimensions = append(ifilters.Dimensions, &monitor.Dimension{Name: &tk, Value: &tv})
		}
		request.Instances = append(request.Instances, ifilters)
	}

	stStr := time.Unix(st, 0).Format(timeStampFormat)
	request.StartTime = &stStr
	if et != 0 {
		etStr := time.Unix(et, 0).Format(timeStampFormat)
		// Set EndTime here: assigning to StartTime would overwrite the
		// start of the window and leave the end unset.
		request.EndTime = &etStr
	}
	return request
}

cancel()
// buildSamples converts one response data point into the samples of the
// matching series of metric m.
//
// It returns the samples, the query labels rebuilt from the data point's
// non-empty dimensions, and an error. The labels are returned on failure too,
// so the caller can log them. An error is returned when the series id cannot
// be derived, when no series of m has that id, or when NewTcmSamples fails.
func (repo *TcmMetricRepositoryImpl) buildSamples(
	m *TcmMetric,
	points *monitor.DataPoint,
) (*TcmSamples, map[string]string, error) {
	ql := map[string]string{}
	for _, dimension := range points.Dimensions {
		// Name and Value are pointers; skip entries that cannot be dereferenced.
		if dimension == nil || dimension.Name == nil || dimension.Value == nil {
			continue
		}
		if *dimension.Value != "" {
			ql[*dimension.Name] = *dimension.Value
		}
	}

	sid, e := GetTcmSeriesId(m, ql)
	if e != nil {
		return nil, ql, fmt.Errorf("get series id fail: %v", e)
	}
	s, ok := m.Series[sid]
	if !ok {
		return nil, ql, fmt.Errorf("response data point not match series")
	}
	samples, e := NewTcmSamples(s, points)
	if e != nil {
		return nil, ql, fmt.Errorf("this instance may not have metric data: %v", e)
	}
	return samples, ql, nil
}

func NewTcmMetricRepository(conf *config.TencentConfig, logger log.Logger) (repo TcmMetricRepository, err error) {
Expand All @@ -222,10 +254,11 @@ func NewTcmMetricRepository(conf *config.TencentConfig, logger log.Logger) (repo
}

repo = &TcmMetricRepositoryImpl{
monitorClient: monitorClient,
limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1),
ctx: context.Background(),
logger: logger,
monitorClient: monitorClient,
limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1),
ctx: context.Background(),
queryMetricBatchSize: conf.MetricQueryBatchSize,
logger: logger,
}

return
Expand Down

0 comments on commit 2ee85f9

Please sign in to comment.