Skip to content

Commit

Permalink
Add tracing logs for Nexus HTTP request retries (#7186)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Added additional HTTP tracing logs for the first 3 Nexus request
retries.
Both minimum and maximum attempts to add additional tracing info to are
configurable.

## Why?
<!-- Tell your future self why have you made these changes -->
To aid in debugging.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
pdoerner authored Feb 6, 2025
1 parent f34f860 commit 2640635
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 5 deletions.
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func Timestamp(timestamp time.Time) ZapTag {
return NewTimeTag("timestamp", timestamp)
}

// RequestID returns tag for RequestID
func RequestID(requestID string) ZapTag {
return NewStringTag("request-id", requestID)
}

// ========== Workflow tags defined here: ( wf is short for workflow) ==========

// WorkflowAction returns tag for WorkflowAction
Expand Down
183 changes: 183 additions & 0 deletions common/nexus/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// The MIT License
//
// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package nexus

import (
"crypto/tls"
"net/http/httptrace"
"time"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
)

type HTTPClientTraceProvider interface {
// NewTrace returns a *httptrace.ClientTrace which provides hooks to invoke at each point in the HTTP request
// lifecycle. This trace must be added to the HTTP request context using httptrace.WithClientTrace for the hooks to
// be invoked. The provided logger should already be tagged with relevant request information
// e.g. using log.With(logger, tag.RequestID(id), tag.Operation(op), ...).
NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace
}

// HTTPTraceConfig is the dynamic config for controlling Nexus HTTP request tracing behavior.
// The default is nil and the conversion function does not do any actual conversion because this should be wrapped by
// a dynamicconfig.NewGlobalCachedTypedValue with the actual conversion function so that it is cached.
var HTTPTraceConfig = dynamicconfig.NewGlobalTypedSettingWithConverter(
"system.nexusHTTPTraceConfig",
func(a any) (any, error) { return a, nil },
nil,
`Configuration options for controlling additional tracing for Nexus HTTP requests. Fields: Enabled, MinAttempt, MaxAttempt, Hooks. See HTTPClientTraceConfig comments for more detail.`,
)

type HTTPClientTraceConfig struct {
// Enabled controls whether any additional tracing will be invoked. Default false.
Enabled bool
// MinAttempt is the first operation attempt to include additional tracing. Default 2. Setting to 0 or 1 will add tracing to all requests and may be expensive.
MinAttempt int32
// MaxAttempt is the maximum operation attempt to include additional tracing. Default 2. Setting to 0 means no maximum.
MaxAttempt int32
// Hooks is the list of method names to invoke with extra tracing. See httptrace.ClientTrace for more detail.
// Defaults to all implemented hooks: GetConn, GotConn, ConnectStart, ConnectDone, DNSStart, DNSDone, TLSHandshakeStart, TLSHandshakeDone, WroteRequest, GotFirstResponseByte.
Hooks []string
}

var defaultHTTPClientTraceConfig = HTTPClientTraceConfig{
Enabled: false,
MinAttempt: 2,
MaxAttempt: 2,
// Set to nil here because of dynamic config conversion limitations.
Hooks: []string(nil),
}

var defaultHTTPClientTraceHooks = []string{"GetConn", "GotConn", "ConnectStart", "ConnectDone", "DNSStart", "DNSDone", "TLSHandshakeStart", "TLSHandshakeDone", "WroteRequest", "GotFirstResponseByte"}

func convertHTTPClientTraceConfig(in any) (HTTPClientTraceConfig, error) {
cfg, err := dynamicconfig.ConvertStructure(defaultHTTPClientTraceConfig)(in)
if err != nil {
cfg = defaultHTTPClientTraceConfig
}
if len(cfg.Hooks) == 0 {
cfg.Hooks = defaultHTTPClientTraceHooks
}
return cfg, nil
}

type LoggedHTTPClientTraceProvider struct {
Config *dynamicconfig.GlobalCachedTypedValue[HTTPClientTraceConfig]
}

func NewLoggedHTTPClientTraceProvider(dc *dynamicconfig.Collection) HTTPClientTraceProvider {
return &LoggedHTTPClientTraceProvider{
Config: dynamicconfig.NewGlobalCachedTypedValue(dc, HTTPTraceConfig, convertHTTPClientTraceConfig),
}
}

//nolint:revive // cognitive complexity (> 25 max) but is just adding a logging function for each method in the list.
func (p *LoggedHTTPClientTraceProvider) NewTrace(attempt int32, logger log.Logger) *httptrace.ClientTrace {
config := p.Config.Get()
if !config.Enabled {
return nil
}
if attempt < config.MinAttempt {
return nil
}
if config.MaxAttempt > 0 && attempt > config.MaxAttempt {
return nil
}

clientTrace := &httptrace.ClientTrace{}
for _, h := range config.Hooks {
switch h {
case "GetConn":
clientTrace.GetConn = func(hostPort string) {
logger.Info("attempting to get HTTP connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Address(hostPort))
}
case "GotConn":
clientTrace.GotConn = func(info httptrace.GotConnInfo) {
logger.Info("got HTTP connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.NewBoolTag("reused", info.Reused),
tag.NewBoolTag("was-idle", info.WasIdle),
tag.NewDurationTag("idle-time", info.IdleTime))
}
case "ConnectStart":
clientTrace.ConnectStart = func(network, addr string) {
logger.Info("starting dial for new connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Address(addr),
tag.NewStringTag("network", network))
}
case "ConnectDone":
clientTrace.ConnectDone = func(network, addr string, err error) {
logger.Info("finished dial for new connection for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Address(addr),
tag.NewStringTag("network", network),
tag.Error(err))
}
case "DNSStart":
clientTrace.DNSStart = func(info httptrace.DNSStartInfo) {
logger.Info("starting DNS lookup for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Host(info.Host))
}
case "DNSDone":
clientTrace.DNSDone = func(info httptrace.DNSDoneInfo) {
addresses := make([]string, len(info.Addrs))
for i, a := range info.Addrs {
addresses[i] = a.String()
}
logger.Info("finished DNS lookup for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.Addresses(addresses),
tag.Error(info.Err),
tag.NewBoolTag("coalesced", info.Coalesced))
}
case "TLSHandshakeStart":
clientTrace.TLSHandshakeStart = func() {
logger.Info("starting TLS handshake for Nexus request", tag.Timestamp(time.Now().UTC()))
}
case "TLSHandshakeDone":
clientTrace.TLSHandshakeDone = func(state tls.ConnectionState, err error) {
logger.Info("finished TLS handshake for Nexus request",
tag.Timestamp(time.Now().UTC()),
tag.NewBoolTag("handshake-complete", state.HandshakeComplete),
tag.Error(err))
}
case "WroteRequest":
clientTrace.WroteRequest = func(info httptrace.WroteRequestInfo) {
logger.Info("finished writing Nexus HTTP request",
tag.Timestamp(time.Now().UTC()),
tag.Error(info.Err))
}
case "GotFirstResponseByte":
clientTrace.GotFirstResponseByte = func() {
logger.Info("got response to Nexus HTTP request", tag.AttemptEnd(time.Now().UTC()))
}
}
}
return clientTrace
}
5 changes: 5 additions & 0 deletions components/callbacks/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/queues"
Expand Down Expand Up @@ -66,6 +67,7 @@ type TaskExecutorOptions struct {
MetricsHandler metrics.Handler
Logger log.Logger
HTTPCallerProvider HTTPCallerProvider
HTTPTraceProvider commonnexus.HTTPClientTraceProvider
HistoryClient resource.HistoryClient
}

Expand Down Expand Up @@ -168,6 +170,9 @@ func (e taskExecutor) loadInvocationArgs(
nexusInvokable := nexusInvocation{}
nexusInvokable.nexus = variant.Nexus
nexusInvokable.completion, err = target.GetNexusCompletion(ctx)
nexusInvokable.workflowID = ref.WorkflowKey.WorkflowID
nexusInvokable.runID = ref.WorkflowKey.RunID
nexusInvokable.attempt = callback.Attempt
invokable = nexusInvokable
if err != nil {
return err
Expand Down
23 changes: 21 additions & 2 deletions components/callbacks/nexus_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"fmt"
"io"
"net/http"
"net/http/httptrace"
"slices"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand All @@ -48,8 +50,10 @@ type CanGetNexusCompletion interface {
}

type nexusInvocation struct {
nexus *persistencespb.Callback_Nexus
completion nexus.OperationCompletion
nexus *persistencespb.Callback_Nexus
completion nexus.OperationCompletion
workflowID, runID string
attempt int32
}

func isRetryableHTTPResponse(response *http.Response) bool {
Expand All @@ -74,6 +78,21 @@ func (n nexusInvocation) WrapError(result invocationResult, err error) error {
}

func (n nexusInvocation) Invoke(ctx context.Context, ns *namespace.Namespace, e taskExecutor, task InvocationTask) invocationResult {
if e.HTTPTraceProvider != nil {
traceLogger := log.With(e.Logger,
tag.WorkflowNamespace(ns.Name().String()),
tag.Operation("CompleteNexusOperation"),
tag.NewStringTag("destination", task.destination),
tag.WorkflowID(n.workflowID),
tag.WorkflowRunID(n.runID),
tag.AttemptStart(time.Now().UTC()),
tag.Attempt(n.attempt),
)
if trace := e.HTTPTraceProvider.NewTrace(n.attempt, traceLogger); trace != nil {
ctx = httptrace.WithClientTrace(ctx, trace)
}
}

request, err := nexus.NewCompletionHTTPRequest(ctx, n.nexus.Url, n.completion)
if err != nil {
return invocationResultFail{queues.NewUnprocessableTaskError(
Expand Down
41 changes: 38 additions & 3 deletions components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"errors"
"fmt"
"net/http/httptrace"
"strings"
"sync/atomic"
"text/template"
Expand Down Expand Up @@ -67,6 +68,7 @@ type TaskExecutorOptions struct {
CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
ClientProvider ClientProvider
EndpointRegistry commonnexus.EndpointRegistry
HTTPTraceProvider commonnexus.HTTPClientTraceProvider
}

func RegisterExecutor(
Expand Down Expand Up @@ -201,6 +203,22 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()

if e.HTTPTraceProvider != nil {
traceLogger := log.With(e.Logger,
tag.WorkflowNamespace(ns.Name().String()),
tag.RequestID(args.requestID),
tag.Operation(args.operation),
tag.Endpoint(args.endpointName),
tag.WorkflowID(ref.WorkflowKey.WorkflowID),
tag.WorkflowRunID(ref.WorkflowKey.RunID),
tag.AttemptStart(time.Now().UTC()),
tag.Attempt(task.Attempt),
)
if trace := e.HTTPTraceProvider.NewTrace(task.Attempt, traceLogger); trace != nil {
callCtx = httptrace.WithClientTrace(callCtx, trace)
}
}

startTime := time.Now()
var rawResult *nexus.ClientStartOperationResult[*nexus.LazyValue]
var callErr error
Expand Down Expand Up @@ -546,6 +564,22 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
callCtx, cancel := context.WithTimeout(ctx, callTimeout)
defer cancel()

if e.HTTPTraceProvider != nil {
traceLogger := log.With(e.Logger,
tag.WorkflowNamespace(ns.Name().String()),
tag.RequestID(args.requestID),
tag.Operation(args.operation),
tag.Endpoint(args.endpointName),
tag.WorkflowID(ref.WorkflowKey.WorkflowID),
tag.WorkflowRunID(ref.WorkflowKey.RunID),
tag.AttemptStart(time.Now().UTC()),
tag.Attempt(task.Attempt),
)
if trace := e.HTTPTraceProvider.NewTrace(task.Attempt, traceLogger); trace != nil {
callCtx = httptrace.WithClientTrace(callCtx, trace)
}
}

var callErr error
startTime := time.Now()
if callTimeout < e.Config.MinOperationTimeout(ns.Name().String()) {
Expand Down Expand Up @@ -576,9 +610,9 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro
}

type cancelArgs struct {
service, operation, token, endpointID, endpointName string
scheduledTime time.Time
scheduleToCloseTimeout time.Duration
service, operation, token, endpointID, endpointName, requestID string
scheduledTime time.Time
scheduleToCloseTimeout time.Duration
}

// loadArgsForCancelation loads state from the operation state machine that's the parent of the cancelation machine the
Expand All @@ -599,6 +633,7 @@ func (e taskExecutor) loadArgsForCancelation(ctx context.Context, env hsm.Enviro
args.token = op.OperationToken
args.endpointID = op.EndpointId
args.endpointName = op.Endpoint
args.requestID = op.RequestId
args.scheduledTime = op.ScheduledTime.AsTime()
args.scheduleToCloseTimeout = op.ScheduleToCloseTimeout.AsDuration()
return nil
Expand Down
2 changes: 2 additions & 0 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
commonnexus "go.temporal.io/server/common/nexus"
persistenceClient "go.temporal.io/server/common/persistence/client"
"go.temporal.io/server/common/persistence/visibility"
"go.temporal.io/server/common/persistence/visibility/manager"
Expand Down Expand Up @@ -93,6 +94,7 @@ var Module = fx.Options(
fx.Provide(ServerProvider),
fx.Provide(NewService),
fx.Provide(ReplicationProgressCacheProvider),
fx.Provide(commonnexus.NewLoggedHTTPClientTraceProvider),
fx.Invoke(ServiceLifetimeHooks),

callbacks.Module,
Expand Down

0 comments on commit 2640635

Please sign in to comment.