Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more logging and tracing #183

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion impl/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func run() error {
case sig := <-shutdown:
logrus.WithContext(ctx).WithField("signal", sig.String()).Info("shutdown signal received")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err = s.Shutdown(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions impl/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func GetDefaultConfig() Config {
BootstrapPeers: GetDefaultBootstrapPeers(),
},
PkarrConfig: PkarrServiceConfig{
RepublishCRON: "0 */2 * * *",
RepublishCRON: "0 */3 * * *",
CacheTTLSeconds: 600,
CacheSizeLimitMB: 500,
CacheSizeLimitMB: 1000,
},
Log: LogConfig{
Level: logrus.DebugLevel.String(),
Expand Down
2 changes: 1 addition & 1 deletion impl/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ bootstrap_peers = ["router.magnets.im:6881", "router.bittorrent.com:6881", "dht.
"router.utorrent.com:6881", "router.nuh.dev:6881"]

[pkarr]
republish_cron = "0 */2 * * *" # every 2 hours
republish_cron = "0 */3 * * *" # every 3 hours
cache_ttl_seconds = 600 # 10 minutes
cache_size_limit_mb = 1000 # 1000 MB
2 changes: 1 addition & 1 deletion impl/integrationtest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func doRequest(ctx context.Context, req *http.Request) error {
"url": req.URL,
})

ctx, done := context.WithTimeout(ctx, time.Second*10)
ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()

req = req.WithContext(ctx)
Expand Down
10 changes: 9 additions & 1 deletion impl/internal/dht/getput.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/anacrolix/dht/v2/bep44"
"github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/dht/v2/traversal"

"github.com/TBD54566975/did-dht-method/pkg/telemetry"
)

// Copied from https://github.com/anacrolix/dht/blob/master/exts/getput/getput.go and modified
Expand All @@ -38,7 +40,7 @@ func startGetTraversal(
Alpha: 15,
Target: target,
DoQuery: func(ctx context.Context, addr krpc.NodeAddr) traversal.QueryResult {
queryCtx, cancel := context.WithTimeout(ctx, time.Second*8)
queryCtx, cancel := context.WithTimeout(ctx, 8*time.Second)
defer cancel()

res := s.Get(queryCtx, dht.NewAddr(addr.UDP()), target, seq, dht.QueryRateLimiting{})
Expand Down Expand Up @@ -101,6 +103,9 @@ func Get(
) (
ret FullGetResult, stats *traversal.Stats, err error,
) {
ctx, span := telemetry.GetTracer().Start(ctx, "DHT.Get")
defer span.End()

vChan, op, err := startGetTraversal(ctx, target, s, seq, salt)
if err != nil {
return
Expand Down Expand Up @@ -139,6 +144,9 @@ func Put(
) (
stats *traversal.Stats, err error,
) {
ctx, span := telemetry.GetTracer().Start(ctx, "DHT.Put")
defer span.End()

vChan, op, err := startGetTraversal(ctx, target, s,
// When we do a get traversal for a put, we don't care what seq the peers have?
nil,
Expand Down
4 changes: 2 additions & 2 deletions impl/internal/util/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func (h *TraceHook) Fire(entry *logrus.Entry) error {
traceID := span.SpanContext().TraceID().String()
spanID := span.SpanContext().SpanID().String()

entry.Data["traceID"] = traceID
entry.Data["spanID"] = spanID
entry.Data["trace_id"] = traceID
entry.Data["span_id"] = spanID

return nil
}
30 changes: 20 additions & 10 deletions impl/pkg/server/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)

// logger is the logrus logger handler amended for telemetry
Expand Down Expand Up @@ -45,17 +46,26 @@ func logger(logger logrus.FieldLogger, notLogged ...string) gin.HandlerFunc {
return
}

traceID, spanID := "", ""
span := trace.SpanFromContext(c)
if span.SpanContext().IsValid() {
traceID = span.SpanContext().TraceID().String()
spanID = span.SpanContext().SpanID().String()
}

entry := logger.WithFields(logrus.Fields{
"hostname": hostname,
"statusCode": statusCode,
"latency": latency,
"clientIP": clientIP,
"method": c.Request.Method,
"path": path,
"referer": referer,
"dataLength": dataLength,
"userAgent": clientUserAgent,
"time": time.Now().Format(time.RFC3339),
"hostname": hostname,
"status_code": statusCode,
"latency": latency,
"client_ip": clientIP,
"method": c.Request.Method,
"path": path,
"referer": referer,
"data_length": dataLength,
"user_agent": clientUserAgent,
"time": time.Now().Format(time.RFC3339),
"trace_id": traceID,
"span_id": spanID,
})

if len(c.Errors) > 0 {
Expand Down
24 changes: 17 additions & 7 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/allegro/bigcache/v3"
"github.com/anacrolix/torrent/bencode"
"github.com/goccy/go-json"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/tv42/zbase32"

Expand Down Expand Up @@ -147,17 +148,23 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
if got, err := s.cache.Get(id); err == nil {
var resp pkarr.Response
if err = json.Unmarshal(got, &resp); err == nil {
logrus.WithContext(ctx).WithField("record_id", id).Debug("resolved pkarr record from cache")
logrus.WithContext(ctx).WithField("record_id", id).Info("resolved pkarr record from cache")
return &resp, nil
}
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from cache, falling back to dht")
}

// next do a dht lookup
got, err := s.dht.GetFull(ctx, id)
// next do a dht lookup with a timeout of 10 seconds
getCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

got, err := s.dht.GetFull(getCtx, id)
if err != nil {
// try to resolve from storage before returning an error
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage")
if errors.Is(err, context.DeadlineExceeded) {
logrus.WithContext(ctx).WithField("record", id).Warn("dht lookup timed out, attempting to resolve from storage")
} else {
logrus.WithContext(ctx).WithError(err).WithField("record", id).Warn("failed to get pkarr record from dht, attempting to resolve from storage")
}

rawID, err := util.Z32Decode(id)
if err != nil {
Expand All @@ -176,8 +183,9 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
return nil, err
}

logrus.WithContext(ctx).WithField("record", id).Debug("resolved pkarr record from storage")
logrus.WithContext(ctx).WithField("record", id).Info("resolved pkarr record from storage")
resp := record.Response()
// add the record back to the cache for future lookups
if err = s.addRecordToCache(id, record.Response()); err != nil {
logrus.WithError(err).WithField("record", id).Error("failed to set pkarr record in cache")
}
Expand All @@ -203,6 +211,8 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response
// add the record to cache, do it here to avoid duplicate calculations
if err = s.addRecordToCache(id, resp); err != nil {
logrus.WithContext(ctx).WithError(err).Errorf("failed to set pkarr record[%s] in cache", id)
} else {
logrus.WithContext(ctx).WithField("record", id).Info("added pkarr record back to cache")
}

return &resp, nil
Expand Down Expand Up @@ -267,7 +277,7 @@ func (s *PkarrService) republish() {
recordID := zbase32.EncodeToString(record.Key[:])
logrus.WithContext(ctx).Debugf("republishing record: %s", recordID)

putCtx, cancel := context.WithTimeout(ctx, time.Second*10)
putCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

if _, putErr := s.dht.Put(putCtx, record.BEP44()); putErr != nil {
Expand Down
14 changes: 11 additions & 3 deletions impl/pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -23,12 +24,13 @@ const (
)

var (
traceProvider *sdktrace.TracerProvider
tracer trace.Tracer

traceProvider *sdktrace.TracerProvider
meterProvider *sdkmetric.MeterProvider
propagator propagation.TextMapPropagator
)

// SetupTelemetry initializes the OpenTelemetry SDK with the appropriate exporters and propagators.
func SetupTelemetry(ctx context.Context) error {
r, err := resource.Merge(
resource.Default(),
Expand All @@ -55,14 +57,19 @@ func SetupTelemetry(ctx context.Context) error {
otel.SetMeterProvider(meterProvider)

// setup memory metrics
err = runtime.Start(runtime.WithMinimumReadMemStatsInterval(time.Second * 30))
err = runtime.Start(runtime.WithMeterProvider(meterProvider), runtime.WithMinimumReadMemStatsInterval(time.Second*15))
if err != nil {
return err
}

// setup propagator
propagator = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
otel.SetTextMapPropagator(propagator)

return nil
}

// Shutdown stops the telemetry providers and exporters safely.
func Shutdown(ctx context.Context) {
if traceProvider != nil {
if err := traceProvider.Shutdown(ctx); err != nil {
Expand All @@ -77,6 +84,7 @@ func Shutdown(ctx context.Context) {
}
}

// GetTracer returns the tracer for the application. If the tracer is not yet initialized, it will be created.
func GetTracer() trace.Tracer {
if tracer == nil {
tracer = otel.GetTracerProvider().Tracer(scopeName, trace.WithInstrumentationVersion(config.Version))
Expand Down
Loading