Skip to content

Commit

Permalink
Add OriginEvent for scheduler scheduling action (#1574)
Browse files Browse the repository at this point in the history
* Add OriginEvent for scheduler scheduling action

Adds an OriginEvent for when the scheduler schedules an action.
This can be used for external systems to give visuals to what
is happening inside the system or for debugging.

* Expose OriginContextCollector to scheduler's matching engine

Ensures OriginContext data is available to the matching engine
and when dealing with an action, the first user to invoke the
action will be associated with the creation of the operation.
  • Loading branch information
allada authored Feb 6, 2025
1 parent a8c7339 commit 60b0049
Show file tree
Hide file tree
Showing 27 changed files with 544 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ syntax = "proto3";

package com.github.trace_machina.nativelink.events;

import "com/github/trace_machina/nativelink/remote_execution/worker_api.proto";
import "build/bazel/remote/execution/v2/remote_execution.proto";
import "google/bytestream/bytestream.proto";
import "google/devtools/build/v1/publish_build_event.proto";
Expand Down Expand Up @@ -81,7 +82,10 @@ message RequestEvent {
google.bytestream.QueryWriteStatusRequest query_write_status_request = 10;
build.bazel.remote.execution.v2.ExecuteRequest execute_request = 11;
build.bazel.remote.execution.v2.WaitExecutionRequest wait_execution_request = 12;

com.github.trace_machina.nativelink.remote_execution.StartExecute scheduler_start_execute = 13;
}
reserved 14; // NextId.
}

message ResponseEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,14 @@ message StartExecute {
/// of the ActionResult.
google.protobuf.Timestamp queued_timestamp = 3;

reserved 5; // NextId.
/// The post-computed platform properties that the scheduler has reserved for
/// the action.
build.bazel.remote.execution.v2.Platform platform = 5;

/// The ID of the worker that is executing the action.
string worker_id = 6;

reserved 7; // NextId.
}

/// This is a special message used to save actions into the CAS that can be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct WriteRequestOverride {
pub struct RequestEvent {
#[prost(
oneof = "request_event::Event",
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12"
tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13"
)]
pub event: ::core::option::Option<request_event::Event>,
}
Expand Down Expand Up @@ -152,6 +152,8 @@ pub mod request_event {
WaitExecutionRequest(
super::super::super::super::super::super::build::bazel::remote::execution::v2::WaitExecutionRequest,
),
#[prost(message, tag = "13")]
SchedulerStartExecute(super::super::remote_execution::StartExecute),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ pub struct StartExecute {
/// / of the ActionResult.
#[prost(message, optional, tag = "3")]
pub queued_timestamp: ::core::option::Option<::prost_types::Timestamp>,
/// / The post-computed platform properties that the scheduler has reserved for
/// / the action.
#[prost(message, optional, tag = "5")]
pub platform: ::core::option::Option<
super::super::super::super::super::build::bazel::remote::execution::v2::Platform,
>,
/// / The ID of the worker that is executing the action.
#[prost(string, tag = "6")]
pub worker_id: ::prost::alloc::string::String,
}
/// / This is a special message used to save actions into the CAS that can be used
/// / by programs like bb_browswer to inspect the history of a build.
Expand Down
9 changes: 5 additions & 4 deletions nativelink-scheduler/src/api_worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl ApiWorkerSchedulerImpl {
let was_paused = !worker.can_accept_work();

// Note: We need to run this before dealing with backpressure logic.
let complete_action_res = worker.complete_action(operation_id);
let complete_action_res = worker.complete_action(operation_id).await;

// Only pause if there's an action still waiting that will unpause.
if (was_paused || due_to_backpressure) && worker.has_actions() {
Expand All @@ -273,8 +273,9 @@ impl ApiWorkerSchedulerImpl {
action_info: ActionInfoWithProps,
) -> Result<(), Error> {
if let Some(worker) = self.workers.get_mut(&worker_id) {
let notify_worker_result =
worker.notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())));
let notify_worker_result = worker
.notify_update(WorkerUpdate::RunAction((operation_id, action_info.clone())))
.await;

if notify_worker_result.is_err() {
event!(
Expand Down Expand Up @@ -314,7 +315,7 @@ impl ApiWorkerSchedulerImpl {
let mut result = Ok(());
if let Some(mut worker) = self.remove_worker(worker_id) {
// We don't care if we fail to send message to worker, this is only a best attempt.
let _ = worker.notify_update(WorkerUpdate::Disconnect);
let _ = worker.notify_update(WorkerUpdate::Disconnect).await;
for (operation_id, _) in worker.running_action_infos.drain() {
result = result.merge(
self.worker_state_manager
Expand Down
15 changes: 15 additions & 0 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use nativelink_metric::{
use nativelink_util::action_messages::{
ActionInfo, ActionStage, ActionState, OperationId, WorkerId,
};
use nativelink_util::origin_context::ActiveOriginContext;
use nativelink_util::origin_event::{OriginMetadata, ORIGIN_EVENT_COLLECTOR};
use serde::{Deserialize, Serialize};
use static_assertions::{assert_eq_size, const_assert, const_assert_eq};

Expand Down Expand Up @@ -78,6 +80,9 @@ pub struct AwaitedAction {
#[metric(help = "The state of the AwaitedAction")]
state: Arc<ActionState>,

/// The origin metadata of the action.
maybe_origin_metadata: Option<OriginMetadata>,

/// Number of attempts the job has been tried.
#[metric(help = "The number of attempts the AwaitedAction has been tried")]
pub attempts: usize,
Expand All @@ -100,6 +105,11 @@ impl AwaitedAction {
client_operation_id: operation_id.clone(),
action_digest: action_info.unique_qualifier.digest(),
});
let maybe_origin_metadata = ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
.ok()
.flatten()
.map(|v| v.metadata.clone());

Self {
version: AwaitedActionVersion(0),
action_info,
Expand All @@ -108,6 +118,7 @@ impl AwaitedAction {
attempts: 0,
last_worker_updated_timestamp: now,
last_client_keepalive_timestamp: now,
maybe_origin_metadata,
worker_id: None,
state,
}
Expand Down Expand Up @@ -141,6 +152,10 @@ impl AwaitedAction {
&self.state
}

pub(crate) fn maybe_origin_metadata(&self) -> Option<&OriginMetadata> {
self.maybe_origin_metadata.as_ref()
}

pub(crate) fn worker_id(&self) -> Option<WorkerId> {
self.worker_id
}
Expand Down
25 changes: 20 additions & 5 deletions nativelink-scheduler/src/cache_lookup_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProv
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
use nativelink_util::origin_context::ActiveOriginContext;
use nativelink_util::origin_event::{OriginMetadata, ORIGIN_EVENT_COLLECTOR};
use nativelink_util::store_trait::Store;
use parking_lot::{Mutex, MutexGuard};
use scopeguard::guard;
Expand Down Expand Up @@ -109,27 +111,34 @@ fn subscribe_to_existing_action(

struct CacheLookupActionStateResult {
action_state: Arc<ActionState>,
maybe_origin_metadata: Option<OriginMetadata>,
change_called: bool,
}

#[async_trait]
impl ActionStateResult for CacheLookupActionStateResult {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
Ok(self.action_state.clone())
async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
Ok((
self.action_state.clone(),
self.maybe_origin_metadata.clone(),
))
}

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
if self.change_called {
return Err(make_err!(
Code::Internal,
"CacheLookupActionStateResult::changed called twice"
));
}
self.change_called = true;
Ok(self.action_state.clone())
Ok((
self.action_state.clone(),
self.maybe_origin_metadata.clone(),
))
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
// TODO(allada) We should probably remove as_action_info()
// or implement it properly.
return Err(make_err!(
Expand Down Expand Up @@ -251,11 +260,17 @@ impl CacheLookupScheduler {
action_digest: action_info.unique_qualifier.digest(),
};

let maybe_origin_metadata =
ActiveOriginContext::get_value(&ORIGIN_EVENT_COLLECTOR)
.ok()
.flatten()
.map(|v| v.metadata.clone());
for (client_operation_id, pending_tx) in pending_txs {
action_state.client_operation_id = client_operation_id;
// Ignore errors here, as the other end may have hung up.
let _ = pending_tx.send(Ok(Box::new(CacheLookupActionStateResult {
action_state: Arc::new(action_state.clone()),
maybe_origin_metadata: maybe_origin_metadata.clone(),
change_called: false,
})));
}
Expand Down
30 changes: 21 additions & 9 deletions nativelink-scheduler/src/default_scheduler_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use nativelink_config::schedulers::{
};
use nativelink_config::stores::EvictionPolicy;
use nativelink_error::{make_input_err, Error, ResultExt};
use nativelink_proto::com::github::trace_machina::nativelink::events::OriginEvent;
use nativelink_store::redis_store::RedisStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::operation_state_manager::ClientStateManager;
use tokio::sync::Notify;
use tokio::sync::{mpsc, Notify};

use crate::cache_lookup_scheduler::CacheLookupScheduler;
use crate::grpc_scheduler::GrpcScheduler;
Expand All @@ -46,25 +47,27 @@ pub type SchedulerFactoryResults = (
pub fn scheduler_factory(
spec: &SchedulerSpec,
store_manager: &StoreManager,
maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
) -> Result<SchedulerFactoryResults, Error> {
inner_scheduler_factory(spec, store_manager)
inner_scheduler_factory(spec, store_manager, maybe_origin_event_tx)
}

fn inner_scheduler_factory(
spec: &SchedulerSpec,
store_manager: &StoreManager,
maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
) -> Result<SchedulerFactoryResults, Error> {
let scheduler: SchedulerFactoryResults = match spec {
SchedulerSpec::simple(spec) => {
simple_scheduler_factory(spec, store_manager, SystemTime::now)?
simple_scheduler_factory(spec, store_manager, SystemTime::now, maybe_origin_event_tx)?
}
SchedulerSpec::grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None),
SchedulerSpec::cache_lookup(spec) => {
let ac_store = store_manager
.get_store(&spec.ac_store)
.err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?;
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&spec.scheduler, store_manager)
inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
.err_tip(|| "In nested CacheLookupScheduler construction")?;
let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
ac_store,
Expand All @@ -74,7 +77,7 @@ fn inner_scheduler_factory(
}
SchedulerSpec::property_modifier(spec) => {
let (action_scheduler, worker_scheduler) =
inner_scheduler_factory(&spec.scheduler, store_manager)
inner_scheduler_factory(&spec.scheduler, store_manager, maybe_origin_event_tx)
.err_tip(|| "In nested PropertyModifierScheduler construction")?;
let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new(
spec,
Expand All @@ -91,6 +94,7 @@ fn simple_scheduler_factory(
spec: &SimpleSpec,
store_manager: &StoreManager,
now_fn: fn() -> SystemTime,
maybe_origin_event_tx: Option<&mpsc::Sender<OriginEvent>>,
) -> Result<SchedulerFactoryResults, Error> {
match spec
.experimental_backend
Expand All @@ -104,8 +108,12 @@ fn simple_scheduler_factory(
&task_change_notify.clone(),
SystemTime::now,
);
let (action_scheduler, worker_scheduler) =
SimpleScheduler::new(spec, awaited_action_db, task_change_notify);
let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
spec,
awaited_action_db,
task_change_notify,
maybe_origin_event_tx.cloned(),
);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
ExperimentalSimpleSchedulerBackend::redis(redis_config) => {
Expand Down Expand Up @@ -134,8 +142,12 @@ fn simple_scheduler_factory(
Default::default,
)
.err_tip(|| "In state_manager_factory::redis_state_manager")?;
let (action_scheduler, worker_scheduler) =
SimpleScheduler::new(spec, awaited_action_db, task_change_notify);
let (action_scheduler, worker_scheduler) = SimpleScheduler::new(
spec,
awaited_action_db,
task_change_notify,
maybe_origin_event_tx.cloned(),
);
Ok((Some(action_scheduler), Some(worker_scheduler)))
}
}
Expand Down
15 changes: 10 additions & 5 deletions nativelink-scheduler/src/grpc_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProv
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
use nativelink_util::origin_event::OriginMetadata;
use nativelink_util::retry::{Retrier, RetryResult};
use nativelink_util::{background_spawn, tls_utils};
use parking_lot::Mutex;
Expand All @@ -55,13 +56,15 @@ struct GrpcActionStateResult {

#[async_trait]
impl ActionStateResult for GrpcActionStateResult {
async fn as_state(&self) -> Result<Arc<ActionState>, Error> {
async fn as_state(&self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
let mut action_state = self.rx.borrow().clone();
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
Ok(action_state)
// TODO(allada) We currently don't support OriginMetadata in this implementation, but
// we should.
Ok((action_state, None))
}

async fn changed(&mut self) -> Result<Arc<ActionState>, Error> {
async fn changed(&mut self) -> Result<(Arc<ActionState>, Option<OriginMetadata>), Error> {
self.rx.changed().await.map_err(|_| {
make_err!(
Code::Internal,
Expand All @@ -70,10 +73,12 @@ impl ActionStateResult for GrpcActionStateResult {
})?;
let mut action_state = self.rx.borrow().clone();
Arc::make_mut(&mut action_state).client_operation_id = self.client_operation_id.clone();
Ok(action_state)
// TODO(allada) We currently don't support OriginMetadata in this implementation, but
// we should.
Ok((action_state, None))
}

async fn as_action_info(&self) -> Result<Arc<ActionInfo>, Error> {
async fn as_action_info(&self) -> Result<(Arc<ActionInfo>, Option<OriginMetadata>), Error> {
// TODO(allada) We should probably remove as_action_info()
// or implement it properly.
return Err(make_err!(
Expand Down
Loading

0 comments on commit 60b0049

Please sign in to comment.