Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: track latency for each kind of requests #131

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions api/types/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (r *ResponseErrorStats) Merge(from *ResponseErrorStats) {
type ResponseStats struct {
// ErrorStats means summary of errors.
ErrorStats ResponseErrorStats
// Latencies stores all the observed latencies.
Latencies []float64
// LatenciesByURL stores all the observed latencies for each request.
LatenciesByURL map[string][]float64
// TotalReceivedBytes is total bytes read from apiserver.
TotalReceivedBytes int64
}
Expand All @@ -80,10 +80,12 @@ type RunnerMetricReport struct {
ErrorStats ResponseErrorStats `json:"errorStats"`
// TotalReceivedBytes is total bytes read from apiserver.
TotalReceivedBytes int64 `json:"totalReceivedBytes"`
// Latencies stores all the observed latencies.
Latencies []float64 `json:"latencies,omitempty"`
// LatenciesByURL stores all the observed latencies.
LatenciesByURL map[string][]float64 `json:"latenciesByURL,omitempty"`
// PercentileLatencies represents the latency distribution in seconds.
PercentileLatencies [][2]float64 `json:"percentileLatencies,omitempty"`
// PercentileLatenciesByURL represents the latency distribution in seconds per request.
PercentileLatenciesByURL map[string][][2]float64 `json:"percentileLatenciesByURL,omitempty"`
}

// TODO(weifu): build brand new struct for RunnerGroupsReport to include more
Expand Down
34 changes: 24 additions & 10 deletions cmd/kperf/commands/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,35 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) {
// printResponseStats prints types.RunnerMetricReport into underlying file.
func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error {
output := types.RunnerMetricReport{
Total: stats.Total,
ErrorStats: stats.ErrorStats,
Duration: stats.Duration.String(),
Latencies: stats.Latencies,
TotalReceivedBytes: stats.TotalReceivedBytes,
PercentileLatencies: metrics.BuildPercentileLatencies(stats.Latencies),
Total: stats.Total,
ErrorStats: stats.ErrorStats,
Duration: stats.Duration.String(),
TotalReceivedBytes: stats.TotalReceivedBytes,

PercentileLatenciesByURL: map[string][][2]float64{},
}

encoder := json.NewEncoder(f)
encoder.SetIndent("", " ")
total := 0
for _, latencies := range stats.LatenciesByURL {
total += len(latencies)
}
latencies := make([]float64, 0, total)
for _, l := range stats.LatenciesByURL {
latencies = append(latencies, l...)
}
output.PercentileLatencies = metrics.BuildPercentileLatencies(latencies)

if !rawDataFlagIncluded {
output.Latencies = nil
for u, l := range stats.LatenciesByURL {
output.PercentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(l)
}

if rawDataFlagIncluded {
output.LatenciesByURL = stats.LatenciesByURL
}

encoder := json.NewEncoder(f)
encoder.SetIndent("", " ")

err := encoder.Encode(output)
if err != nil {
return fmt.Errorf("failed to encode json: %w", err)
Expand Down
39 changes: 25 additions & 14 deletions metrics/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// ResponseMetric is a measurement related to http response.
type ResponseMetric interface {
// ObserveLatency observes latency.
ObserveLatency(seconds float64)
ObserveLatency(url string, seconds float64)
// ObserveFailure observes failure response.
ObserveFailure(err error)
// ObserveReceivedBytes observes the bytes read from apiserver.
Expand All @@ -21,24 +21,30 @@ type ResponseMetric interface {
}

type responseMetricImpl struct {
mu sync.Mutex
errorStats *types.ResponseErrorStats
latencies *list.List
receivedBytes int64
mu sync.Mutex
errorStats *types.ResponseErrorStats
receivedBytes int64
latenciesByURLs map[string]*list.List
}

func NewResponseMetric() ResponseMetric {
return &responseMetricImpl{
latencies: list.New(),
errorStats: types.NewResponseErrorStats(),
errorStats: types.NewResponseErrorStats(),
latenciesByURLs: map[string]*list.List{},
}
}

// ObserveLatency implements ResponseMetric.
func (m *responseMetricImpl) ObserveLatency(seconds float64) {
func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) {
m.mu.Lock()
defer m.mu.Unlock()
m.latencies.PushBack(seconds)

l, ok := m.latenciesByURLs[url]
if !ok {
m.latenciesByURLs[url] = list.New()
l = m.latenciesByURLs[url]
}
l.PushBack(seconds)
}

// ObserveFailure implements ResponseMetric.
Expand Down Expand Up @@ -73,17 +79,22 @@ func (m *responseMetricImpl) ObserveReceivedBytes(bytes int64) {
func (m *responseMetricImpl) Gather() types.ResponseStats {
return types.ResponseStats{
ErrorStats: m.dumpErrorStats(),
Latencies: m.dumpLatencies(),
LatenciesByURL: m.dumpLatencies(),
TotalReceivedBytes: atomic.LoadInt64(&m.receivedBytes),
}
}

func (m *responseMetricImpl) dumpLatencies() []float64 {
func (m *responseMetricImpl) dumpLatencies() map[string][]float64 {
m.mu.Lock()
defer m.mu.Unlock()
res := make([]float64, 0, m.latencies.Len())
for e := m.latencies.Front(); e != nil; e = e.Next() {
res = append(res, e.Value.(float64))

res := make(map[string][]float64)
for u, latencies := range m.latenciesByURLs {
res[u] = make([]float64, 0, latencies.Len())

for e := latencies.Front(); e != nil; e = e.Next() {
res[u] = append(res[u], e.Value.(float64))
}
}
return res
}
Expand Down
2 changes: 1 addition & 1 deletion request/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I
klog.V(5).Infof("Request stream failed: %v", err)
return
}
respMetric.ObserveLatency(latency)
respMetric.ObserveLatency(req.URL().String(), latency)
}()
}
}(cli)
Expand Down
36 changes: 28 additions & 8 deletions runner/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func buildNetListeners(addrs []string) (_ []net.Listener, retErr error) {
// buildRunnerGroupSummary returns aggrecated summary from runner groups' report.
func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *types.RunnerMetricReport {
totalBytes := int64(0)
latencies := list.New()
totalResp := 0
latenciesByURL := map[string]*list.List{}
errStats := types.NewResponseErrorStats()
maxDuration := 0 * time.Second

Expand Down Expand Up @@ -90,8 +91,16 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type
totalBytes += report.TotalReceivedBytes

// update latencies
for _, v := range report.Latencies {
latencies.PushBack(v)
for u, l := range report.LatenciesByURL {
latencies, ok := latenciesByURL[u]
if !ok {
latenciesByURL[u] = list.New()
latencies = latenciesByURL[u]
}
for _, v := range l {
totalResp++
latencies.PushBack(v)
}
}

// update error stats
Expand All @@ -109,12 +118,23 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type
}
}

percentileLatenciesByURL := map[string][][2]float64{}

latencies := make([]float64, 0, totalResp)
for u, l := range latenciesByURL {
lInSlice := listToSliceFloat64(l)

latencies = append(latencies, lInSlice...)
percentileLatenciesByURL[u] = metrics.BuildPercentileLatencies(lInSlice)
}

return &types.RunnerMetricReport{
Total: latencies.Len(),
ErrorStats: *errStats,
Duration: maxDuration.String(),
TotalReceivedBytes: totalBytes,
PercentileLatencies: metrics.BuildPercentileLatencies(listToSliceFloat64(latencies)),
Total: totalResp,
ErrorStats: *errStats,
Duration: maxDuration.String(),
TotalReceivedBytes: totalBytes,
PercentileLatencies: metrics.BuildPercentileLatencies(latencies),
PercentileLatenciesByURL: percentileLatenciesByURL,
}
}

Expand Down
Loading