Skip to content

Commit

Permalink
Get the WorkerTracer via RequestObserver
Browse files Browse the repository at this point in the history
We're needlessly refcounting the WorkerTracer to make it available
via the IoContext::IncomingRequest when the RequestObserver also
has a reference and can be used to get it. Also makes more sense
to access WorkerTracer via RequestObserver anyway.
  • Loading branch information
jasnell committed Oct 19, 2024
1 parent 35ec39b commit f06baa7
Show file tree
Hide file tree
Showing 14 changed files with 30 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve

auto eventParameters = consumeParams();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
Trace::HibernatableWebSocketEventInfo::Type type =
[&]() -> Trace::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/node/diagnostics-channel.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) {
}

auto& context = IoContext::current();
KJ_IF_SOME(tracer, context.getWorkerTracer()) {
KJ_IF_SOME(tracer, context.getMetrics().getWorkerTracer()) {
jsg::Serializer ser(js,
jsg::Serializer::Options{
.omitHeader = false,
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/node/util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ namespace {
kj::str("The Node.js process.exit(", code, ") API was called. Canceling the request.");
auto& ioContext = IoContext::current();
// If we have a tail worker, let's report the error.
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
// Why create the error like this in tracing? Because we're adding the exception
// to the trace and ideally we'd have the JS stack attached to it. Just using
// JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
}
}

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
auto& context = incomingRequest->getContext();
auto& metrics = incomingRequest->getMetrics();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::TraceEventInfo(traces));
}

Expand Down
15 changes: 5 additions & 10 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1602,14 +1602,11 @@ void RpcSerializerExternalHander::serializeFunction(
// call of an RPC session.
class EntrypointJsRpcTarget final: public JsRpcTargetBase {
public:
EntrypointJsRpcTarget(IoContext& ioCtx,
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<kj::Own<WorkerTracer>> tracer)
EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe<kj::StringPtr> entrypointName)
: JsRpcTargetBase(ioCtx),
// Most of the time we don't really have to clone this but it's hard to fully prove, so
// let's be safe.
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })),
tracer(kj::mv(tracer)) {}
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })) {}

TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override {
jsg::Lock& js = lock;
Expand Down Expand Up @@ -1647,7 +1644,6 @@ public:

private:
kj::Maybe<kj::String> entrypointName;
kj::Maybe<kj::Own<WorkerTracer>> tracer;

bool isReservedName(kj::StringPtr name) override {
if ( // "fetch" and "connect" are treated specially on entrypoints.
Expand All @@ -1670,8 +1666,8 @@ private:
}

void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override {
KJ_IF_SOME(t, tracer) {
t->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
KJ_IF_SOME(t, ioctx.getMetrics().getWorkerTracer()) {
t.setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
}
}
};
Expand Down Expand Up @@ -1725,8 +1721,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::r
incomingRequest->delivered();

auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller<void>();
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName,
mapAddRef(incomingRequest->getWorkerTracer())),
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName),
kj::refcounted<ServerTopLevelMembrane>(kj::mv(doneFulfiller))));

KJ_DEFER({
Expand Down
6 changes: 2 additions & 4 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,9 @@ IoContext::IoContext(ThreadContext& thread,

IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own<IoContext> contextParam,
kj::Own<IoChannelFactory> ioChannelFactoryParam,
kj::Own<RequestObserver> metricsParam,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer)
kj::Own<RequestObserver> metricsParam)
: context(kj::mv(contextParam)),
metrics(kj::mv(metricsParam)),
workerTracer(kj::mv(workerTracer)),
ioChannelFactory(kj::mv(ioChannelFactoryParam)) {}

// A call to delivered() implies a promise to call drain() later (or one of the other methods
Expand Down Expand Up @@ -340,7 +338,7 @@ void IoContext::logUncaughtException(

void IoContext::logUncaughtExceptionAsync(
UncaughtExceptionSource source, kj::Exception&& exception) {
if (getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
if (getMetrics().getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
// We don't need to take the isolate lock as neither inspecting nor tracing is enabled. We
// do still want to syslog if relevant, but we can do that without a lock.
if (!jsg::isTunneledException(exception.getDescription()) &&
Expand Down
13 changes: 1 addition & 12 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ class IoContext_IncomingRequest final {
public:
IoContext_IncomingRequest(kj::Own<IoContext> context,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);
kj::Own<RequestObserver> metrics);
KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest);
~IoContext_IncomingRequest() noexcept(false);

Expand Down Expand Up @@ -154,14 +153,9 @@ class IoContext_IncomingRequest final {
return *metrics;
}

kj::Maybe<WorkerTracer&> getWorkerTracer() {
return workerTracer;
}

private:
kj::Own<IoContext> context;
kj::Own<RequestObserver> metrics;
kj::Maybe<kj::Own<WorkerTracer>> workerTracer;
kj::Own<IoChannelFactory> ioChannelFactory;

bool wasDelivered = false;
Expand Down Expand Up @@ -235,11 +229,6 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
return *getCurrentIncomingRequest().metrics;
}

const kj::Maybe<WorkerTracer&> getWorkerTracer() {
if (incomingRequests.empty()) return kj::none;
return getCurrentIncomingRequest().getWorkerTracer();
}

LimitEnforcer& getLimitEnforcer() {
return *limitEnforcer;
}
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class RequestObserver: public kj::Refcounted {
return nullptr;
}

virtual kj::Maybe<WorkerTracer&> getWorkerTracer() {
return kj::none;
}

virtual void addedContextTask() {}
virtual void finishedContextTask() {}
virtual void addedWaitUntilTask() {}
Expand Down
21 changes: 8 additions & 13 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public:
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

kj::Promise<void> request(kj::HttpMethod method,
Expand Down Expand Up @@ -95,8 +94,7 @@ private:
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);
kj::Own<RequestObserver> metrics);

template <typename T>
kj::Promise<T> maybeAddGcPassForTest(IoContext& context, kj::Promise<T> promise);
Expand Down Expand Up @@ -157,13 +155,12 @@ kj::Own<WorkerInterface> WorkerEntrypoint::construct(ThreadContext& threadContex
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
TRACE_EVENT("workerd", "WorkerEntrypoint::construct()");
auto obj = kj::heap<WorkerEntrypoint>(kj::Badge<WorkerEntrypoint>(), threadContext,
waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson));
obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency),
kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer));
kj::mv(ioChannelFactory), kj::addRef(*metrics));
auto& wrapper = metrics->wrapWorkerInterface(*obj);
return kj::attachRef(wrapper, kj::mv(obj), kj::mv(metrics));
}
Expand All @@ -185,8 +182,7 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer) {
kj::Own<RequestObserver> metrics) {
TRACE_EVENT("workerd", "WorkerEntrypoint::init()");
// We need to construct the IoContext -- unless this is an actor and it already has a
// IoContext, in which case we reuse it.
Expand All @@ -212,7 +208,7 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
}

incomingRequest = kj::heap<IoContext::IncomingRequest>(
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics), kj::mv(workerTracer))
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics))
.attach(kj::mv(actor));
}

Expand All @@ -233,7 +229,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,

bool isActor = context.getActor() != kj::none;

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
auto timestamp = context.now();
kj::String cfJson;
KJ_IF_SOME(c, cfBlobJson) {
Expand Down Expand Up @@ -476,7 +472,7 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
// calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have
// to think more about this.

KJ_IF_SOME(t, context.getWorkerTracer()) {
KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) {
double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;
t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron)));
}
Expand Down Expand Up @@ -535,7 +531,7 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
// There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events).
incomingRequest->delivered();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime));
}

Expand Down Expand Up @@ -715,11 +711,10 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName),
kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory),
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson));
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(cfBlobJson));
}

} // namespace workerd
1 change: 0 additions & 1 deletion src/workerd/io/worker-entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

} // namespace workerd
6 changes: 3 additions & 3 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ Worker::Isolate::Isolate(kj::Own<Api> apiParam,
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER,
error, api->getErrorInterfaceTypeHandler(js));
}
Expand Down Expand Up @@ -1840,7 +1840,7 @@ void Worker::handleLog(jsg::Lock& js,
// Only check tracing if console.log() was not invoked at the top level.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
auto timestamp = ioContext.now();
tracer.log(timestamp, level, message());
}
Expand Down Expand Up @@ -2039,7 +2039,7 @@ void Worker::Lock::logUncaughtException(
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
JSG_WITHIN_CONTEXT_SCOPE(*this, getContext(), [&](jsg::Lock& js) {
addExceptionToTrace(impl->inner, ioContext, tracer, source, exception,
worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this));
Expand Down
3 changes: 1 addition & 2 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1541,8 +1541,7 @@ public:
kj::Own<IoChannelFactory>(this, kj::NullDisposer::instance),
kj::refcounted<RequestObserver>(), // default observer makes no observations
waitUntilTasks,
true, // tunnelExceptions
kj::none, // workerTracer
true, // tunnelExceptions
kj::mv(metadata.cfBlobJson));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ kj::Own<IoContext::IncomingRequest> TestFixture::createIncomingRequest() {
auto context = kj::refcounted<IoContext>(
threadContext, kj::atomicAddRef(*worker), actor, kj::heap<MockLimitEnforcer>());
auto incomingRequest = kj::heap<IoContext::IncomingRequest>(kj::addRef(*context),
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>(), nullptr);
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>());
incomingRequest->delivered();
return incomingRequest;
}
Expand Down

0 comments on commit f06baa7

Please sign in to comment.