diff --git a/src/workerd/api/cache.c++ b/src/workerd/api/cache.c++ index 288f9a4416a..475a4d31395 100644 --- a/src/workerd/api/cache.c++ +++ b/src/workerd/api/cache.c++ @@ -518,7 +518,8 @@ jsg::Promise Cache::delete_( kj::Own Cache::getHttpClient( IoContext& context, kj::Maybe cfBlobJson, kj::ConstString operationName) { - auto span = context.makeTraceSpan(kj::mv(operationName)); + auto span = context.makeTraceSpan(kj::ConstString(kj::str(operationName))); + auto limeSpan = context.makeLimeTraceSpan(kj::mv(operationName)); auto cacheClient = context.getCacheClient(); auto httpClient = cacheName diff --git a/src/workerd/api/kv.c++ b/src/workerd/api/kv.c++ index 1e5a9d9fc83..db4650407ab 100644 --- a/src/workerd/api/kv.c++ +++ b/src/workerd/api/kv.c++ @@ -82,6 +82,8 @@ kj::Own KvNamespace::getHttpClient(IoContext& context, switch (opType) { case LimitEnforcer::KvOpType::GET: return "kv_get"_kjc; + case LimitEnforcer::KvOpType::GET_WITH: + return "kv_getWithMetadata"_kjc; case LimitEnforcer::KvOpType::PUT: return "kv_put"_kjc; case LimitEnforcer::KvOpType::LIST: @@ -95,7 +97,10 @@ kj::Own KvNamespace::getHttpClient(IoContext& context, KJ_UNREACHABLE; }(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, operationName); + auto client = context.getHttpClientSpans( + subrequestChannel, true, kj::none, operationName, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("db.system"_kjc, kj::str("cloudflare-kv"_kj)); + }); headers.add(FLPROD_405_HEADER, urlStr); for (const auto& header: additionalHeaders) { headers.add(header.name.asPtr(), header.value.asPtr()); @@ -147,7 +152,9 @@ jsg::Promise KvNamespace::getWithMetadata( auto urlStr = url.toString(kj::Url::Context::HTTP_PROXY_REQUEST); auto headers = kj::HttpHeaders(context.getHeaderTable()); + // TODO: Make this KvOpType::GET_WITH if not called via KvNamespace::get. auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::GET, urlStr); + //auto client = getHttpClient(context, headers, LimitEnforcer::KvOpType::GET_WITH, urlStr); auto request = client->request(kj::HttpMethod::GET, urlStr, headers); return context.awaitIo(js, kj::mv(request.response), diff --git a/src/workerd/api/memory-cache.c++ b/src/workerd/api/memory-cache.c++ index c322e2a5c3a..f82c6c77fe7 100644 --- a/src/workerd/api/memory-cache.c++ +++ b/src/workerd/api/memory-cache.c++ @@ -383,6 +383,7 @@ jsg::Promise> MemoryCache::read(jsg::Lock& js, } auto readSpan = IoContext::current().makeTraceSpan("memory_cache_read"_kjc); + auto limeReadSpan = IoContext::current().makeLimeTraceSpan("memory_cache_read"_kjc); KJ_IF_SOME(fallback, optionalFallback) { KJ_SWITCH_ONEOF(cacheUse.getWithFallback(key.value, readSpan)) { @@ -393,7 +394,8 @@ jsg::Promise> MemoryCache::read(jsg::Lock& js, } KJ_CASE_ONEOF(promise, kj::Promise) { return IoContext::current().awaitIo(js, kj::mv(promise), - [fallback = kj::mv(fallback), key = kj::str(key.value), span = kj::mv(readSpan)]( + [fallback = kj::mv(fallback), key = kj::str(key.value), span = kj::mv(readSpan), + limeSpan = kj::mv(limeReadSpan)]( jsg::Lock& js, SharedMemoryCache::Use::GetWithFallbackOutcome cacheResult) mutable -> jsg::Promise> { KJ_SWITCH_ONEOF(cacheResult) { diff --git a/src/workerd/api/r2-admin.c++ b/src/workerd/api/r2-admin.c++ index 5704e7d1574..6c9383f6696 100644 --- a/src/workerd/api/r2-admin.c++ +++ b/src/workerd/api/r2-admin.c++ @@ -26,7 +26,12 @@ jsg::Ref R2Admin::get(jsg::Lock& js, kj::String bucketName) { jsg::Promise> R2Admin::create( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_delete"_kjc); + auto client = context.getHttpClientSpans( + subrequestChannel, true, kj::none, "r2_create"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("CreateBucket"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(name)); + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -57,7 +62,11 @@ jsg::Promise R2Admin::list(jsg::Lock& js, const jsg::TypeHandler>& errorType, CompatibilityFlags::Reader flags) { auto& context = IoContext::current(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_delete"_kjc); + auto client = context.getHttpClientSpans( + subrequestChannel, true, kj::none, "r2_list"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("ListObjects"_kj)); + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -113,7 +122,12 @@ jsg::Promise R2Admin::list(jsg::Lock& js, jsg::Promise R2Admin::delete_( jsg::Lock& js, kj::String name, const jsg::TypeHandler>& errorType) { auto& context = IoContext::current(); - auto client = context.getHttpClient(subrequestChannel, true, kj::none, "r2_delete"_kjc); + auto client = context.getHttpClientSpans( + subrequestChannel, true, kj::none, "r2_delete"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("DeleteBucket"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(name)); + }); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/api/r2-bucket.c++ b/src/workerd/api/r2-bucket.c++ index dcabc11d940..d46dc046f18 100644 --- a/src/workerd/api/r2-bucket.c++ +++ b/src/workerd/api/r2-bucket.c++ @@ -309,7 +309,15 @@ jsg::Promise>> R2Bucket::head(jsg::Lock return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc); + auto client = context.getHttpClientSpans( + clientIndex, true, kj::none, "r2_get"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("GetObject"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.key"_kjc, kj::str(name)); + KJ_IF_SOME(b, this->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -345,7 +353,15 @@ R2Bucket::get(jsg::Lock& js, return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_get"_kjc); + auto client = context.getHttpClientSpans( + clientIndex, true, kj::none, "r2_get"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("GetObject"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.key"_kjc, kj::str(name)); + KJ_IF_SOME(b, this->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -407,7 +423,15 @@ jsg::Promise>> R2Bucket::put(jsg::Lock& }); auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_put"_kjc); + auto client = context.getHttpClientSpans( + clientIndex, true, kj::none, "r2_put"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("PutObject"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.key"_kjc, kj::str(name)); + KJ_IF_SOME(b, this->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -593,8 +617,14 @@ jsg::Promise> R2Bucket::createMultipartUpload(jsg::L const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = - context.getHttpClient(clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc); + auto client = context.getHttpClientSpans( + clientIndex, true, kj::none, "r2_createMultipartUpload"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("CreateMultipartUpload"_kj)); + KJ_IF_SOME(b, adminBucket) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -687,7 +717,22 @@ jsg::Promise R2Bucket::delete_(jsg::Lock& js, const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_delete"_kjc); + auto client = context.getHttpClientSpans( + clientIndex, true, kj::none, "r2_delete"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("DeleteObject"_kj)); + KJ_SWITCH_ONEOF(keys) { + KJ_CASE_ONEOF(ks, kj::Array) { + tracing.limeSpan.setTag("cloudflare.r2.delete"_kjc, kj::str(ks)); + } + KJ_CASE_ONEOF(k, kj::String) { + tracing.limeSpan.setTag("cloudflare.r2.delete"_kjc, kj::str(k)); + } + } + KJ_IF_SOME(b, this->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -732,7 +777,14 @@ jsg::Promise R2Bucket::list(jsg::Lock& js, CompatibilityFlags::Reader flags) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient(clientIndex, true, kj::none, "r2_list"_kjc); + auto client = context.getHttpClientSpans( + clientIndex, true, kj::none, "r2_list"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("ListObjects"_kj)); + KJ_IF_SOME(b, this->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/api/r2-multipart.c++ b/src/workerd/api/r2-multipart.c++ index 4c563767fcb..532501548c7 100644 --- a/src/workerd/api/r2-multipart.c++ +++ b/src/workerd/api/r2-multipart.c++ @@ -28,8 +28,15 @@ jsg::Promise R2MultipartUpload::uploadPart(jsg: "Part number must be between 1 and 10000 (inclusive). Actual value was: ", partNumber); auto& context = IoContext::current(); - auto client = - context.getHttpClient(this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc); + auto client = context.getHttpClientSpans( + this->bucket->clientIndex, true, kj::none, "r2_uploadPart"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("UploadPart"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.upload_id"_kjc, kj::str(uploadId)); + KJ_IF_SOME(b, this->bucket->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -75,8 +82,15 @@ jsg::Promise> R2MultipartUpload::complete(jsg::Lo const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient( - this->bucket->clientIndex, true, kj::none, "r2_completeMultipartUpload"_kjc); + auto client = context.getHttpClientSpans(this->bucket->clientIndex, true, kj::none, + "r2_completeMultipartUpload"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("CompleteMultipartUpload"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.upload_id"_kjc, kj::str(uploadId)); + KJ_IF_SOME(b, this->bucket->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); @@ -125,8 +139,15 @@ jsg::Promise R2MultipartUpload::abort( jsg::Lock& js, const jsg::TypeHandler>& errorType) { return js.evalNow([&] { auto& context = IoContext::current(); - auto client = context.getHttpClient( - this->bucket->clientIndex, true, kj::none, "r2_abortMultipartUpload"_kjc); + auto client = context.getHttpClientSpans(this->bucket->clientIndex, true, kj::none, + "r2_abortMultipartUpload"_kjc, [&](TraceContext& tracing) { + tracing.limeSpan.setTag("rpc.service"_kjc, kj::str("r2"_kj)); + tracing.limeSpan.setTag("rpc.method"_kjc, kj::str("AbortMultipartUpload"_kj)); + tracing.limeSpan.setTag("cloudflare.r2.upload_id"_kjc, kj::str(uploadId)); + KJ_IF_SOME(b, this->bucket->adminBucketName()) { + tracing.limeSpan.setTag("cloudflare.r2.bucket"_kjc, kj::str(b)); + } + }); capnp::JsonCodec json; json.handleByAnnotation(); diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 685d5f49f17..d2a10bafeb7 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -835,6 +835,24 @@ kj::Own IoContext::getSubrequestChannel( }); } +kj::Own IoContext::getSubrequestChannelSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + kj::FunctionParam func) { + return getSubrequest( + [&](TraceContext& tracing, IoChannelFactory& channelFactory) { + func(tracing); + return getSubrequestChannelImpl( + channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory); + }, + SubrequestOptions{ + .inHouse = isInHouse, + .wrapMetrics = !isInHouse, + .operationName = kj::mv(operationName), + }); +} + kj::Own IoContext::getSubrequestChannelNoChecks(uint channel, bool isInHouse, kj::Maybe cfBlobJson, @@ -872,6 +890,14 @@ kj::Own IoContext::getHttpClient( return asHttpClient( getSubrequestChannel(channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName))); } +kj::Own IoContext::getHttpClientSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + kj::FunctionParam func) { + return asHttpClient(getSubrequestChannelSpans( + channel, isInHouse, kj::mv(cfBlobJson), kj::mv(operationName), func)); +} kj::Own IoContext::getHttpClientNoChecks(uint channel, bool isInHouse, diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index d0600a95fd5..732aecfd7d7 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -696,6 +696,13 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler kj::Maybe cfBlobJson, kj::ConstString operationName); + // As above, but with callback to add span tags. + kj::Own getSubrequestChannelSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + kj::FunctionParam func); + // Like getSubrequestChannel() but doesn't enforce limits. Use for trusted paths only. kj::Own getSubrequestChannelNoChecks(uint channel, bool isInHouse, @@ -708,6 +715,12 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler bool isInHouse, kj::Maybe cfBlobJson, kj::ConstString operationName); + // As above, but with callback to add span tags, analogous to getSubrequestChannelSpans(). + kj::Own getHttpClientSpans(uint channel, + bool isInHouse, + kj::Maybe cfBlobJson, + kj::ConstString operationName, + kj::FunctionParam func); // Convenience methods that call getSubrequest*() and adapt the returned WorkerInterface objects // to HttpClient. diff --git a/src/workerd/io/limit-enforcer.h b/src/workerd/io/limit-enforcer.h index acbd5f880ac..9135c01be16 100644 --- a/src/workerd/io/limit-enforcer.h +++ b/src/workerd/io/limit-enforcer.h @@ -109,7 +109,7 @@ class LimitEnforcer { // external subrequests. virtual void newSubrequest(bool isInHouse) = 0; - enum class KvOpType { GET, PUT, LIST, DELETE }; + enum class KvOpType { GET, GET_WITH, PUT, LIST, DELETE }; // Called before starting a KV operation. Throws a JSG exception if the operation should be // blocked due to exceeding limits, such as the free tier daily operation limit. virtual void newKvRequest(KvOpType op) = 0; diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 98c2ae019c4..7af97b3476c 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -1523,7 +1523,7 @@ Worker::Worker(kj::Own scriptParam, kj::FunctionParam target)> compileBindings, IsolateObserver::StartType startType, - SpanParent parentSpan, + TraceParentContext spans, LockType lockType, kj::Maybe errorReporter, kj::Maybe startupTime) @@ -1546,7 +1546,7 @@ Worker::Worker(kj::Own scriptParam, }); auto maybeMakeSpan = [&](auto operationName) -> SpanBuilder { - auto span = parentSpan.newChild(kj::mv(operationName)); + auto span = spans.parentSpan.newChild(kj::mv(operationName)); if (span.isObserved()) { span.setTag("truncated_script_id"_kjc, truncateScriptId(script->getId())); } @@ -1613,6 +1613,8 @@ Worker::Worker(kj::Own scriptParam, // Execute script. currentSpan = maybeMakeSpan("lw:top_level_execution"_kjc); + SpanBuilder currentLimeSpan = + spans.limeParentSpan.newChild("lw:top_level_execution"_kjc); KJ_SWITCH_ONEOF(script->impl->unboundScriptOrMainModule) { KJ_CASE_ONEOF(unboundScript, jsg::NonModuleScript) { diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 7a0f1518883..0704d8cf21c 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -97,7 +97,7 @@ class Worker: public kj::AtomicRefcounted { kj::FunctionParam target)> compileBindings, IsolateObserver::StartType startType, - SpanParent parentSpan, + TraceParentContext spans, LockType lockType, kj::Maybe errorReporter = kj::none, kj::Maybe startupTime = kj::none);