From f06baa7897313c455616dd4d7d46be8624eb4271 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Sat, 19 Oct 2024 07:15:19 -0700 Subject: [PATCH] Get the WorkerTracer via RequestObserver We're needlessly refcounting the WorkerTracer to make it available via the IoContext::IncomingRequest when the RequestObserver also has a reference and can be used to get it. Also makes more sense to access WorkerTracer via RequestObserver anyway. --- src/workerd/api/hibernatable-web-socket.c++ | 2 +- src/workerd/api/node/diagnostics-channel.c++ | 2 +- src/workerd/api/node/util.c++ | 2 +- src/workerd/api/queue.c++ | 2 +- src/workerd/api/trace.c++ | 2 +- src/workerd/api/worker-rpc.c++ | 15 +++++--------- src/workerd/io/io-context.c++ | 6 ++---- src/workerd/io/io-context.h | 13 +----------- src/workerd/io/observer.h | 4 ++++ src/workerd/io/worker-entrypoint.c++ | 21 ++++++++------------ src/workerd/io/worker-entrypoint.h | 1 - src/workerd/io/worker.c++ | 6 +++--- src/workerd/server/server.c++ | 3 +-- src/workerd/tests/test-fixture.c++ | 2 +- 14 files changed, 30 insertions(+), 51 deletions(-) diff --git a/src/workerd/api/hibernatable-web-socket.c++ b/src/workerd/api/hibernatable-web-socket.c++ index ecb83fe7fe5..bb534cd0173 100644 --- a/src/workerd/api/hibernatable-web-socket.c++ +++ b/src/workerd/api/hibernatable-web-socket.c++ @@ -74,7 +74,7 @@ kj::Promise HibernatableWebSocketCustomEve auto eventParameters = consumeParams(); - KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { Trace::HibernatableWebSocketEventInfo::Type type = [&]() -> Trace::HibernatableWebSocketEventInfo::Type { KJ_SWITCH_ONEOF(eventParameters.eventType) { diff --git a/src/workerd/api/node/diagnostics-channel.c++ b/src/workerd/api/node/diagnostics-channel.c++ index 7e7923de50e..f8e72b1c03d 100644 --- a/src/workerd/api/node/diagnostics-channel.c++ +++ b/src/workerd/api/node/diagnostics-channel.c++ @@ -26,7 +26,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) { } auto& context = IoContext::current(); - KJ_IF_SOME(tracer, context.getWorkerTracer()) { + KJ_IF_SOME(tracer, context.getMetrics().getWorkerTracer()) { jsg::Serializer ser(js, jsg::Serializer::Options{ .omitHeader = false, diff --git a/src/workerd/api/node/util.c++ b/src/workerd/api/node/util.c++ index 5fe83665f23..e8a882bf854 100644 --- a/src/workerd/api/node/util.c++ +++ b/src/workerd/api/node/util.c++ @@ -249,7 +249,7 @@ namespace { kj::str("The Node.js process.exit(", code, ") API was called. Canceling the request."); auto& ioContext = IoContext::current(); // If we have a tail worker, let's report the error. - KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { // Why create the error like this in tracing? Because we're adding the exception // to the trace and ideally we'd have the JS stack attached to it. Just using // JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index c0cf06a3e43..efb0fd44af7 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -530,7 +530,7 @@ kj::Promise QueueCustomEventImpl::run( } } - KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize)); } diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 3dc4fe65d59..a61c4bf75fd 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -603,7 +603,7 @@ kj::Promise sendTracesToExportedHandler(kj::OwngetContext(); auto& metrics = incomingRequest->getMetrics(); - KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { t.setEventInfo(context.now(), Trace::TraceEventInfo(traces)); } diff --git a/src/workerd/api/worker-rpc.c++ b/src/workerd/api/worker-rpc.c++ index 863e23fca54..e4a49729614 100644 --- a/src/workerd/api/worker-rpc.c++ +++ b/src/workerd/api/worker-rpc.c++ @@ -1602,14 +1602,11 @@ void RpcSerializerExternalHander::serializeFunction( // call of an RPC session. class EntrypointJsRpcTarget final: public JsRpcTargetBase { public: - EntrypointJsRpcTarget(IoContext& ioCtx, - kj::Maybe entrypointName, - kj::Maybe> tracer) + EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe entrypointName) : JsRpcTargetBase(ioCtx), // Most of the time we don't really have to clone this but it's hard to fully prove, so // let's be safe. - entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })), - tracer(kj::mv(tracer)) {} + entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })) {} TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override { jsg::Lock& js = lock; @@ -1647,7 +1644,6 @@ public: private: kj::Maybe entrypointName; - kj::Maybe> tracer; bool isReservedName(kj::StringPtr name) override { if ( // "fetch" and "connect" are treated specially on entrypoints. @@ -1670,8 +1666,8 @@ private: } void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override { - KJ_IF_SOME(t, tracer) { - t->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); + KJ_IF_SOME(t, ioctx.getMetrics().getWorkerTracer()) { + t.setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName))); } } }; @@ -1725,8 +1721,7 @@ kj::Promise JsRpcSessionCustomEventImpl::r incomingRequest->delivered(); auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller(); - capFulfiller->fulfill(capnp::membrane(kj::heap(ioctx, entrypointName, - mapAddRef(incomingRequest->getWorkerTracer())), + capFulfiller->fulfill(capnp::membrane(kj::heap(ioctx, entrypointName), kj::refcounted(kj::mv(doneFulfiller)))); KJ_DEFER({ diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 47759cdb9f7..33009a4f89e 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -200,11 +200,9 @@ IoContext::IoContext(ThreadContext& thread, IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own contextParam, kj::Own ioChannelFactoryParam, - kj::Own metricsParam, - kj::Maybe> workerTracer) + kj::Own metricsParam) : context(kj::mv(contextParam)), metrics(kj::mv(metricsParam)), - workerTracer(kj::mv(workerTracer)), ioChannelFactory(kj::mv(ioChannelFactoryParam)) {} // A call to delivered() implies a promise to call drain() later (or one of the other methods @@ -340,7 +338,7 @@ void IoContext::logUncaughtException( void IoContext::logUncaughtExceptionAsync( UncaughtExceptionSource source, kj::Exception&& exception) { - if (getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) { + if (getMetrics().getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) { // We don't need to take the isolate lock as neither inspecting nor tracing is enabled. We // do still want to syslog if relevant, but we can do that without a lock. if (!jsg::isTunneledException(exception.getDescription()) && diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index b44a6aa07d5..cfaa3e1557b 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -107,8 +107,7 @@ class IoContext_IncomingRequest final { public: IoContext_IncomingRequest(kj::Own context, kj::Own ioChannelFactory, - kj::Own metrics, - kj::Maybe> workerTracer); + kj::Own metrics); KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest); ~IoContext_IncomingRequest() noexcept(false); @@ -154,14 +153,9 @@ class IoContext_IncomingRequest final { return *metrics; } - kj::Maybe getWorkerTracer() { - return workerTracer; - } - private: kj::Own context; kj::Own metrics; - kj::Maybe> workerTracer; kj::Own ioChannelFactory; bool wasDelivered = false; @@ -235,11 +229,6 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler return *getCurrentIncomingRequest().metrics; } - const kj::Maybe getWorkerTracer() { - if (incomingRequests.empty()) return kj::none; - return getCurrentIncomingRequest().getWorkerTracer(); - } - LimitEnforcer& getLimitEnforcer() { return *limitEnforcer; } diff --git a/src/workerd/io/observer.h b/src/workerd/io/observer.h index ad18b59afdd..721f6a6ca78 100644 --- a/src/workerd/io/observer.h +++ b/src/workerd/io/observer.h @@ -103,6 +103,10 @@ class RequestObserver: public kj::Refcounted { return nullptr; } + virtual kj::Maybe getWorkerTracer() { + return kj::none; + } + virtual void addedContextTask() {} virtual void finishedContextTask() {} virtual void addedWaitUntilTask() {} diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 9b3bbc93fc0..1bc8a212d42 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -52,7 +52,6 @@ public: kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, - kj::Maybe> workerTracer, kj::Maybe cfBlobJson); kj::Promise request(kj::HttpMethod method, @@ -95,8 +94,7 @@ private: kj::Own limitEnforcer, kj::Own ioContextDependency, kj::Own ioChannelFactory, - kj::Own metrics, - kj::Maybe> workerTracer); + kj::Own metrics); template kj::Promise maybeAddGcPassForTest(IoContext& context, kj::Promise promise); @@ -157,13 +155,12 @@ kj::Own WorkerEntrypoint::construct(ThreadContext& threadContex kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, - kj::Maybe> workerTracer, kj::Maybe cfBlobJson) { TRACE_EVENT("workerd", "WorkerEntrypoint::construct()"); auto obj = kj::heap(kj::Badge(), threadContext, waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson)); obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), - kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer)); + kj::mv(ioChannelFactory), kj::addRef(*metrics)); auto& wrapper = metrics->wrapWorkerInterface(*obj); return kj::attachRef(wrapper, kj::mv(obj), kj::mv(metrics)); } @@ -185,8 +182,7 @@ void WorkerEntrypoint::init(kj::Own worker, kj::Own limitEnforcer, kj::Own ioContextDependency, kj::Own ioChannelFactory, - kj::Own metrics, - kj::Maybe> workerTracer) { + kj::Own metrics) { TRACE_EVENT("workerd", "WorkerEntrypoint::init()"); // We need to construct the IoContext -- unless this is an actor and it already has a // IoContext, in which case we reuse it. @@ -212,7 +208,7 @@ void WorkerEntrypoint::init(kj::Own worker, } incomingRequest = kj::heap( - kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics), kj::mv(workerTracer)) + kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics)) .attach(kj::mv(actor)); } @@ -233,7 +229,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, bool isActor = context.getActor() != kj::none; - KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { auto timestamp = context.now(); kj::String cfJson; KJ_IF_SOME(c, cfBlobJson) { @@ -476,7 +472,7 @@ kj::Promise WorkerEntrypoint::runScheduled( // calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have // to think more about this. - KJ_IF_SOME(t, context.getWorkerTracer()) { + KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) { double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS; t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron))); } @@ -535,7 +531,7 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( // There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events). incomingRequest->delivered(); - KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { + KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) { t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime)); } @@ -715,11 +711,10 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, - kj::Maybe> workerTracer, kj::Maybe cfBlobJson) { return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory), - kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson)); + kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(cfBlobJson)); } } // namespace workerd diff --git a/src/workerd/io/worker-entrypoint.h b/src/workerd/io/worker-entrypoint.h index e5213e8d6e5..ac3fc8e4e5a 100644 --- a/src/workerd/io/worker-entrypoint.h +++ b/src/workerd/io/worker-entrypoint.h @@ -33,7 +33,6 @@ kj::Own newWorkerEntrypoint(ThreadContext& threadContext, kj::Own metrics, kj::TaskSet& waitUntilTasks, bool tunnelExceptions, - kj::Maybe> workerTracer, kj::Maybe cfBlobJson); } // namespace workerd diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 753690988f0..7d023468977 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -1047,7 +1047,7 @@ Worker::Isolate::Isolate(kj::Own apiParam, // Only add exception to trace when running within an I/O context with a tracer. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER, error, api->getErrorInterfaceTypeHandler(js)); } @@ -1840,7 +1840,7 @@ void Worker::handleLog(jsg::Lock& js, // Only check tracing if console.log() was not invoked at the top level. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { auto timestamp = ioContext.now(); tracer.log(timestamp, level, message()); } @@ -2039,7 +2039,7 @@ void Worker::Lock::logUncaughtException( // Only add exception to trace when running within an I/O context with a tracer. if (IoContext::hasCurrent()) { auto& ioContext = IoContext::current(); - KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) { + KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) { JSG_WITHIN_CONTEXT_SCOPE(*this, getContext(), [&](jsg::Lock& js) { addExceptionToTrace(impl->inner, ioContext, tracer, source, exception, worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this)); diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index a4cdcd10042..5883b55e080 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1541,8 +1541,7 @@ public: kj::Own(this, kj::NullDisposer::instance), kj::refcounted(), // default observer makes no observations waitUntilTasks, - true, // tunnelExceptions - kj::none, // workerTracer + true, // tunnelExceptions kj::mv(metadata.cfBlobJson)); } diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index a34c940c072..c4f5e9294db 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -410,7 +410,7 @@ kj::Own TestFixture::createIncomingRequest() { auto context = kj::refcounted( threadContext, kj::atomicAddRef(*worker), actor, kj::heap()); auto incomingRequest = kj::heap(kj::addRef(*context), - kj::heap(*timerChannel), kj::refcounted(), nullptr); + kj::heap(*timerChannel), kj::refcounted()); incomingRequest->delivered(); return incomingRequest; }