Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into backend-tls
Browse files Browse the repository at this point in the history
  • Loading branch information
realityone committed Jul 16, 2024
2 parents 4074cfb + 324d93f commit ad50278
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 35 deletions.
8 changes: 4 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (f *FileLoader) mergePriorityConfig(dst *configv1.Gateway) error {
if err != nil {
return err
}
replaceOrAppendEndpoint := MakeReplaceOrAppendEndpointFn(dst.Endpoints)
replaceOrPrependEndpoint := MakeReplaceOrPrependEndpointFn(dst.Endpoints)
for _, e := range entrys {
if e.IsDir() {
continue
Expand All @@ -160,7 +160,7 @@ func (f *FileLoader) mergePriorityConfig(dst *configv1.Gateway) error {
continue
}
for _, e := range pCfg.Endpoints {
dst.Endpoints = replaceOrAppendEndpoint(dst.Endpoints, e)
dst.Endpoints = replaceOrPrependEndpoint(dst.Endpoints, e)
}
log.Infof("succeeded to merge priority config: %s, %d endpoints effected", cfgPath, len(pCfg.Endpoints))
}
Expand All @@ -183,7 +183,7 @@ func (f *FileLoader) parsePriorityConfig(cfgPath string) (*configv1.PriorityConf
return out, nil
}

func MakeReplaceOrAppendEndpointFn(origin []*configv1.Endpoint) func([]*configv1.Endpoint, *configv1.Endpoint) []*configv1.Endpoint {
func MakeReplaceOrPrependEndpointFn(origin []*configv1.Endpoint) func([]*configv1.Endpoint, *configv1.Endpoint) []*configv1.Endpoint {
keyFn := func(e *configv1.Endpoint) string {
return fmt.Sprintf("%s-%s", e.Method, e.Path)
}
Expand All @@ -194,7 +194,7 @@ func MakeReplaceOrAppendEndpointFn(origin []*configv1.Endpoint) func([]*configv1
return func(dst []*configv1.Endpoint, item *configv1.Endpoint) []*configv1.Endpoint {
idx, ok := index[keyFn(item)]
if !ok {
return append(dst, item)
return append([]*configv1.Endpoint{item}, dst...)
}
dst[idx] = item
return dst
Expand Down
62 changes: 31 additions & 31 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error, labels middle
log.Errorf("Failed to handle request: %s: %+v", r.URL.String(), err)
statusCode = 502
}
requestsTotalIncr(labels, statusCode)
requestsTotalIncr(r, labels, statusCode)
if labels.Protocol() == config.Protocol_GRPC.String() {
// see https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
code := strconv.Itoa(int(status.ToGRPCCode(statusCode)))
Expand Down Expand Up @@ -192,22 +192,22 @@ func (p *Proxy) buildMiddleware(ms []*config.Middleware, next http.RoundTripper)
return next, nil
}

func splitRetryMetricsHandler(e *config.Endpoint) (func(int), func(int, error)) {
func splitRetryMetricsHandler(e *config.Endpoint) (func(*http.Request, int), func(*http.Request, int, error)) {
labels := middleware.NewMetricsLabels(e)
success := func(i int) {
success := func(req *http.Request, i int) {
if i <= 0 {
return
}
retryStateIncr(labels, true)
retryStateIncr(req, labels, true)
}
failed := func(i int, err error) {
failed := func(req *http.Request, i int, err error) {
if i <= 0 {
return
}
if errors.Is(err, context.Canceled) {
return
}
retryStateIncr(labels, false)
retryStateIncr(req, labels, false)
}
return success, failed
}
Expand Down Expand Up @@ -236,14 +236,14 @@ func (p *Proxy) buildEndpoint(buildCtx *client.BuildContext, e *config.Endpoint,
labels := middleware.NewMetricsLabels(e)
markSuccessStat, markFailedStat := splitRetryMetricsHandler(e)
retryBreaker := sre.NewBreaker(sre.WithSuccess(0.8))
markSuccess := func(i int) {
markSuccessStat(i)
markSuccess := func(req *http.Request, i int) {
markSuccessStat(req, i)
if i > 0 {
retryBreaker.MarkSuccess()
}
}
markFailed := func(i int, err error) {
markFailedStat(i, err)
markFailed := func(req *http.Request, i int, err error) {
markFailedStat(req, i, err)
if i > 0 {
retryBreaker.MarkFailed()
}
Expand All @@ -257,15 +257,15 @@ func (p *Proxy) buildEndpoint(buildCtx *client.BuildContext, e *config.Endpoint,
ctx, cancel := context.WithTimeout(ctx, retryStrategy.timeout)
defer cancel()
defer func() {
requestsDurationObserve(labels, time.Since(startTime).Seconds())
requestsDurationObserve(req, labels, time.Since(startTime).Seconds())
}()

body, err := io.ReadAll(req.Body)
if err != nil {
writeError(w, req, err, labels)
return
}
receivedBytesAdd(labels, int64(len(body)))
receivedBytesAdd(req, labels, int64(len(body)))
req.GetBody = func() (io.ReadCloser, error) {
reader := bytes.NewReader(body)
return io.NopCloser(reader), nil
Expand All @@ -278,7 +278,7 @@ func (p *Proxy) buildEndpoint(buildCtx *client.BuildContext, e *config.Endpoint,
break
}
if err := retryBreaker.Allow(); err != nil {
markFailed(i, err)
markFailed(req, i, err)
break
}
}
Expand All @@ -288,7 +288,7 @@ func (p *Proxy) buildEndpoint(buildCtx *client.BuildContext, e *config.Endpoint,
}
// canceled or deadline exceeded
if err = ctx.Err(); err != nil {
markFailed(i, err)
markFailed(req, i, err)
break
}
tryCtx, cancel := p.Interceptors.prepareAttemptTimeoutContext(ctx, req, retryStrategy.perTryTimeout)
Expand All @@ -297,16 +297,16 @@ func (p *Proxy) buildEndpoint(buildCtx *client.BuildContext, e *config.Endpoint,
req.Body = io.NopCloser(reader)
resp, err = tripper.RoundTrip(req.Clone(tryCtx))
if err != nil {
markFailed(i, err)
markFailed(req, i, err)
log.Errorf("Attempt at [%d/%d], failed to handle request: %s: %+v", i+1, retryStrategy.attempts, req.URL.String(), err)
continue
}
if !judgeRetryRequired(retryStrategy.conditions, resp) {
reqOpts.LastAttempt = true
markSuccess(i)
markSuccess(req, i)
break
}
markFailed(i, errors.New("assertion failed"))
markFailed(req, i, errors.New("assertion failed"))
// continue the retry loop
}
if err != nil {
Expand All @@ -328,11 +328,11 @@ func (p *Proxy) buildEndpoint(buildCtx *client.BuildContext, e *config.Endpoint,
sent, err := io.Copy(w, resp.Body)
if err != nil {
reqOpts.DoneFunc(ctx, selector.DoneInfo{Err: err})
sentBytesAdd(labels, sent)
sentBytesAdd(req, labels, sent)
log.Errorf("Failed to copy backend response body to client: [%s] %s %s %d %+v\n", e.Protocol, e.Method, e.Path, sent, err)
return false
}
sentBytesAdd(labels, sent)
sentBytesAdd(req, labels, sent)
reqOpts.DoneFunc(ctx, selector.DoneInfo{ReplyMD: getReplyMD(e, resp)})
// see https://pkg.go.dev/net/http#example-ResponseWriter-Trailers
for k, v := range resp.Trailer {
Expand All @@ -341,7 +341,7 @@ func (p *Proxy) buildEndpoint(buildCtx *client.BuildContext, e *config.Endpoint,
return true
}
doCopyBody()
requestsTotalIncr(labels, resp.StatusCode)
requestsTotalIncr(req, labels, resp.StatusCode)
}), closer, nil
}

Expand All @@ -352,28 +352,28 @@ func getReplyMD(ep *config.Endpoint, resp *http.Response) selector.ReplyMD {
return resp.Header
}

func receivedBytesAdd(labels middleware.MetricsLabels, received int64) {
_metricReceivedBytes.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Add(float64(received))
func receivedBytesAdd(req *http.Request, labels middleware.MetricsLabels, received int64) {
_metricReceivedBytes.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Add(float64(received))
}

func sentBytesAdd(labels middleware.MetricsLabels, sent int64) {
_metricSentBytes.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Add(float64(sent))
func sentBytesAdd(req *http.Request, labels middleware.MetricsLabels, sent int64) {
_metricSentBytes.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Add(float64(sent))
}

func requestsTotalIncr(labels middleware.MetricsLabels, statusCode int) {
_metricRequestsTotal.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), strconv.Itoa(statusCode), labels.Service(), labels.BasePath()).Inc()
func requestsTotalIncr(req *http.Request, labels middleware.MetricsLabels, statusCode int) {
_metricRequestsTotal.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), strconv.Itoa(statusCode), labels.Service(), labels.BasePath()).Inc()
}

func requestsDurationObserve(labels middleware.MetricsLabels, seconds float64) {
_metricRequestsDuration.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath()).Observe(seconds)
func requestsDurationObserve(req *http.Request, labels middleware.MetricsLabels, seconds float64) {
_metricRequestsDuration.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath()).Observe(seconds)
}

func retryStateIncr(labels middleware.MetricsLabels, success bool) {
func retryStateIncr(req *http.Request, labels middleware.MetricsLabels, success bool) {
if success {
_metricRetryState.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath(), "true").Inc()
_metricRetryState.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath(), "true").Inc()
return
}
_metricRetryState.WithLabelValues(labels.Protocol(), labels.Method(), labels.Path(), labels.Service(), labels.BasePath(), "false").Inc()
_metricRetryState.WithLabelValues(labels.Protocol(), req.Method, labels.Path(), labels.Service(), labels.BasePath(), "false").Inc()
}

func closeOnError(closer io.Closer, err *error) {
Expand Down

0 comments on commit ad50278

Please sign in to comment.