Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get the WorkerTracer via RequestObserver #2957

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve

auto eventParameters = consumeParams();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
Trace::HibernatableWebSocketEventInfo::Type type =
[&]() -> Trace::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
Expand All @@ -95,7 +95,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
KJ_UNREACHABLE;
}();

t.setEventInfo(context.now(), Trace::HibernatableWebSocketEventInfo(kj::mv(type)));
tracer->setEventInfo(context.now(), Trace::HibernatableWebSocketEventInfo(kj::mv(type)));
}

try {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/node/diagnostics-channel.c++
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) {
}

auto& context = IoContext::current();
KJ_IF_SOME(tracer, context.getWorkerTracer()) {
KJ_IF_SOME(tracer, context.getMetrics().getWorkerTracer()) {
jsg::Serializer ser(js,
jsg::Serializer::Options{
.omitHeader = false,
Expand All @@ -37,7 +37,7 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) {
Error,
"Diagnostic events cannot be published with SharedArrayBuffer or "
"transferred ArrayBuffer instances");
tracer.addDiagnosticChannelEvent(context.now(), name.toString(js), kj::mv(tmp.data));
tracer->addDiagnosticChannelEvent(context.now(), name.toString(js), kj::mv(tmp.data));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/node/util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ namespace {
kj::str("The Node.js process.exit(", code, ") API was called. Canceling the request.");
auto& ioContext = IoContext::current();
// If we have a tail worker, let's report the error.
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
// Why create the error like this in tracing? Because we're adding the exception
// to the trace and ideally we'd have the JS stack attached to it. Just using
// JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost
// of creating and capturing the stack when we actually need it.
auto ex = KJ_ASSERT_NONNULL(js.error(message).tryCast<jsg::JsObject>());
tracer.addException(ioContext.now(), ex.get(js, "name"_kj).toString(js),
tracer->addException(ioContext.now(), ex.get(js, "name"_kj).toString(js),
ex.get(js, "message"_kj).toString(js), ex.get(js, "stack"_kj).toString(js));
ioContext.abort(js.exceptionToKj(ex));
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
}
}

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
tracer->setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
}

// Create a custom refcounted type for holding the queueEvent so that we can pass it to the
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
auto& context = incomingRequest->getContext();
auto& metrics = incomingRequest->getMetrics();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::TraceEventInfo(traces));
KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
tracer->setEventInfo(context.now(), Trace::TraceEventInfo(traces));
}

// Add the actual JS as a wait until because the handler may be an event listener which can't
Expand Down
15 changes: 5 additions & 10 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1602,14 +1602,11 @@ void RpcSerializerExternalHander::serializeFunction(
// call of an RPC session.
class EntrypointJsRpcTarget final: public JsRpcTargetBase {
public:
EntrypointJsRpcTarget(IoContext& ioCtx,
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<kj::Own<WorkerTracer>> tracer)
EntrypointJsRpcTarget(IoContext& ioCtx, kj::Maybe<kj::StringPtr> entrypointName)
: JsRpcTargetBase(ioCtx),
// Most of the time we don't really have to clone this but it's hard to fully prove, so
// let's be safe.
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })),
tracer(kj::mv(tracer)) {}
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })) {}

TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override {
jsg::Lock& js = lock;
Expand Down Expand Up @@ -1647,7 +1644,6 @@ public:

private:
kj::Maybe<kj::String> entrypointName;
kj::Maybe<kj::Own<WorkerTracer>> tracer;

bool isReservedName(kj::StringPtr name) override {
if ( // "fetch" and "connect" are treated specially on entrypoints.
Expand All @@ -1670,8 +1666,8 @@ private:
}

void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override {
KJ_IF_SOME(t, tracer) {
t->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
KJ_IF_SOME(tracer, ioctx.getMetrics().getWorkerTracer()) {
tracer->setEventInfo(ioctx.now(), Trace::JsRpcEventInfo(kj::str(methodName)));
}
}
};
Expand Down Expand Up @@ -1725,8 +1721,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::r
incomingRequest->delivered();

auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller<void>();
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName,
mapAddRef(incomingRequest->getWorkerTracer())),
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName),
kj::refcounted<ServerTopLevelMembrane>(kj::mv(doneFulfiller))));

KJ_DEFER({
Expand Down
6 changes: 2 additions & 4 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,9 @@ IoContext::IoContext(ThreadContext& thread,

IoContext::IncomingRequest::IoContext_IncomingRequest(kj::Own<IoContext> contextParam,
kj::Own<IoChannelFactory> ioChannelFactoryParam,
kj::Own<RequestObserver> metricsParam,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer)
kj::Own<RequestObserver> metricsParam)
: context(kj::mv(contextParam)),
metrics(kj::mv(metricsParam)),
workerTracer(kj::mv(workerTracer)),
ioChannelFactory(kj::mv(ioChannelFactoryParam)) {}

// A call to delivered() implies a promise to call drain() later (or one of the other methods
Expand Down Expand Up @@ -340,7 +338,7 @@ void IoContext::logUncaughtException(

void IoContext::logUncaughtExceptionAsync(
UncaughtExceptionSource source, kj::Exception&& exception) {
if (getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
if (getMetrics().getWorkerTracer() == kj::none && !worker->getIsolate().isInspectorEnabled()) {
// We don't need to take the isolate lock as neither inspecting nor tracing is enabled. We
// do still want to syslog if relevant, but we can do that without a lock.
if (!jsg::isTunneledException(exception.getDescription()) &&
Expand Down
13 changes: 1 addition & 12 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ class IoContext_IncomingRequest final {
public:
IoContext_IncomingRequest(kj::Own<IoContext> context,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);
kj::Own<RequestObserver> metrics);
KJ_DISALLOW_COPY_AND_MOVE(IoContext_IncomingRequest);
~IoContext_IncomingRequest() noexcept(false);

Expand Down Expand Up @@ -154,14 +153,9 @@ class IoContext_IncomingRequest final {
return *metrics;
}

kj::Maybe<WorkerTracer&> getWorkerTracer() {
return workerTracer;
}

private:
kj::Own<IoContext> context;
kj::Own<RequestObserver> metrics;
kj::Maybe<kj::Own<WorkerTracer>> workerTracer;
kj::Own<IoChannelFactory> ioChannelFactory;

bool wasDelivered = false;
Expand Down Expand Up @@ -235,11 +229,6 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
return *getCurrentIncomingRequest().metrics;
}

const kj::Maybe<WorkerTracer&> getWorkerTracer() {
if (incomingRequests.empty()) return kj::none;
return getCurrentIncomingRequest().getWorkerTracer();
}

LimitEnforcer& getLimitEnforcer() {
return *limitEnforcer;
}
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class RequestObserver: public kj::Refcounted {
return nullptr;
}

virtual kj::Maybe<kj::Rc<WorkerTracer>> getWorkerTracer() {
return kj::none;
}

virtual void addedContextTask() {}
virtual void finishedContextTask() {}
virtual void addedWaitUntilTask() {}
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ kj::Promise<kj::Array<kj::Own<Trace>>> PipelineTracer::onComplete() {
return kj::mv(paf.promise);
}

kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
kj::Rc<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptId,
kj::Maybe<kj::String> stableId,
Expand All @@ -590,7 +590,7 @@ kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline
kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint),
executionModel);
traces.add(kj::addRef(*trace));
return kj::refcounted<WorkerTracer>(kj::addRef(*this), kj::mv(trace), pipelineLogLevel);
return kj::rc<WorkerTracer>(kj::addRef(*this), kj::mv(trace), pipelineLogLevel);
}

void PipelineTracer::addTrace(rpc::Trace::Reader reader) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ class PipelineTracer final: public kj::Refcounted {
}

// Makes a tracer for a worker stage.
kj::Own<WorkerTracer> makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
kj::Rc<WorkerTracer> makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
ExecutionModel executionModel,
kj::Maybe<kj::String> scriptId,
kj::Maybe<kj::String> stableId,
Expand Down
27 changes: 11 additions & 16 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public:
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

kj::Promise<void> request(kj::HttpMethod method,
Expand Down Expand Up @@ -95,8 +94,7 @@ private:
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer);
kj::Own<RequestObserver> metrics);

template <typename T>
kj::Promise<T> maybeAddGcPassForTest(IoContext& context, kj::Promise<T> promise);
Expand Down Expand Up @@ -157,13 +155,12 @@ kj::Own<WorkerInterface> WorkerEntrypoint::construct(ThreadContext& threadContex
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
TRACE_EVENT("workerd", "WorkerEntrypoint::construct()");
auto obj = kj::heap<WorkerEntrypoint>(kj::Badge<WorkerEntrypoint>(), threadContext,
waitUntilTasks, tunnelExceptions, entrypointName, kj::mv(cfBlobJson));
obj->init(kj::mv(worker), kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency),
kj::mv(ioChannelFactory), kj::addRef(*metrics), kj::mv(workerTracer));
kj::mv(ioChannelFactory), kj::addRef(*metrics));
auto& wrapper = metrics->wrapWorkerInterface(*obj);
return kj::attachRef(wrapper, kj::mv(obj), kj::mv(metrics));
}
Expand All @@ -185,8 +182,7 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
kj::Own<LimitEnforcer> limitEnforcer,
kj::Own<void> ioContextDependency,
kj::Own<IoChannelFactory> ioChannelFactory,
kj::Own<RequestObserver> metrics,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer) {
kj::Own<RequestObserver> metrics) {
TRACE_EVENT("workerd", "WorkerEntrypoint::init()");
// We need to construct the IoContext -- unless this is an actor and it already has a
// IoContext, in which case we reuse it.
Expand All @@ -212,7 +208,7 @@ void WorkerEntrypoint::init(kj::Own<const Worker> worker,
}

incomingRequest = kj::heap<IoContext::IncomingRequest>(
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics), kj::mv(workerTracer))
kj::mv(context), kj::mv(ioChannelFactory), kj::mv(metrics))
.attach(kj::mv(actor));
}

Expand All @@ -233,7 +229,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,

bool isActor = context.getActor() != kj::none;

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
KJ_IF_SOME(tracer, incomingRequest->getMetrics().getWorkerTracer()) {
auto timestamp = context.now();
kj::String cfJson;
KJ_IF_SOME(c, cfBlobJson) {
Expand All @@ -257,7 +253,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
return Trace::FetchEventInfo::Header(kj::mv(entry.key), kj::strArray(entry.value, ", "));
};

t.setEventInfo(timestamp,
tracer->setEventInfo(timestamp,
Trace::FetchEventInfo(method, kj::str(url), kj::mv(cfJson), kj::mv(traceHeadersArray)));
}

Expand Down Expand Up @@ -476,9 +472,9 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
// calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have
// to think more about this.

KJ_IF_SOME(t, context.getWorkerTracer()) {
KJ_IF_SOME(t, context.getMetrics().getWorkerTracer()) {
double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;
t.setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron)));
t->setEventInfo(context.now(), Trace::ScheduledEventInfo(eventTime, kj::str(cron)));
}

// Scheduled handlers run entirely in waitUntil() tasks.
Expand Down Expand Up @@ -535,8 +531,8 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
// There isn't a pre-existing alarm, we can call `delivered()` (and emit metrics events).
incomingRequest->delivered();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime));
KJ_IF_SOME(t, incomingRequest->getMetrics().getWorkerTracer()) {
t->setEventInfo(context.now(), Trace::AlarmEventInfo(scheduledTime));
}

auto scheduleAlarmResult = co_await actor.scheduleAlarm(scheduledTime);
Expand Down Expand Up @@ -715,11 +711,10 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson) {
return WorkerEntrypoint::construct(threadContext, kj::mv(worker), kj::mv(entrypointName),
kj::mv(actor), kj::mv(limitEnforcer), kj::mv(ioContextDependency), kj::mv(ioChannelFactory),
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(workerTracer), kj::mv(cfBlobJson));
kj::mv(metrics), waitUntilTasks, tunnelExceptions, kj::mv(cfBlobJson));
}

} // namespace workerd
1 change: 0 additions & 1 deletion src/workerd/io/worker-entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ kj::Own<WorkerInterface> newWorkerEntrypoint(ThreadContext& threadContext,
kj::Own<RequestObserver> metrics,
kj::TaskSet& waitUntilTasks,
bool tunnelExceptions,
kj::Maybe<kj::Own<WorkerTracer>> workerTracer,
kj::Maybe<kj::String> cfBlobJson);

} // namespace workerd
12 changes: 6 additions & 6 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void sendExceptionToInspector(jsg::Lock& js,

void addExceptionToTrace(jsg::Lock& js,
IoContext& ioContext,
WorkerTracer& tracer,
kj::Rc<WorkerTracer>& tracer,
UncaughtExceptionSource source,
const jsg::JsValue& exception,
const jsg::TypeHandler<Worker::Api::ErrorInterface>& errorTypeHandler) {
Expand Down Expand Up @@ -214,7 +214,7 @@ void addExceptionToTrace(jsg::Lock& js,
}

// TODO(someday): Limit size of exception content?
tracer.addException(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack));
tracer->addException(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack));
}

void reportStartupError(kj::StringPtr id,
Expand Down Expand Up @@ -1047,7 +1047,7 @@ Worker::Isolate::Isolate(kj::Own<Api> apiParam,
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER,
error, api->getErrorInterfaceTypeHandler(js));
}
Expand Down Expand Up @@ -1840,9 +1840,9 @@ void Worker::handleLog(jsg::Lock& js,
// Only check tracing if console.log() was not invoked at the top level.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
auto timestamp = ioContext.now();
tracer.log(timestamp, level, message());
tracer->log(timestamp, level, message());
}
}

Expand Down Expand Up @@ -2039,7 +2039,7 @@ void Worker::Lock::logUncaughtException(
// Only add exception to trace when running within an I/O context with a tracer.
if (IoContext::hasCurrent()) {
auto& ioContext = IoContext::current();
KJ_IF_SOME(tracer, ioContext.getWorkerTracer()) {
KJ_IF_SOME(tracer, ioContext.getMetrics().getWorkerTracer()) {
JSG_WITHIN_CONTEXT_SCOPE(*this, getContext(), [&](jsg::Lock& js) {
addExceptionToTrace(impl->inner, ioContext, tracer, source, exception,
worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this));
Expand Down
3 changes: 1 addition & 2 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1541,8 +1541,7 @@ public:
kj::Own<IoChannelFactory>(this, kj::NullDisposer::instance),
kj::refcounted<RequestObserver>(), // default observer makes no observations
waitUntilTasks,
true, // tunnelExceptions
kj::none, // workerTracer
true, // tunnelExceptions
kj::mv(metadata.cfBlobJson));
}

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ kj::Own<IoContext::IncomingRequest> TestFixture::createIncomingRequest() {
auto context = kj::refcounted<IoContext>(
threadContext, kj::atomicAddRef(*worker), actor, kj::heap<MockLimitEnforcer>());
auto incomingRequest = kj::heap<IoContext::IncomingRequest>(kj::addRef(*context),
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>(), nullptr);
kj::heap<DummyIoChannelFactory>(*timerChannel), kj::refcounted<RequestObserver>());
incomingRequest->delivered();
return incomingRequest;
}
Expand Down
Loading
Loading