From 3d20c9ce4ae780bec8feadeb285c76d92f8ac716 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Mon, 14 Oct 2024 22:30:30 +0000 Subject: [PATCH] *: track latency for each kind of requests Signed-off-by: Wei Fu --- api/types/metric.go | 10 +++++--- cmd/kperf/commands/runner/runner.go | 34 +++++++++++++++++-------- metrics/request.go | 39 ++++++++++++++++++----------- request/schedule.go | 2 +- runner/utils.go | 36 ++++++++++++++++++++------ 5 files changed, 84 insertions(+), 37 deletions(-) diff --git a/api/types/metric.go b/api/types/metric.go index f03ca38..ea3de3b 100644 --- a/api/types/metric.go +++ b/api/types/metric.go @@ -65,8 +65,8 @@ func (r *ResponseErrorStats) Merge(from *ResponseErrorStats) { type ResponseStats struct { // ErrorStats means summary of errors. ErrorStats ResponseErrorStats - // Latencies stores all the observed latencies. - Latencies []float64 + // LatenciesByURL stores all the observed latencies for each request. + LatenciesByURL map[string][]float64 // TotalReceivedBytes is total bytes read from apiserver. TotalReceivedBytes int64 } @@ -80,10 +80,12 @@ type RunnerMetricReport struct { ErrorStats ResponseErrorStats `json:"errorStats"` // TotalReceivedBytes is total bytes read from apiserver. TotalReceivedBytes int64 `json:"totalReceivedBytes"` - // Latencies stores all the observed latencies. - Latencies []float64 `json:"latencies,omitempty"` + // LatenciesByURL stores all the observed latencies. + LatenciesByURL map[string][]float64 `json:"latenciesByURL,omitempty"` // PercentileLatencies represents the latency distribution in seconds. PercentileLatencies [][2]float64 `json:"percentileLatencies,omitempty"` + // PercentileLatenciesByURL represents the latency distribution in seconds per request. + PercentileLatenciesByURL map[string][][2]float64 `json:"percentileLatenciesByURL,omitempty"` } // TODO(weifu): build brand new struct for RunnerGroupsReport to include more diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index 6a08dbb..eafaece 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -188,21 +188,35 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { // printResponseStats prints types.RunnerMetricReport into underlying file. func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error { output := types.RunnerMetricReport{ - Total: stats.Total, - ErrorStats: stats.ErrorStats, - Duration: stats.Duration.String(), - Latencies: stats.Latencies, - TotalReceivedBytes: stats.TotalReceivedBytes, - PercentileLatencies: metrics.BuildPercentileLatencies(stats.Latencies), + Total: stats.Total, + ErrorStats: stats.ErrorStats, + Duration: stats.Duration.String(), + TotalReceivedBytes: stats.TotalReceivedBytes, + + PercentileLatenciesByURL: map[string][][2]float64{}, } - encoder := json.NewEncoder(f) - encoder.SetIndent("", " ") + total := 0 + for _, latencies := range stats.LatenciesByURL { + total += len(latencies) + } + latencies := make([]float64, 0, total) + for _, l := range stats.LatenciesByURL { + latencies = append(latencies, l...) + } + output.PercentileLatencies = metrics.BuildPercentileLatencies(latencies) - if !rawDataFlagIncluded { - output.Latencies = nil + for u, l := range stats.LatenciesByURL { + output.PercentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(l) } + if rawDataFlagIncluded { + output.LatenciesByURL = stats.LatenciesByURL + } + + encoder := json.NewEncoder(f) + encoder.SetIndent("", " ") + err := encoder.Encode(output) if err != nil { return fmt.Errorf("failed to encode json: %w", err) diff --git a/metrics/request.go b/metrics/request.go index 08c680c..08c6afc 100644 --- a/metrics/request.go +++ b/metrics/request.go @@ -11,7 +11,7 @@ import ( // ResponseMetric is a measurement related to http response. type ResponseMetric interface { // ObserveLatency observes latency. - ObserveLatency(seconds float64) + ObserveLatency(url string, seconds float64) // ObserveFailure observes failure response. ObserveFailure(err error) // ObserveReceivedBytes observes the bytes read from apiserver. @@ -21,24 +21,30 @@ type ResponseMetric interface { } type responseMetricImpl struct { - mu sync.Mutex - errorStats *types.ResponseErrorStats - latencies *list.List - receivedBytes int64 + mu sync.Mutex + errorStats *types.ResponseErrorStats + receivedBytes int64 + latenciesByURLs map[string]*list.List } func NewResponseMetric() ResponseMetric { return &responseMetricImpl{ - latencies: list.New(), - errorStats: types.NewResponseErrorStats(), + errorStats: types.NewResponseErrorStats(), + latenciesByURLs: map[string]*list.List{}, } } // ObserveLatency implements ResponseMetric. -func (m *responseMetricImpl) ObserveLatency(seconds float64) { +func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) { m.mu.Lock() defer m.mu.Unlock() - m.latencies.PushBack(seconds) + + l, ok := m.latenciesByURLs[url] + if !ok { + m.latenciesByURLs[url] = list.New() + l = m.latenciesByURLs[url] + } + l.PushBack(seconds) } // ObserveFailure implements ResponseMetric. @@ -73,17 +79,22 @@ func (m *responseMetricImpl) ObserveReceivedBytes(bytes int64) { func (m *responseMetricImpl) Gather() types.ResponseStats { return types.ResponseStats{ ErrorStats: m.dumpErrorStats(), - Latencies: m.dumpLatencies(), + LatenciesByURL: m.dumpLatencies(), TotalReceivedBytes: atomic.LoadInt64(&m.receivedBytes), } } -func (m *responseMetricImpl) dumpLatencies() []float64 { +func (m *responseMetricImpl) dumpLatencies() map[string][]float64 { m.mu.Lock() defer m.mu.Unlock() - res := make([]float64, 0, m.latencies.Len()) - for e := m.latencies.Front(); e != nil; e = e.Next() { - res = append(res, e.Value.(float64)) + + res := make(map[string][]float64) + for u, latencies := range m.latenciesByURLs { + res[u] = make([]float64, 0, latencies.Len()) + + for e := latencies.Front(); e != nil; e = e.Next() { + res[u] = append(res[u], e.Value.(float64)) + } } return res } diff --git a/request/schedule.go b/request/schedule.go index 24de8be..f2cbc60 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -87,7 +87,7 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I klog.V(5).Infof("Request stream failed: %v", err) return } - respMetric.ObserveLatency(latency) + respMetric.ObserveLatency(req.URL().String(), latency) }() } }(cli) diff --git a/runner/utils.go b/runner/utils.go index a2c8329..2eb4a78 100644 --- a/runner/utils.go +++ b/runner/utils.go @@ -58,7 +58,8 @@ func buildNetListeners(addrs []string) (_ []net.Listener, retErr error) { // buildRunnerGroupSummary returns aggrecated summary from runner groups' report. func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *types.RunnerMetricReport { totalBytes := int64(0) - latencies := list.New() + totalResp := 0 + latenciesByURL := map[string]*list.List{} errStats := types.NewResponseErrorStats() maxDuration := 0 * time.Second @@ -90,8 +91,16 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type totalBytes += report.TotalReceivedBytes // update latencies - for _, v := range report.Latencies { - latencies.PushBack(v) + for u, l := range report.LatenciesByURL { + latencies, ok := latenciesByURL[u] + if !ok { + latenciesByURL[u] = list.New() + latencies = latenciesByURL[u] + } + for _, v := range l { + totalResp++ + latencies.PushBack(v) + } } // update error stats @@ -109,12 +118,23 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type } } + percentileLatenciesByURL := map[string][][2]float64{} + + latencies := make([]float64, 0, totalResp) + for u, l := range latenciesByURL { + lInSlice := listToSliceFloat64(l) + + latencies = append(latencies, lInSlice...) + percentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(lInSlice) + } + return &types.RunnerMetricReport{ - Total: latencies.Len(), - ErrorStats: *errStats, - Duration: maxDuration.String(), - TotalReceivedBytes: totalBytes, - PercentileLatencies: metrics.BuildPercentileLatencies(listToSliceFloat64(latencies)), + Total: totalResp, + ErrorStats: *errStats, + Duration: maxDuration.String(), + TotalReceivedBytes: totalBytes, + PercentileLatencies: metrics.BuildPercentileLatencies(latencies), + PercentileLatenciesByURL: percentileLatenciesByURL, } }