Skip to content

Commit

Permalink
Add ability to prefix worker_id in config
Browse files Browse the repository at this point in the history
WorkerId can now be prefixed with a config. This will cause all
internal WorkerIds to be prefixed with this config followed by
a generated UUIDv6.
  • Loading branch information
allada committed Feb 6, 2025
1 parent 60b0049 commit 192c186
Show file tree
Hide file tree
Showing 20 changed files with 218 additions and 156 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ pub struct UploadActionResultConfig {
#[serde(deny_unknown_fields)]
pub struct LocalWorkerConfig {
/// Name of the worker. This is give a more friendly name to a worker for logging
/// and metric publishing.
/// and metric publishing. This is also the prefix of the worker id
/// (ie: "{name}{uuidv6}").
/// Default: {Index position in the workers list}
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
pub name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ service WorkerApi {
/// this worker supports. The response must be listened on the client
/// side for updates from the server. The first item sent will always be
/// a ConnectionResult, after that it is undefined.
rpc ConnectWorker(SupportedProperties) returns (stream UpdateForWorker);
rpc ConnectWorker(ConnectWorkerRequest) returns (stream UpdateForWorker);

/// Message used to let the scheduler know that it is still alive as
/// well as check to see if the scheduler is still alive. The scheduler
Expand Down Expand Up @@ -74,8 +74,8 @@ message GoingAwayRequest {
}

/// Represents the initial request sent to the scheduler informing the
/// scheduler about this worker's capabilities.
message SupportedProperties {
/// scheduler about this worker's capabilities and metadata.
message ConnectWorkerRequest {
/// The list of properties this worker can support. The exact
/// implementation is driven by the configuration matrix between the
/// worker and scheduler.
Expand All @@ -87,7 +87,13 @@ message SupportedProperties {
/// The details on how to use this property can be found here:
/// https://github.com/TraceMachina/nativelink/blob/3147265047544572e3483c985e4aab0f9fdded38/nativelink-config/src/cas_server.rs
repeated build.bazel.remote.execution.v2.Platform.Property properties = 1;
reserved 2; // NextId.

/// Prefix to use for worker IDs. This is primarily used for debugging
/// or for other systems to identify workers. The scheduler will always
/// append this prefix to the assigned worker_id followed by a UUIDv6.
string worker_id_prefix = 2;

reserved 3; // NextId.
}

/// The result of an ExecutionRequest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ pub struct GoingAwayRequest {
pub worker_id: ::prost::alloc::string::String,
}
/// / Represents the initial request sent to the scheduler informing the
/// / scheduler about this worker's capabilities.
/// / scheduler about this worker's capabilities and metadata.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SupportedProperties {
pub struct ConnectWorkerRequest {
/// / The list of properties this worker can support. The exact
/// / implementation is driven by the configuration matrix between the
/// / worker and scheduler.
Expand All @@ -45,6 +45,11 @@ pub struct SupportedProperties {
pub properties: ::prost::alloc::vec::Vec<
super::super::super::super::super::build::bazel::remote::execution::v2::platform::Property,
>,
/// / Prefix to use for worker IDs. This is primarily used for debugging
/// / or for other systems to identify workers. The scheduler will always
/// / append this prefix to the assigned worker_id followed by a UUIDv6.
#[prost(string, tag = "2")]
pub worker_id_prefix: ::prost::alloc::string::String,
}
/// / The result of an ExecutionRequest.
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -261,7 +266,7 @@ pub mod worker_api_client {
/// / a ConnectionResult, after that it is undefined.
pub async fn connect_worker(
&mut self,
request: impl tonic::IntoRequest<super::SupportedProperties>,
request: impl tonic::IntoRequest<super::ConnectWorkerRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::UpdateForWorker>>,
tonic::Status,
Expand Down Expand Up @@ -410,7 +415,7 @@ pub mod worker_api_server {
/// / a ConnectionResult, after that it is undefined.
async fn connect_worker(
&self,
request: tonic::Request<super::SupportedProperties>,
request: tonic::Request<super::ConnectWorkerRequest>,
) -> std::result::Result<
tonic::Response<Self::ConnectWorkerStream>,
tonic::Status,
Expand Down Expand Up @@ -533,7 +538,7 @@ pub mod worker_api_server {
struct ConnectWorkerSvc<T: WorkerApi>(pub Arc<T>);
impl<
T: WorkerApi,
> tonic::server::ServerStreamingService<super::SupportedProperties>
> tonic::server::ServerStreamingService<super::ConnectWorkerRequest>
for ConnectWorkerSvc<T> {
type Response = super::UpdateForWorker;
type ResponseStream = T::ConnectWorkerStream;
Expand All @@ -543,7 +548,7 @@ pub mod worker_api_server {
>;
fn call(
&mut self,
request: tonic::Request<super::SupportedProperties>,
request: tonic::Request<super::ConnectWorkerRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
Expand Down
12 changes: 6 additions & 6 deletions nativelink-scheduler/src/api_worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl ApiWorkerSchedulerImpl {
for operation_id in worker.running_action_infos.keys() {
if self
.operation_keep_alive_tx
.send((operation_id.clone(), *worker_id))
.send((operation_id.clone(), worker_id.clone()))
.is_err()
{
event!(
Expand All @@ -128,8 +128,8 @@ impl ApiWorkerSchedulerImpl {
/// Adds a worker to the pool.
/// Note: This function will not do any task matching.
fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
let worker_id = worker.id;
self.workers.put(worker_id, worker);
let worker_id = worker.id.clone();
self.workers.put(worker_id.clone(), worker);

// Worker is not cloneable, and we do not want to send the initial connection results until
// we have added it to the map, or we might get some strange race conditions due to the way
Expand Down Expand Up @@ -189,7 +189,7 @@ impl ApiWorkerSchedulerImpl {
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
}),
};
workers_iter.map(|(_, w)| &w.id).copied()
workers_iter.map(|(_, w)| w.id.clone())
}

async fn update_action(
Expand Down Expand Up @@ -451,7 +451,7 @@ impl WorkerScheduler for ApiWorkerScheduler {

async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
let mut inner = self.inner.lock().await;
let worker_id = worker.id;
let worker_id = worker.id.clone();
let result = inner
.add_worker(worker)
.err_tip(|| "Error while adding worker, removing from pool");
Expand Down Expand Up @@ -505,7 +505,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
.rev()
.map_while(|(worker_id, worker)| {
if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
Some(*worker_id)
Some(worker_id.clone())
} else {
None
}
Expand Down
4 changes: 2 additions & 2 deletions nativelink-scheduler/src/awaited_action_db/awaited_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ impl AwaitedAction {
self.maybe_origin_metadata.as_ref()
}

pub(crate) fn worker_id(&self) -> Option<WorkerId> {
self.worker_id
pub(crate) fn worker_id(&self) -> Option<&WorkerId> {
self.worker_id.as_ref()
}

pub(crate) fn last_worker_updated_timestamp(&self) -> SystemTime {
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ where
}
}

if filter.worker_id.is_some() && filter.worker_id != awaited_action.worker_id() {
if filter.worker_id.is_some() && filter.worker_id.as_ref() != awaited_action.worker_id() {
return false;
}

Expand Down Expand Up @@ -500,7 +500,7 @@ where
// worker that was assigned.
if awaited_action.worker_id().is_some()
&& maybe_worker_id.is_some()
&& maybe_worker_id != awaited_action.worker_id().as_ref()
&& maybe_worker_id != awaited_action.worker_id()
{
// If another worker is already assigned to the action, another
// worker probably picked up the action. We should not update the
Expand Down Expand Up @@ -575,7 +575,7 @@ where
// which worker sent the update.
awaited_action.set_worker_id(None, now);
} else {
awaited_action.set_worker_id(maybe_worker_id.copied(), now);
awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
}
awaited_action.worker_set_state(
Arc::new(ActionState {
Expand Down
6 changes: 3 additions & 3 deletions nativelink-scheduler/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Worker {
send_msg_to_worker(
&mut self.tx,
update_for_worker::Update::ConnectionResult(ConnectionResult {
worker_id: self.id.to_string(),
worker_id: self.id.clone().into(),
}),
)
.err_tip(|| format!("Failed to send ConnectionResult to worker : {}", self.id))
Expand All @@ -181,7 +181,7 @@ impl Worker {

pub fn keep_alive(&mut self) -> Result<(), Error> {
let tx = &mut self.tx;
let id = self.id;
let id = &self.id;
self.metrics.keep_alive.wrap(move || {
send_msg_to_worker(tx, update_for_worker::Update::KeepAlive(()))
.err_tip(|| format!("Failed to send KeepAlive to worker : {id}"))
Expand All @@ -196,7 +196,7 @@ impl Worker {
let tx = &mut self.tx;
let worker_platform_properties = &mut self.platform_properties;
let running_action_infos = &mut self.running_action_infos;
let worker_id = self.id.to_string();
let worker_id = self.id.clone().into();
self.metrics
.run_action
.wrap(async move {
Expand Down
Loading

0 comments on commit 192c186

Please sign in to comment.