diff --git a/src/workerd/api/kv.c++ b/src/workerd/api/kv.c++ index 1e5a9d9fc83..e8593275a14 100644 --- a/src/workerd/api/kv.c++ +++ b/src/workerd/api/kv.c++ @@ -95,7 +95,8 @@ kj::Own KvNamespace::getHttpClient(IoContext& context, KJ_UNREACHABLE; }(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, operationName); + auto client = context.getHttpClientWithSpans( + subrequestChannel, true, kj::none, operationName, {{"db.system"_kjc, "cloudflare-kv"_kjc}}); headers.add(FLPROD_405_HEADER, urlStr); for (const auto& header: additionalHeaders) { headers.add(header.name.asPtr(), header.value.asPtr()); diff --git a/src/workerd/api/r2-admin.c++ b/src/workerd/api/r2-admin.c++ index a8213b35bed..f4eaa78cb28 100644 --- a/src/workerd/api/r2-admin.c++ +++ b/src/workerd/api/r2-admin.c++ @@ -26,7 +26,9 @@ jsg::Ref R2Admin::get(jsg::Lock& js, kj::String bucketName) { jsg::Promise> R2Admin::create( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_create"_kjc); + // TODO(o11y): Add cloudflare.r2.bucket here. + auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_create"_kjc, + {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "CreateBucket"_kjc}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -57,7 +59,8 @@ jsg::Promise R2Admin::list(jsg::Lock& js, const jsg::TypeHandler>& errorType, CompatibilityFlags::Reader flags) { auto& context = IoContext::current(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_list"_kjc); + auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_list"_kjc, + {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "ListObjects"_kjc}}); capnp::JsonCodec json; json.handleByAnnotation(); @@ -113,7 +116,9 @@ jsg::Promise R2Admin::list(jsg::Lock& js, jsg::Promise R2Admin::delete_( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_delete"_kjc); + // TODO(o11y): Add cloudflare.r2.bucket + auto client = context.getHttpClientWithSpans(subrequestChannel, true, kj::none, "r2_delete"_kjc, + {{"rpc.service"_kjc, "r2"_kjc}, {"rpc.method"_kjc, "DeleteBucket"_kjc}}); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 0d0d1b46631..47e61c1cd3c 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -833,6 +833,26 @@ kj::Own IoContext::getSubrequestChannel( }); } +kj::Own IoContext::getSubrequestChannelSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + std::initializer_list tags) { + return getSubrequest( + [&](TraceContext& tracing, IoChannelFactory& channelFactory) { + for (const SpanTagParams& tag: tags) { + tracing.userSpan.setTag(kj::mv(tag.key), kj::str(tag.value)); + } + return getSubrequestChannelImpl( + channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory); + }, + SubrequestOptions{ + .inHouse = isInHouse, + .wrapMetrics = !isInHouse, + .operationName = kj::mv(operationName), + }); +} + kj::Own IoContext::getSubrequestChannelNoChecks(uint channel, bool isInHouse, kj::Maybe cfBlobJson, @@ -871,6 +891,15 @@ kj::Own IoContext::getHttpClient( getSubrequestChannel(channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName))); } +kj::Own IoContext::getHttpClientWithSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + std::initializer_list tags) { + return asHttpClient(getSubrequestChannelSpans( + channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName), kj::mv(tags))); +} + kj::Own IoContext::getHttpClientNoChecks(uint channel, bool isInHouse, kj::Maybe cfBlobJson, diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index b44a6aa07d5..e146af2c0fa 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -25,6 +25,8 @@ #include #include +#include + namespace workerd { class LimitEnforcer; } @@ -696,6 +698,20 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler kj::Maybe cfBlobJson, kj::ConstString operationName); + // As above, but with list of span tags to add. + // TODO(o11y): For now this only supports literal values based on initializer_list constraints. + // Add syntactic sugar to kj::vector so that we can pass in a vector more ergonomically and use + // that instead to support other value types. + struct SpanTagParams { + kj::LiteralStringConst key; + kj::LiteralStringConst value; + }; + kj::Own getSubrequestChannelSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + std::initializer_list tags); + // Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only. kj::Own getSubrequestChannelNoChecks(uint channel, bool isInHouse, @@ -709,6 +725,13 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler kj::Maybe cfBlobJson, kj::ConstString operationName); + // As above, but with list of span tags to add, analogous to getSubrequestChannelSpans(). + kj::Own getHttpClientWithSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + std::initializer_list tags); + // Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects // to HttpClient. kj::Own getHttpClientNoChecks(uint channel, diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 753690988f0..e55c13da402 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -1551,7 +1551,7 @@ Worker::Worker(kj::Own scriptParam, kj::FunctionParam target)> compileBindings, IsolateObserver::StartType startType, - SpanParent parentSpan, + TraceParentContext spans, LockType lockType, kj::Maybe errorReporter, kj::Maybe startupTime) @@ -1574,7 +1574,7 @@ Worker::Worker(kj::Own scriptParam, }); auto maybeMakeSpan = [&](auto operationName) -> SpanBuilder { - auto span = parentSpan.newChild(kj::mv(operationName)); + auto span = spans.parentSpan.newChild(kj::mv(operationName)); if (span.isObserved()) { span.setTag("truncated_script_id"_kjc, truncateScriptId(script->getId())); } @@ -1641,6 +1641,8 @@ Worker::Worker(kj::Own scriptParam, // Execute script. currentSpan = maybeMakeSpan("lw:top_level_execution"_kjc); + SpanBuilder currentUserSpan = + spans.userParentSpan.newChild("lw:top_level_execution"_kjc); KJ_SWITCH_ONEOF(script->impl->unboundScriptOrMainModule) { KJ_CASE_ONEOF(unboundScript, jsg::NonModuleScript) { diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 0bc9d383520..9a4492a645a 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -97,7 +97,7 @@ class Worker: public kj::AtomicRefcounted { kj::FunctionParam target)> compileBindings, IsolateObserver::StartType startType, - SpanParent parentSpan, + TraceParentContext spans, LockType lockType, kj::Maybe errorReporter = kj::none, kj::Maybe startupTime = kj::none); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index a1140dc73cd..e89ae329a0a 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2972,7 +2972,7 @@ kj::Own Server::makeWorker(kj::StringPtr name, [&](jsg::Lock& lock, const Worker::Api& api, v8::Local target) { return WorkerdApi::from(api).compileGlobals(lock, globals, target, 1); }, IsolateObserver::StartType::COLD, - nullptr, // systemTracer -- TODO(beta): factor out + TraceParentContext(nullptr, nullptr), // systemTracer -- TODO(beta): factor out Worker::Lock::TakeSynchronously(kj::none), errorReporter); { diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index 3aec147aefd..b42efdc584a 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -349,7 +349,7 @@ TestFixture::TestFixture(SetupParams&& params) // no bindings, nothing to do }, IsolateObserver::StartType::COLD, - nullptr /* parentSpan */, + TraceParentContext(nullptr, nullptr), /* spans */ Worker::LockType(Worker::Lock::TakeSynchronously(kj::none)))), errorHandler(kj::heap()), waitUntilTasks(*errorHandler),