From a1825510abea7e14022d79e633190cbe35a6cd02 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Fri, 13 Dec 2024 21:50:04 +0000 Subject: [PATCH] *: refactor error stats Introduce ResponseError type to record error client received. It includes timestamp, timespan in seconds and error message. The RunnerMetricReport will export raw data about each response error. It can help us build view about error. ```go // ResponseError is the record about that error. type ResponseError struct { // Timestamp indicates when this error was received. Timestamp time.Time `json:"timestamp"` // Duration records timespan in seconds. Duration float64 `json:"duration"` // Type indicates that category to which the error belongs. Type ResponseErrorType `json:"type"` // Code only works when Type is http. Code int `json:"code,omitempty"` // Message shows error message for this error. // // NOTE: When Type is http, this field will be empty. Message string `json:"message,omitempty"` } ``` Signed-off-by: Wei Fu --- api/types/metric.go | 108 ++++++++---------------- cmd/kperf/commands/runner/runner.go | 3 +- metrics/request.go | 45 ++++++---- metrics/request_test.go | 125 +++++++++++++++++++++------- metrics/utils.go | 97 ++++++++++----------- request/schedule.go | 5 +- runner/utils.go | 17 +++- 7 files changed, 223 insertions(+), 177 deletions(-) diff --git a/api/types/metric.go b/api/types/metric.go index 6cf0fe4..8c89a05 100644 --- a/api/types/metric.go +++ b/api/types/metric.go @@ -3,71 +3,43 @@ package types -// HTTP2ErrorStats is the report about http2 error during testing. -type HTTP2ErrorStats struct { - // ConnectionErrors represents connection level errors. - ConnectionErrors map[string]int32 `json:"connectionErrors,omitempty"` - // StreamErrors represents stream level errors. - StreamErrors map[string]int32 `json:"streamErrors,omitempty"` -} - -// NewHTTP2ErrorStats returns new instance of HTTP2ErrorStats. -func NewHTTP2ErrorStats() *HTTP2ErrorStats { - return &HTTP2ErrorStats{ - ConnectionErrors: make(map[string]int32, 10), - StreamErrors: make(map[string]int32, 10), - } -} - -// ResponseErrorStats is the report about errors. -type ResponseErrorStats struct { - // UnknownErrors is all unknown errors. - UnknownErrors []string `json:"unknownErrors"` - // NetErrors is to track errors from net. - NetErrors map[string]int32 `json:"netErrors"` - // ResponseCodes records request number grouped by response - // code between 400 and 600. - ResponseCodes map[int]int32 `json:"responseCodes"` - // HTTP2Errors records http2 related errors. - HTTP2Errors HTTP2ErrorStats `json:"http2Errors"` -} - -// NewResponseErrorStats returns empty ResponseErrorStats. -func NewResponseErrorStats() *ResponseErrorStats { - return &ResponseErrorStats{ - UnknownErrors: make([]string, 0, 1024), - NetErrors: make(map[string]int32, 10), - ResponseCodes: map[int]int32{}, - HTTP2Errors: *NewHTTP2ErrorStats(), - } -} +import "time" -// Copy clones self. -func (r *ResponseErrorStats) Copy() ResponseErrorStats { - res := NewResponseErrorStats() +// ResponseErrorType is error type of response. +type ResponseErrorType string - res.UnknownErrors = make([]string, len(r.UnknownErrors)) - copy(res.UnknownErrors, r.UnknownErrors) - res.NetErrors = cloneMap(r.NetErrors) - res.ResponseCodes = cloneMap(r.ResponseCodes) - res.HTTP2Errors.ConnectionErrors = cloneMap(r.HTTP2Errors.ConnectionErrors) - res.HTTP2Errors.StreamErrors = cloneMap(r.HTTP2Errors.StreamErrors) - return *res -} +const ( + // ResponseErrorTypeUnknown indicates we don't have correct category for errors. + ResponseErrorTypeUnknown ResponseErrorType = "unknown" + // ResponseErrorTypeHTTP indicates that the response returns http code >= 400. + ResponseErrorTypeHTTP ResponseErrorType = "http" + // ResponseErrorTypeHTTP2Protocol indicates that error comes from http2 layer. + ResponseErrorTypeHTTP2Protocol ResponseErrorType = "http2-protocol" + // ResponseErrorTypeConnection indicates that error is related to connection. + // For instance, connection refused caused by server down. + ResponseErrorTypeConnection ResponseErrorType = "connection" +) -// Merge merges two ResponseErrorStats. -func (r *ResponseErrorStats) Merge(from *ResponseErrorStats) { - r.UnknownErrors = append(r.UnknownErrors, from.UnknownErrors...) - mergeMap(r.NetErrors, from.NetErrors) - mergeMap(r.ResponseCodes, from.ResponseCodes) - mergeMap(r.HTTP2Errors.ConnectionErrors, from.HTTP2Errors.ConnectionErrors) - mergeMap(r.HTTP2Errors.StreamErrors, from.HTTP2Errors.StreamErrors) +// ResponseError is the record about that error. +type ResponseError struct { + // Timestamp indicates when this error was received. + Timestamp time.Time `json:"timestamp"` + // Duration records timespan in seconds. + Duration float64 `json:"duration"` + // Type indicates that category to which the error belongs. + Type ResponseErrorType `json:"type"` + // Code only works when Type is http. + Code int `json:"code,omitempty"` + // Message shows error message for this error. + // + // NOTE: When Type is http, this field will be empty. + Message string `json:"message,omitempty"` } // ResponseStats is the report about benchmark result. type ResponseStats struct { - // ErrorStats means summary of errors. - ErrorStats ResponseErrorStats + // Errors stores all the observed errors. + Errors []ResponseError // LatenciesByURL stores all the observed latencies for each request. LatenciesByURL map[string][]float64 // TotalReceivedBytes is total bytes read from apiserver. @@ -79,8 +51,10 @@ type RunnerMetricReport struct { Total int `json:"total"` // Duration means the time of benchmark. Duration string `json:"duration"` - // ErrorStats means summary of errors. - ErrorStats ResponseErrorStats `json:"errorStats"` + // Errors stores all the observed errors. + Errors []ResponseError `json:"errors,omitempty"` + // ErrorStats means summary of errors group by type. + ErrorStats map[string]int32 `json:"errorStats,omitempty"` // TotalReceivedBytes is total bytes read from apiserver. TotalReceivedBytes int64 `json:"totalReceivedBytes"` // LatenciesByURL stores all the observed latencies. @@ -94,17 +68,3 @@ type RunnerMetricReport struct { // TODO(weifu): build brand new struct for RunnerGroupsReport to include more // information, like how many runner groups, service account and flow control. type RunnerGroupsReport = RunnerMetricReport - -func mergeMap[K comparable, V int32](to, from map[K]V) { - for key, value := range from { - to[key] += value - } -} - -func cloneMap[K comparable, V int32](src map[K]V) map[K]V { - res := map[K]V{} - for key, value := range src { - res[key] = value - } - return res -} diff --git a/cmd/kperf/commands/runner/runner.go b/cmd/kperf/commands/runner/runner.go index edf3c1f..d04546c 100644 --- a/cmd/kperf/commands/runner/runner.go +++ b/cmd/kperf/commands/runner/runner.go @@ -192,7 +192,7 @@ func loadConfig(cliCtx *cli.Context) (*types.LoadProfile, error) { func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Result) error { output := types.RunnerMetricReport{ Total: stats.Total, - ErrorStats: stats.ErrorStats, + ErrorStats: metrics.BuildErrorStatsGroupByType(stats.Errors), Duration: stats.Duration.String(), TotalReceivedBytes: stats.TotalReceivedBytes, @@ -215,6 +215,7 @@ func printResponseStats(f *os.File, rawDataFlagIncluded bool, stats *request.Res if rawDataFlagIncluded { output.LatenciesByURL = stats.LatenciesByURL + output.Errors = stats.Errors } encoder := json.NewEncoder(f) diff --git a/metrics/request.go b/metrics/request.go index 571c4fd..979ad22 100644 --- a/metrics/request.go +++ b/metrics/request.go @@ -7,6 +7,7 @@ import ( "container/list" "sync" "sync/atomic" + "time" "github.com/Azure/kperf/api/types" ) @@ -16,7 +17,7 @@ type ResponseMetric interface { // ObserveLatency observes latency. ObserveLatency(url string, seconds float64) // ObserveFailure observes failure response. - ObserveFailure(err error) + ObserveFailure(now time.Time, seconds float64, err error) // ObserveReceivedBytes observes the bytes read from apiserver. ObserveReceivedBytes(bytes int64) // Gather returns the summary. @@ -25,14 +26,14 @@ type ResponseMetric interface { type responseMetricImpl struct { mu sync.Mutex - errorStats *types.ResponseErrorStats + errors *list.List receivedBytes int64 latenciesByURLs map[string]*list.List } func NewResponseMetric() ResponseMetric { return &responseMetricImpl{ - errorStats: types.NewResponseErrorStats(), + errors: list.New(), latenciesByURLs: map[string]*list.List{}, } } @@ -51,7 +52,7 @@ func (m *responseMetricImpl) ObserveLatency(url string, seconds float64) { } // ObserveFailure implements ResponseMetric. -func (m *responseMetricImpl) ObserveFailure(err error) { +func (m *responseMetricImpl) ObserveFailure(now time.Time, seconds float64, err error) { if err == nil { return } @@ -59,18 +60,30 @@ func (m *responseMetricImpl) ObserveFailure(err error) { m.mu.Lock() defer m.mu.Unlock() - // HTTP2 -> TCP/TLS -> Unknown + oerr := types.ResponseError{ + Timestamp: now, + Duration: seconds, + } + + // HTTP Code -> HTTP2 -> Connection -> Unknown code := codeFromHTTP(err) + http2Err, isHTTP2Err := isHTTP2Error(err) + connErr, isConnErr := isConnectionError(err) switch { case code != 0: - m.errorStats.ResponseCodes[code]++ - case isHTTP2Error(err): - updateHTTP2ErrorStats(m.errorStats, err) - case isNetRelatedError(err): - updateNetErrors(m.errorStats, err) + oerr.Type = types.ResponseErrorTypeHTTP + oerr.Code = code + case isHTTP2Err: + oerr.Type = types.ResponseErrorTypeHTTP2Protocol + oerr.Message = http2Err + case isConnErr: + oerr.Type = types.ResponseErrorTypeConnection + oerr.Message = connErr default: - m.errorStats.UnknownErrors = append(m.errorStats.UnknownErrors, err.Error()) + oerr.Type = types.ResponseErrorTypeUnknown + oerr.Message = err.Error() } + m.errors.PushBack(oerr) } // ObserveReceivedBytes implements ResponseMetric. @@ -81,7 +94,7 @@ func (m *responseMetricImpl) ObserveReceivedBytes(bytes int64) { // Gather implements ResponseMetric. func (m *responseMetricImpl) Gather() types.ResponseStats { return types.ResponseStats{ - ErrorStats: m.dumpErrorStats(), + Errors: m.dumpErrors(), LatenciesByURL: m.dumpLatencies(), TotalReceivedBytes: atomic.LoadInt64(&m.receivedBytes), } @@ -102,9 +115,13 @@ func (m *responseMetricImpl) dumpLatencies() map[string][]float64 { return res } -func (m *responseMetricImpl) dumpErrorStats() types.ResponseErrorStats { +func (m *responseMetricImpl) dumpErrors() []types.ResponseError { m.mu.Lock() defer m.mu.Unlock() - return m.errorStats.Copy() + res := make([]types.ResponseError, 0, m.errors.Len()) + for e := m.errors.Front(); e != nil; e = e.Next() { + res = append(res, e.Value.(types.ResponseError)) + } + return res } diff --git a/metrics/request_test.go b/metrics/request_test.go index c22ca57..557923e 100644 --- a/metrics/request_test.go +++ b/metrics/request_test.go @@ -10,6 +10,7 @@ import ( "io" "syscall" "testing" + "time" "github.com/Azure/kperf/api/types" @@ -19,31 +20,99 @@ import ( ) func TestResponseMetric_ObserveFailure(t *testing.T) { - expectedStats := types.ResponseErrorStats{ - UnknownErrors: []string{ - "unknown", - }, - ResponseCodes: map[int]int32{ - 429: 1, - 500: 1, - 504: 1, - }, - NetErrors: map[string]int32{ - "net/http: TLS handshake timeout": 2, - "connection reset by peer": 1, - "connection refused": 1, - "unexpected EOF": 1, - "context deadline exceeded": 1, - }, - HTTP2Errors: types.HTTP2ErrorStats{ - ConnectionErrors: map[string]int32{ - "http2: client connection lost": 2, - "http2: server sent GOAWAY and closed the connection; ErrCode=NO_ERROR, debug=\"\"": 1, - "http2: server sent GOAWAY and closed the connection; ErrCode=PROTOCOL_ERROR, debug=\"\"": 1, - }, - StreamErrors: map[string]int32{ - "CONNECT_ERROR": 1, - }, + observedAt := time.Now() + dur := 10 * time.Second + + expectedErrors := []types.ResponseError{ + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP, + Code: 429, + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP, + Code: 500, + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP, + Code: 504, + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: server sent GOAWAY and closed the connection; ErrCode=NO_ERROR, debug=", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: server sent GOAWAY and closed the connection; ErrCode=PROTOCOL_ERROR, debug=", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: client connection lost", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: "http2: client connection lost", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeHTTP2Protocol, + Message: http2.ErrCode(10).String(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: "net/http: TLS handshake timeout", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: "net/http: TLS handshake timeout", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: "context deadline exceeded", + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: syscall.ECONNRESET.Error(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: syscall.ECONNREFUSED.Error(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeConnection, + Message: io.ErrUnexpectedEOF.Error(), + }, + { + Timestamp: observedAt, + Duration: dur.Seconds(), + Type: types.ResponseErrorTypeUnknown, + Message: "unknown", }, } @@ -82,8 +151,8 @@ func TestResponseMetric_ObserveFailure(t *testing.T) { m := NewResponseMetric() for _, err := range errs { - m.ObserveFailure(err) + m.ObserveFailure(observedAt, dur.Seconds(), err) } - stats := m.Gather().ErrorStats - assert.Equal(t, expectedStats, stats) + errors := m.Gather().Errors + assert.Equal(t, expectedErrors, errors) } diff --git a/metrics/utils.go b/metrics/utils.go index f0c4f3f..d2b9b14 100644 --- a/metrics/utils.go +++ b/metrics/utils.go @@ -15,7 +15,6 @@ import ( "syscall" "github.com/Azure/kperf/api/types" - "golang.org/x/net/http2" apierrors "k8s.io/apimachinery/pkg/api/errors" ) @@ -42,6 +41,23 @@ func BuildPercentileLatencies(latencies []float64) [][2]float64 { return res } +// BuildErrorStatsGroupByType summaries total count for each type of errors. +func BuildErrorStatsGroupByType(errors []types.ResponseError) map[string]int32 { + res := map[string]int32{} + + for _, err := range errors { + var key string + switch err.Type { + case types.ResponseErrorTypeHTTP: + key = fmt.Sprintf("%s/%d", err.Type, err.Code) + default: + key = fmt.Sprintf("%s/%s", err.Type, err.Message) + } + res[key]++ + } + return res +} + var ( // errHTTP2ClientConnectionLost is used to track unexported http2 error. errHTTP2ClientConnectionLost = errors.New("http2: client connection lost") @@ -95,84 +111,55 @@ func codeFromHTTP(err error) int { } } -// updateHTTP2ErrorStats updates stats if err is http2 error. -func updateHTTP2ErrorStats(stats *types.ResponseErrorStats, err error) { +// isHTTP2Error returns true if it's related to http2 error. +func isHTTP2Error(err error) (string, bool) { + if err == nil { + return "", false + } + if connErr, ok := err.(http2.ConnectionError); ok || errors.As(err, &connErr) { - stats.HTTP2Errors.ConnectionErrors[(http2.ErrCode(connErr)).String()]++ - return + return (http2.ErrCode(connErr)).String(), true } if streamErr, ok := err.(http2.StreamError); ok || errors.As(err, &streamErr) { - stats.HTTP2Errors.StreamErrors[streamErr.Code.String()]++ - return + return streamErr.Code.String(), true } if connErr, ok := err.(http2.GoAwayError); ok || errors.As(err, &connErr) { - stats.HTTP2Errors.ConnectionErrors[fmt.Sprintf("http2: server sent GOAWAY and closed the connection; ErrCode=%v, debug=%q", connErr.ErrCode, connErr.DebugData)]++ - return + return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; ErrCode=%v, debug=%s", + connErr.ErrCode, connErr.DebugData), true } if strings.Contains(err.Error(), errHTTP2ClientConnectionLost.Error()) { - stats.HTTP2Errors.ConnectionErrors[errHTTP2ClientConnectionLost.Error()]++ + return errHTTP2ClientConnectionLost.Error(), true } + return "", false } -// updateNetErrors updates stats if err is related net error. -func updateNetErrors(stats *types.ResponseErrorStats, err error) { +// isConnectionError returns true if it's related to connection error. +func isConnectionError(err error) (string, bool) { if err == nil { - return + return "", false } - errInStr := err.Error() switch { case isTimeoutError(err): - stats.NetErrors[err.Error()]++ - case errors.Is(err, io.ErrUnexpectedEOF): - stats.NetErrors[io.ErrUnexpectedEOF.Error()]++ + return err.Error(), true case isConnectionRefused(err): - stats.NetErrors[syscall.ECONNREFUSED.Error()]++ + return syscall.ECONNREFUSED.Error(), true case isConnectionResetByPeer(err): - stats.NetErrors[syscall.ECONNRESET.Error()]++ - case strings.Contains(errInStr, errTLSHandshakeTimeout.Error()): - stats.NetErrors[errTLSHandshakeTimeout.Error()]++ + return syscall.ECONNRESET.Error(), true + case errors.Is(err, io.ErrUnexpectedEOF): + return io.ErrUnexpectedEOF.Error(), true + case errors.Is(err, io.EOF): + return io.EOF.Error(), true + case strings.Contains(err.Error(), errTLSHandshakeTimeout.Error()): + return errTLSHandshakeTimeout.Error(), true default: - // TODO(weifu): add more categories. + return "", false } } -// isHTTP2Error returns true if it's related to http2 error. -func isHTTP2Error(err error) bool { - if err == nil { - return false - } - - if connErr, ok := err.(http2.ConnectionError); ok || errors.As(err, &connErr) { - return true - } - - if streamErr, ok := err.(http2.StreamError); ok || errors.As(err, &streamErr) { - return true - } - - if connErr, ok := err.(http2.GoAwayError); ok || errors.As(err, &connErr) { - return true - } - - if strings.Contains(err.Error(), errHTTP2ClientConnectionLost.Error()) { - return true - } - return false -} - -// isNetRelatedError returns true if it's related to net error. -func isNetRelatedError(err error) bool { - return err != nil && (isTimeoutError(err) || - isConnectionRefused(err) || - isConnectionResetByPeer(err) || - errors.Is(err, io.ErrUnexpectedEOF) || - strings.Contains(err.Error(), errTLSHandshakeTimeout.Error())) -} - // isTimeoutError returns true if it's related to golang standard library // net's timeout error. func isTimeoutError(err error) bool { diff --git a/request/schedule.go b/request/schedule.go index ac06fd1..80f345a 100644 --- a/request/schedule.go +++ b/request/schedule.go @@ -104,11 +104,12 @@ func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.I err = nil } } - latency := time.Since(start).Seconds() + end := time.Now() + latency := end.Sub(start).Seconds() respMetric.ObserveReceivedBytes(bytes) if err != nil { - respMetric.ObserveFailure(err) + respMetric.ObserveFailure(end, latency, err) klog.V(5).Infof("Request stream failed: %v", err) return } diff --git a/runner/utils.go b/runner/utils.go index 18ef5df..7ea6ff8 100644 --- a/runner/utils.go +++ b/runner/utils.go @@ -63,7 +63,8 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type totalBytes := int64(0) totalResp := 0 latenciesByURL := map[string]*list.List{} - errStats := types.NewResponseErrorStats() + errs := []types.ResponseError{} + errStats := map[string]int32{} maxDuration := 0 * time.Second for idx := range groups { @@ -107,7 +108,9 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type } // update error stats - errStats.Merge(&report.ErrorStats) + mergeErrorStat(errStats, report.ErrorStats) + errs = append(errs, report.Errors...) + report.Errors = nil // update max duration rDur, err := time.ParseDuration(report.Duration) @@ -133,7 +136,8 @@ func buildRunnerGroupSummary(s *localstore.Store, groups []*group.Handler) *type return &types.RunnerMetricReport{ Total: totalResp, - ErrorStats: *errStats, + Errors: errs, + ErrorStats: errStats, Duration: maxDuration.String(), TotalReceivedBytes: totalBytes, PercentileLatencies: metrics.BuildPercentileLatencies(latencies), @@ -150,6 +154,13 @@ func listToSliceFloat64(l *list.List) []float64 { return res } +// mergeErrorStat merges two error stats. +func mergeErrorStat(s, d map[string]int32) { + for e, n := range d { + s[e] += n + } +} + // readBlob reads blob data from localstore. func readBlob(s *localstore.Store, ref string) ([]byte, error) { r, err := s.OpenReader(ref)