Skip to content

Commit

Permalink
Simplify PipelineTracer
Browse files Browse the repository at this point in the history
Remove the PipelineTracer parent tracer. The onComplete promise
can be used to track the completion of the pipeline to report
to the parent instead. The parent tracer is not used in workerd
at all.
  • Loading branch information
jasnell committed Oct 15, 2024
1 parent 0860b64 commit c792f66
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 64 deletions.
11 changes: 6 additions & 5 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,17 @@ void SpanBuilder::addLog(kj::Date timestamp, kj::ConstString key, TagValue value
}

PipelineTracer::~PipelineTracer() noexcept(false) {
KJ_IF_SOME(p, parentTracer) {
for (auto& t: traces) {
p->traces.add(kj::addRef(*t));
}
}
KJ_IF_SOME(f, completeFulfiller) {
f.get()->fulfill(traces.releaseAsArray());
}
}

void PipelineTracer::addTracesFromChild(kj::ArrayPtr<kj::Own<Trace>> traces) {
for (auto& t: traces) {
this->traces.add(kj::addRef(*t));
}
}

kj::Promise<kj::Array<kj::Own<Trace>>> PipelineTracer::onComplete() {
KJ_REQUIRE(completeFulfiller == kj::none, "onComplete() can only be called once");

Expand Down
13 changes: 3 additions & 10 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,14 @@ class WorkerTracer;
class PipelineTracer final: public kj::Refcounted {
public:
// Creates a pipeline tracer (with a possible parent).
explicit PipelineTracer(kj::Maybe<kj::Own<PipelineTracer>> parentPipeline = kj::none)
: parentTracer(kj::mv(parentPipeline)) {}

explicit PipelineTracer() = default;
~PipelineTracer() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(PipelineTracer);

// Returns a promise that fulfills when traces are complete. Only one such promise can
// exist at a time.
kj::Promise<kj::Array<kj::Own<Trace>>> onComplete();

// Makes a tracer for a subpipeline.
kj::Own<PipelineTracer> makePipelineSubtracer() {
return kj::refcounted<PipelineTracer>(kj::addRef(*this));
}

// Makes a tracer for a worker stage.
kj::Own<WorkerTracer> makeWorkerTracer(PipelineLogLevel pipelineLogLevel,
ExecutionModel executionModel,
Expand All @@ -70,12 +63,12 @@ class PipelineTracer final: public kj::Refcounted {
// to the host where tracing was initiated.
void addTrace(rpc::Trace::Reader reader);

void addTracesFromChild(kj::ArrayPtr<kj::Own<Trace>> traces);

private:
kj::Vector<kj::Own<Trace>> traces;
kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Array<kj::Own<Trace>>>>> completeFulfiller;

kj::Maybe<kj::Own<PipelineTracer>> parentTracer;

friend class WorkerTracer;
};

Expand Down
102 changes: 53 additions & 49 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1616,7 +1616,6 @@ public:
const kj::HashMap<kj::String, ActorConfig>& actorClasses,
LinkCallback linkCallback,
AbortActorsCallback abortActorsCallback,
kj::Maybe<kj::Own<PipelineTracer>> maybeTracer,
kj::Array<kj::Own<config::ServiceDesignator::Reader>> loggingServices,
LookupServiceCallback lookupService)
: threadContext(threadContext),
Expand All @@ -1625,7 +1624,6 @@ public:
defaultEntrypointHandlers(kj::mv(defaultEntrypointHandlers)),
waitUntilTasks(*this),
abortActorsCallback(kj::mv(abortActorsCallback)),
maybeTracer(kj::mv(maybeTracer)),
loggingServices(kj::mv(loggingServices)),
lookupService(kj::mv(lookupService)) {

Expand Down Expand Up @@ -1685,33 +1683,34 @@ public:
kj::Maybe<kj::StringPtr> entrypointName,
kj::Maybe<kj::Own<Worker::Actor>> actor = kj::none) {
TRACE_EVENT("workerd", "Server::WorkerService::startRequest()");
kj::Maybe<kj::Own<WorkerTracer>> maybeWorkerTracer = kj::none;
KJ_IF_SOME(tracer, maybeTracer) {
auto childTracer = kj::refcounted<PipelineTracer>(kj::addRef(*tracer));
// TODO(streaming-trace): Make sure the execution model here accurately reflects reality.
// e.g. this might be a durable object, or eventually a workflow.
maybeWorkerTracer = childTracer->makeWorkerTracer(PipelineLogLevel::FULL,
ExecutionModel::STATELESS, kj::none /* scriptId */, kj::none /* stableId */,
kj::none /* scriptName */, kj::none /* scriptVersion */, kj::none /* dispatchNamespace */,
nullptr /* scriptTags */, kj::none /* entrypoint */);

kj::Vector<kj::Own<WorkerInterface>> tailWorkers;
for (auto& svc: loggingServices) {
auto& service = lookupService(*svc, "looking logging service");
KJ_ASSERT(&service != this, "A worker currently cannot log to itself");
tailWorkers.add(service.startRequest({}));
}
waitUntilTasks.add(childTracer->onComplete().then(
kj::coCapture([tailWorkers = kj::mv(tailWorkers)](
kj::Array<kj::Own<Trace>> traces) mutable -> kj::Promise<void> {
for (auto& worker: tailWorkers) {
auto event = kj::heap<workerd::api::TraceCustomEventImpl>(
workerd::api::TraceCustomEventImpl::TYPE, mapAddRef(traces));
co_await worker->customEvent(kj::mv(event)).ignoreResult();
}
co_return;
})));
};

// We'll create a WorkerTracer if, and only if, there are tail workers configured.
auto maybeWorkerTracer = ([&]() -> kj::Maybe<kj::Own<WorkerTracer>> {
if (loggingServices.size() > 0) {
auto tracer = kj::refcounted<PipelineTracer>();

waitUntilTasks.add(dispatchTraces(tracer->onComplete(), KJ_MAP(svc, loggingServices) {
auto& service = lookupService(*svc, "resolving logging service");
KJ_ASSERT(&service != this, "A worker currently cannot log to itself");
return service.startRequest({});
}));

// The PipelineTracer is a kj::Refcounted. The call to makeWorkerTracer
// will cause the created WorkerTracer to retain a strong reference to
// the PipelineTracer so we don't need to arrange for it to stay alive.
// The instance will be dropped when the WorkerTracer is dropped, causing
// the onComplete promise to be fulfilled, allowing the collected traces
// to be dispatched.

// TODO(streaming-trace): Make sure the execution model here accurately reflects reality.
// e.g. this might be a durable object, or eventually a workflow.
return tracer->makeWorkerTracer(PipelineLogLevel::FULL, ExecutionModel::STATELESS, kj::none,
kj::none, kj::none, kj::none, kj::none, nullptr, kj::none);
};

return kj::none;
})();

return newWorkerEntrypoint(threadContext, kj::atomicAddRef(*worker), entrypointName,
kj::mv(actor), kj::Own<LimitEnforcer>(this, kj::NullDisposer::instance),
{}, // ioContextDependency
Expand Down Expand Up @@ -2208,6 +2207,16 @@ public:
};

private:
kj::Promise<void> dispatchTraces(kj::Promise<kj::Array<kj::Own<Trace>>> promise,
kj::Array<kj::Own<WorkerInterface>> tailWorkers) {
auto traces = co_await promise;
for (auto& worker: tailWorkers) {
auto event = kj::heap<workerd::api::TraceCustomEventImpl>(
workerd::api::TraceCustomEventImpl::TYPE, mapAddRef(traces));
co_await worker->customEvent(kj::mv(event)).ignoreResult();
}
}

class EntrypointService final: public Service {
public:
EntrypointService(
Expand Down Expand Up @@ -2241,7 +2250,6 @@ private:
kj::HashMap<kj::StringPtr, kj::Own<ActorNamespace>> actorNamespaces;
kj::TaskSet waitUntilTasks;
AbortActorsCallback abortActorsCallback;
kj::Maybe<kj::Own<PipelineTracer>> maybeTracer;
kj::Array<kj::Own<config::ServiceDesignator::Reader>> loggingServices;
LookupServiceCallback lookupService;

Expand Down Expand Up @@ -3249,31 +3257,27 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name,
};

kj::Vector<kj::Own<config::ServiceDesignator::Reader>> loggingServices;
auto maybeMakeTracer = [conf, &loggingServices]() -> kj::Maybe<kj::Own<PipelineTracer>> {
auto logging = conf.getLogging();
switch (logging.which()) {
case config::Worker::Logging::Which::NONE:
return kj::none;
case config::Worker::Logging::Which::TO_SERVICE: {
loggingServices.add(capnp::clone(logging.getToService()));
break;
}
case config::Worker::Logging::Which::TO_SERVICES: {
for (auto svc: logging.getToServices()) {
loggingServices.add(capnp::clone(svc));
}
break;
auto logging = conf.getLogging();
switch (conf.getLogging().which()) {
case config::Worker::Logging::Which::NONE:
// Nothing to do here.
break;
case config::Worker::Logging::Which::TO_SERVICE: {
loggingServices.add(capnp::clone(logging.getToService()));
break;
}
case config::Worker::Logging::Which::TO_SERVICES: {
for (auto svc: logging.getToServices()) {
loggingServices.add(capnp::clone(svc));
}
break;
}

return kj::refcounted<PipelineTracer>(kj::none);
};
auto tracer = maybeMakeTracer();
}

return kj::heap<WorkerService>(globalContext->threadContext, kj::mv(worker),
kj::mv(errorReporter.defaultEntrypoint), kj::mv(errorReporter.namedEntrypoints),
localActorConfigs, kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors),
kj::mv(tracer), loggingServices.releaseAsArray(),
loggingServices.releaseAsArray(),
[this](const config::ServiceDesignator::Reader& service, kj::StringPtr context) -> Service& {
return lookupService(service, kj::str(context));
});
Expand Down

0 comments on commit c792f66

Please sign in to comment.