From e753b8d4dc84711fe8b656690ce9890ccc2e85c9 Mon Sep 17 00:00:00 2001
From: "Nathan (Blaise) Bruer"
Date: Thu, 6 Feb 2025 16:02:38 -0600
Subject: [PATCH] Add ability to prefix worker_id in config (#1578)

The worker ID can now be given a prefix via the config. Every internal
WorkerId will then be this prefix followed by a generated UUIDv6. (A
minimal sketch of the resulting ID format is in the notes after the
diff.)
---
 Cargo.lock | 1 +
 nativelink-config/src/cas_server.rs | 3 +-
 .../remote_execution/worker_api.proto | 14 +-
 ..._machina.nativelink.remote_execution.pb.rs | 17 +-
 .../src/api_worker_scheduler.rs | 12 +-
 .../src/awaited_action_db/awaited_action.rs | 4 +-
 .../src/simple_scheduler_state_manager.rs | 6 +-
 nativelink-scheduler/src/worker.rs | 6 +-
 .../tests/simple_scheduler_test.rs | 160 +++++++++++-------
 nativelink-service/BUILD.bazel | 1 +
 nativelink-service/Cargo.toml | 1 +
 nativelink-service/src/worker_api_server.rs | 38 +++--
 .../tests/worker_api_server_test.rs | 13 +-
 nativelink-util/src/action_messages.rs | 32 ++--
 nativelink-worker/src/local_worker.rs | 9 +-
 .../src/worker_api_client_wrapper.rs | 6 +-
 nativelink-worker/src/worker_utils.rs | 10 +-
 nativelink-worker/tests/local_worker_test.rs | 27 +--
 .../tests/utils/local_worker_test_utils.rs | 8 +-
 src/bin/nativelink.rs | 6 +-
 20 files changed, 218 insertions(+), 156 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 435f2b4aa..751fed588 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2147,6 +2147,7 @@ dependencies = [
  "pretty_assertions",
  "prost",
  "prost-types",
+ "rand",
  "serde_json5",
  "tokio",
  "tokio-stream",
diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs
index e1f325985..f64dbe7e2 100644
--- a/nativelink-config/src/cas_server.rs
+++ b/nativelink-config/src/cas_server.rs
@@ -605,7 +605,8 @@ pub struct UploadActionResultConfig {
 #[serde(deny_unknown_fields)]
 pub struct LocalWorkerConfig {
     /// Name of the worker. This gives a more friendly name to a worker for logging
-    /// and metric publishing.
+    /// and metric publishing. This is also the prefix of the worker id
+    /// (i.e. "{name}{uuidv6}").
     /// Default: {Index position in the workers list}
     #[serde(default, deserialize_with = "convert_string_with_shellexpand")]
     pub name: String,
diff --git a/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto b/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto
index d62ed69e0..e37c6bd7f 100644
--- a/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto
+++ b/nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto
@@ -33,7 +33,7 @@ service WorkerApi {
   /// this worker supports. The response must be listened to on the client
   /// side for updates from the server. The first item sent will always be
   /// a ConnectionResult, after that it is undefined.
-  rpc ConnectWorker(SupportedProperties) returns (stream UpdateForWorker);
+  rpc ConnectWorker(ConnectWorkerRequest) returns (stream UpdateForWorker);

   /// Message used to let the scheduler know that it is still alive as
   /// well as check to see if the scheduler is still alive. The scheduler
@@ -74,8 +74,8 @@ message GoingAwayRequest {
 }

 /// Represents the initial request sent to the scheduler informing the
-/// scheduler about this worker's capabilities.
-message SupportedProperties {
+/// scheduler about this worker's capabilities and metadata.
+message ConnectWorkerRequest {
   /// The list of properties this worker can support.
The exact
  /// implementation is driven by the configuration matrix between the
  /// worker and scheduler.
@@ -87,7 +87,13 @@ message SupportedProperties {
   /// The details on how to use this property can be found here:
   /// https://github.com/TraceMachina/nativelink/blob/3147265047544572e3483c985e4aab0f9fdded38/nativelink-config/src/cas_server.rs
   repeated build.bazel.remote.execution.v2.Platform.Property properties = 1;
-  reserved 2; // NextId.
+
+  /// Prefix to use for worker IDs. This is primarily used for debugging
+  /// or for other systems to identify workers. The scheduler will always
+  /// construct the assigned worker_id as this prefix followed by a UUIDv6.
+  string worker_id_prefix = 2;
+
+  reserved 3; // NextId.
 }

 /// The result of an ExecutionRequest.
diff --git a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs
index bc4622560..b2fe946c3 100644
--- a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs
+++ b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs
@@ -28,9 +28,9 @@ pub struct GoingAwayRequest {
     pub worker_id: ::prost::alloc::string::String,
 }
 /// / Represents the initial request sent to the scheduler informing the
-/// / scheduler about this worker's capabilities.
+/// / scheduler about this worker's capabilities and metadata.
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct SupportedProperties {
+pub struct ConnectWorkerRequest {
     /// / The list of properties this worker can support. The exact
     /// / implementation is driven by the configuration matrix between the
     /// / worker and scheduler.
@@ -45,6 +45,11 @@
     pub properties: ::prost::alloc::vec::Vec<
         super::super::super::super::super::build::bazel::remote::execution::v2::platform::Property,
     >,
+    /// / Prefix to use for worker IDs. This is primarily used for debugging
+    /// / or for other systems to identify workers. The scheduler will always
+    /// / construct the assigned worker_id as this prefix followed by a UUIDv6.
+    #[prost(string, tag = "2")]
+    pub worker_id_prefix: ::prost::alloc::string::String,
 }
 /// / The result of an ExecutionRequest.
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -261,7 +266,7 @@ pub mod worker_api_client {
         /// / a ConnectionResult, after that it is undefined.
         pub async fn connect_worker(
             &mut self,
-            request: impl tonic::IntoRequest<super::SupportedProperties>,
+            request: impl tonic::IntoRequest<super::ConnectWorkerRequest>,
         ) -> std::result::Result<
             tonic::Response<tonic::codec::Streaming<super::UpdateForWorker>>,
             tonic::Status,
@@ -410,7 +415,7 @@ pub mod worker_api_server {
         /// / a ConnectionResult, after that it is undefined.
        async fn connect_worker(
            &self,
-           request: tonic::Request<super::SupportedProperties>,
+           request: tonic::Request<super::ConnectWorkerRequest>,
        ) -> std::result::Result<
            tonic::Response<Self::ConnectWorkerStream>,
            tonic::Status,
@@ -533,7 +538,7 @@ pub mod worker_api_server {
        struct ConnectWorkerSvc<T: WorkerApi>(pub Arc<T>);
        impl<
            T: WorkerApi,
-       > tonic::server::ServerStreamingService<super::SupportedProperties>
+       > tonic::server::ServerStreamingService<super::ConnectWorkerRequest>
        for ConnectWorkerSvc<T> {
            type Response = super::UpdateForWorker;
            type ResponseStream = T::ConnectWorkerStream;
            type Future = BoxFuture<
                tonic::Response<Self::ResponseStream>,
                tonic::Status,
            >;
            fn call(
                &mut self,
-               request: tonic::Request<super::SupportedProperties>,
+               request: tonic::Request<super::ConnectWorkerRequest>,
            ) -> Self::Future {
                let inner = Arc::clone(&self.0);
                let fut = async move {
diff --git a/nativelink-scheduler/src/api_worker_scheduler.rs b/nativelink-scheduler/src/api_worker_scheduler.rs
index e689a44dc..ec5366d48 100644
--- a/nativelink-scheduler/src/api_worker_scheduler.rs
+++ b/nativelink-scheduler/src/api_worker_scheduler.rs
@@ -111,7 +111,7 @@ impl ApiWorkerSchedulerImpl {
         for operation_id in worker.running_action_infos.keys() {
             if self
                 .operation_keep_alive_tx
-                .send((operation_id.clone(), *worker_id))
+                .send((operation_id.clone(), worker_id.clone()))
                 .is_err()
             {
                 event!(
@@ -128,8 +128,8 @@ impl ApiWorkerSchedulerImpl {
     /// Adds a worker to the pool.
     /// Note: This function will not do any task matching.
     fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
-        let worker_id = worker.id;
-        self.workers.put(worker_id, worker);
+        let worker_id = worker.id.clone();
+        self.workers.put(worker_id.clone(), worker);

         // Worker is not cloneable, and we do not want to send the initial connection results until
         // we have added it to the map, or we might get some strange race conditions due to the way
@@ -189,7 +189,7 @@ impl ApiWorkerSchedulerImpl {
                 w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
             }),
         };
-        workers_iter.map(|(_, w)| &w.id).copied()
+        workers_iter.map(|(_, w)| w.id.clone())
     }

     async fn update_action(
@@ -451,7 +451,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
     async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
         let mut inner = self.inner.lock().await;
-        let worker_id = worker.id;
+        let worker_id = worker.id.clone();
         let result = inner
             .add_worker(worker)
             .err_tip(|| "Error while adding worker, removing from pool");
@@ -505,7 +505,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
             .rev()
             .map_while(|(worker_id, worker)| {
                 if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
-                    Some(*worker_id)
+                    Some(worker_id.clone())
                 } else {
                     None
                 }
diff --git a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
index c654df85f..dc7be982a 100644
--- a/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
+++ b/nativelink-scheduler/src/awaited_action_db/awaited_action.rs
@@ -156,8 +156,8 @@ impl AwaitedAction {
         self.maybe_origin_metadata.as_ref()
     }

-    pub(crate) fn worker_id(&self) -> Option<WorkerId> {
-        self.worker_id
+    pub(crate) fn worker_id(&self) -> Option<&WorkerId> {
+        self.worker_id.as_ref()
     }

     pub(crate) fn last_worker_updated_timestamp(&self) -> SystemTime {
diff --git a/nativelink-scheduler/src/simple_scheduler_state_manager.rs b/nativelink-scheduler/src/simple_scheduler_state_manager.rs
index c319901be..71b8d9264 100644
--- a/nativelink-scheduler/src/simple_scheduler_state_manager.rs
+++ b/nativelink-scheduler/src/simple_scheduler_state_manager.rs
@@ -354,7 +354,7 @@ where
             }
         }

-        if filter.worker_id.is_some() && filter.worker_id != awaited_action.worker_id() {
+        if
filter.worker_id.is_some() && filter.worker_id.as_ref() != awaited_action.worker_id() {
            return false;
        }
@@ -500,7 +500,7 @@ where
        // worker that was assigned.
        if awaited_action.worker_id().is_some()
            && maybe_worker_id.is_some()
-           && maybe_worker_id != awaited_action.worker_id().as_ref()
+           && maybe_worker_id != awaited_action.worker_id()
        {
            // If another worker is already assigned to the action, another
            // worker probably picked up the action. We should not update the
@@ -575,7 +575,7 @@ where
            // which worker sent the update.
            awaited_action.set_worker_id(None, now);
        } else {
-           awaited_action.set_worker_id(maybe_worker_id.copied(), now);
+           awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
        }
        awaited_action.worker_set_state(
            Arc::new(ActionState {
diff --git a/nativelink-scheduler/src/worker.rs b/nativelink-scheduler/src/worker.rs
index 2a96f72ef..131d8762b 100644
--- a/nativelink-scheduler/src/worker.rs
+++ b/nativelink-scheduler/src/worker.rs
@@ -160,7 +160,7 @@ impl Worker {
        send_msg_to_worker(
            &mut self.tx,
            update_for_worker::Update::ConnectionResult(ConnectionResult {
-               worker_id: self.id.to_string(),
+               worker_id: self.id.clone().into(),
            }),
        )
        .err_tip(|| format!("Failed to send ConnectionResult to worker : {}", self.id))
@@ -181,7 +181,7 @@ impl Worker {

    pub fn keep_alive(&mut self) -> Result<(), Error> {
        let tx = &mut self.tx;
-       let id = self.id;
+       let id = &self.id;
        self.metrics.keep_alive.wrap(move || {
            send_msg_to_worker(tx, update_for_worker::Update::KeepAlive(()))
                .err_tip(|| format!("Failed to send KeepAlive to worker : {id}"))
@@ -196,7 +196,7 @@
        let tx = &mut self.tx;
        let worker_platform_properties = &mut self.platform_properties;
        let running_action_infos = &mut self.running_action_infos;
-       let worker_id = self.id.to_string();
+       let worker_id = self.id.clone().into();
        self.metrics
            .run_action
            .wrap(async move {
diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs
index b11e713e9..9644d9131 100644
--- a/nativelink-scheduler/tests/simple_scheduler_test.rs
+++ b/nativelink-scheduler/tests/simple_scheduler_test.rs
@@ -55,7 +55,6 @@ use nativelink_util::platform_properties::{PlatformProperties, PlatformPropertyV
 use pretty_assertions::assert_eq;
 use tokio::sync::{mpsc, Notify};
 use utils::scheduler_utils::{make_base_action_info, INSTANCE_NAME};
-use uuid::Uuid;

 mod utils {
     pub(crate) mod scheduler_utils;
@@ -108,7 +107,7 @@ async fn verify_initial_connection_message(
     let expected_msg_for_worker = UpdateForWorker {
         update: Some(update_for_worker::Update::ConnectionResult(
             ConnectionResult {
-                worker_id: worker_id.to_string(),
+                worker_id: worker_id.into(),
             },
         )),
     };
@@ -130,7 +129,7 @@ async fn setup_new_worker(
     props: PlatformProperties,
 ) -> Result<mpsc::UnboundedReceiver<UpdateForWorker>, Error> {
     let (tx, mut rx) = mpsc::unbounded_channel();
-    let worker = Worker::new(worker_id, props, tx, NOW_TIME);
+    let worker = Worker::new(worker_id.clone(), props, tx, NOW_TIME);
     scheduler
         .add_worker(worker)
         .await
@@ -158,7 +157,7 @@ const WORKER_TIMEOUT_S: u64 = 100;

 #[nativelink_test]
 async fn basic_add_action_with_one_worker_test() -> Result<(), Error> {
-    let worker_id: WorkerId = WorkerId(Uuid::new_v4());
+    let worker_id = WorkerId("worker_id".to_string());

     let task_change_notify = Arc::new(Notify::new());
     let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback(
@@ -176,7 +175,7 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> {
     let action_digest = DigestInfo::new([99u8; 32], 512);

     let mut
rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; let insert_timestamp = make_system_time(1); let mut action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp) @@ -196,7 +195,7 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { operation_id: "Unknown Generated internally".to_string(), queued_timestamp: Some(insert_timestamp.into()), platform: Some(Platform::default()), - worker_id: worker_id.to_string(), + worker_id: worker_id.into(), })), }; let msg_for_worker = rx_from_worker.recv().await.unwrap(); @@ -229,7 +228,7 @@ async fn client_does_not_receive_update_timeout() -> Result<(), Error> { } } - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -250,7 +249,7 @@ async fn client_does_not_receive_update_timeout() -> Result<(), Error> { let action_digest = DigestInfo::new([99u8; 32], 512); let _rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; let mut action_listener = setup_action( &scheduler, action_digest, @@ -293,7 +292,7 @@ async fn client_does_not_receive_update_timeout() -> Result<(), Error> { #[nativelink_test] async fn find_executing_action() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -311,7 +310,7 @@ async fn find_executing_action() -> Result<(), Error> { let action_digest = DigestInfo::new([99u8; 32], 512); let mut rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; let insert_timestamp = make_system_time(1); let action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp) .await @@ -350,7 +349,7 @@ async fn find_executing_action() -> Result<(), Error> { operation_id: "Unknown Generated internally".to_string(), queued_timestamp: Some(insert_timestamp.into()), platform: Some(Platform::default()), - worker_id: worker_id.to_string(), + worker_id: worker_id.into(), })), }; let msg_for_worker = rx_from_worker.recv().await.unwrap(); @@ -374,8 +373,8 @@ async fn find_executing_action() -> Result<(), Error> { #[nativelink_test] async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Error> { - let worker_id1: WorkerId = WorkerId(Uuid::new_v4()); - let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id1 = WorkerId("worker1".to_string()); + let worker_id2 = WorkerId("worker2".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &SimpleSpec { @@ -395,8 +394,12 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err let action_digest1 = DigestInfo::new([99u8; 32], 512); let action_digest2 = DigestInfo::new([88u8; 32], 512); - let mut rx_from_worker1 = - setup_new_worker(&scheduler, worker_id1, PlatformProperties::default()).await?; + let mut rx_from_worker1 = 
setup_new_worker( + &scheduler, + worker_id1.clone(), + PlatformProperties::default(), + ) + .await?; let insert_timestamp1 = make_system_time(1); let mut client1_action_listener = setup_action( &scheduler, @@ -479,8 +482,12 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err }; // Add a second worker that can take jobs if the first dies. - let mut rx_from_worker2 = - setup_new_worker(&scheduler, worker_id2, PlatformProperties::default()).await?; + let mut rx_from_worker2 = setup_new_worker( + &scheduler, + worker_id2.clone(), + PlatformProperties::default(), + ) + .await?; { let expected_action_stage = ActionStage::Executing; @@ -563,7 +570,7 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err #[nativelink_test] async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -581,7 +588,7 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> let action_digest = DigestInfo::new([99u8; 32], 512); let mut rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; let insert_timestamp = make_system_time(1); let mut action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?; @@ -644,8 +651,8 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> #[nativelink_test] async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), Error> { - let worker_id1: WorkerId = WorkerId(Uuid::new_v4()); - let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id1 = WorkerId("worker1".to_string()); + let worker_id2 = WorkerId("worker2".to_string()); let mut prop_defs = HashMap::new(); prop_defs.insert("prop".to_string(), PropertyType::exact); @@ -703,7 +710,7 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E PlatformPropertyValue::Exact("1".to_string()), ); let mut rx_from_worker2 = - setup_new_worker(&scheduler, worker_id2, worker2_properties.clone()).await?; + setup_new_worker(&scheduler, worker_id2.clone(), worker2_properties.clone()).await?; { // Worker should have been sent an execute command. let expected_msg_for_worker = UpdateForWorker { @@ -746,7 +753,7 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E #[nativelink_test] async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -799,7 +806,7 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { }; let mut rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; { // Worker should have been sent an execute command. 
@@ -814,7 +821,7 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { operation_id: "Unknown Generated internally".to_string(), queued_timestamp: Some(insert_timestamp1.into()), platform: Some(Platform::default()), - worker_id: worker_id.to_string(), + worker_id: worker_id.into(), })), }; let msg_for_worker = rx_from_worker.recv().await.unwrap(); @@ -868,11 +875,11 @@ async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), MockInstantWrapped::default, None, ); - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let action_digest = DigestInfo::new([99u8; 32], 512); let rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; // Now act like the worker disconnected. drop(rx_from_worker); @@ -1028,7 +1035,7 @@ async fn matching_engine_fails_sends_abort() -> Result<(), Error> { senders.get_range_of_actions.send(vec![]).unwrap(); let _worker_rx = setup_new_worker( &scheduler, - WorkerId(Uuid::new_v4()), + WorkerId("worker_id".to_string()), PlatformProperties::default(), ) .await @@ -1074,7 +1081,7 @@ async fn matching_engine_fails_sends_abort() -> Result<(), Error> { senders.get_range_of_actions.send(vec![]).unwrap(); let _worker_rx = setup_new_worker( &scheduler, - WorkerId(Uuid::new_v4()), + WorkerId("worker_id".to_string()), PlatformProperties::default(), ) .await @@ -1111,8 +1118,8 @@ async fn matching_engine_fails_sends_abort() -> Result<(), Error> { #[nativelink_test] async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { - let worker_id1: WorkerId = WorkerId(Uuid::new_v4()); - let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id1 = WorkerId("worker1".to_string()); + let worker_id2 = WorkerId("worker2".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( &SimpleSpec { @@ -1132,15 +1139,23 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { let action_digest = DigestInfo::new([99u8; 32], 512); // Note: This needs to stay in scope or a disconnect will trigger. - let mut rx_from_worker1 = - setup_new_worker(&scheduler, worker_id1, PlatformProperties::default()).await?; + let mut rx_from_worker1 = setup_new_worker( + &scheduler, + worker_id1.clone(), + PlatformProperties::default(), + ) + .await?; let insert_timestamp = make_system_time(1); let mut action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?; // Note: This needs to stay in scope or a disconnect will trigger. 
- let mut rx_from_worker2 = - setup_new_worker(&scheduler, worker_id2, PlatformProperties::default()).await?; + let mut rx_from_worker2 = setup_new_worker( + &scheduler, + worker_id2.clone(), + PlatformProperties::default(), + ) + .await?; let mut start_execute = StartExecute { execute_request: Some(ExecuteRequest { @@ -1238,7 +1253,7 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { #[nativelink_test] async fn update_action_sends_completed_result_to_client_test() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -1256,7 +1271,7 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err let action_digest = DigestInfo::new([99u8; 32], 512); let mut rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; let insert_timestamp = make_system_time(1); let mut action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?; @@ -1340,7 +1355,7 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err #[nativelink_test] async fn update_action_sends_completed_result_after_disconnect() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -1358,7 +1373,7 @@ async fn update_action_sends_completed_result_after_disconnect() -> Result<(), E let action_digest = DigestInfo::new([99u8; 32], 512); let mut rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; let insert_timestamp = make_system_time(1); let action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?; @@ -1459,8 +1474,8 @@ async fn update_action_sends_completed_result_after_disconnect() -> Result<(), E #[nativelink_test] async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { - let good_worker_id: WorkerId = WorkerId(Uuid::new_v4()); - let rogue_worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let good_worker_id = WorkerId("good_worker_id".to_string()); + let rogue_worker_id = WorkerId("rogue_worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -1477,8 +1492,12 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { ); let action_digest = DigestInfo::new([99u8; 32], 512); - let mut rx_from_worker = - setup_new_worker(&scheduler, good_worker_id, PlatformProperties::default()).await?; + let mut rx_from_worker = setup_new_worker( + &scheduler, + good_worker_id.clone(), + PlatformProperties::default(), + ) + .await?; let insert_timestamp = make_system_time(1); let mut action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?; @@ -1495,7 +1514,12 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { ActionStage::Executing ); } - let _ = setup_new_worker(&scheduler, rogue_worker_id, 
PlatformProperties::default()).await?; + let _ = setup_new_worker( + &scheduler, + rogue_worker_id.clone(), + PlatformProperties::default(), + ) + .await?; let action_result = ActionResult { output_files: Vec::default(), @@ -1559,7 +1583,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { #[nativelink_test] async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -1588,9 +1612,10 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp) .await .unwrap(); - let mut rx_from_worker = setup_new_worker(&scheduler, worker_id, PlatformProperties::default()) - .await - .unwrap(); + let mut rx_from_worker = + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()) + .await + .unwrap(); let operation_id = { // Worker should have been sent an execute command. @@ -1605,7 +1630,7 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro operation_id: "Unknown Generated internally".to_string(), queued_timestamp: Some(insert_timestamp.into()), platform: Some(Platform::default()), - worker_id: worker_id.to_string(), + worker_id: worker_id.clone().into(), })), }; let msg_for_worker = rx_from_worker.recv().await.unwrap(); @@ -1700,7 +1725,7 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro /// a job finished on a specific worker (eg: restore platform properties). #[nativelink_test] async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let mut supported_props = HashMap::new(); supported_props.insert("prop1".to_string(), PropertyType::minimum); @@ -1732,9 +1757,10 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> .iter() .map(|(k, v)| (k.clone(), v.as_str().into_owned())) .collect(); - let mut rx_from_worker = setup_new_worker(&scheduler, worker_id, platform_properties.clone()) - .await - .unwrap(); + let mut rx_from_worker = + setup_new_worker(&scheduler, worker_id.clone(), platform_properties.clone()) + .await + .unwrap(); let insert_timestamp1 = make_system_time(1); let mut client1_action_listener = setup_action( &scheduler, @@ -1865,7 +1891,7 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> /// This tests that actions are performed in the order they were queued. 
#[nativelink_test] async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let mut supported_props = HashMap::new(); supported_props.insert("prop1".to_string(), PropertyType::minimum); @@ -1935,7 +1961,7 @@ async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> { #[nativelink_test] async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> { - let worker_id: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id = WorkerId("worker_id".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -1956,7 +1982,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> let action_digest = DigestInfo::new([99u8; 32], 512); let mut rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; let insert_timestamp = make_system_time(1); let mut action_listener = setup_action(&scheduler, action_digest, HashMap::new(), insert_timestamp).await?; @@ -1997,7 +2023,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> // Now connect a new worker and it should pickup the action. let mut rx_from_worker = - setup_new_worker(&scheduler, worker_id, PlatformProperties::default()).await?; + setup_new_worker(&scheduler, worker_id.clone(), PlatformProperties::default()).await?; { // Other tests check full data. We only care if we got StartAction. match rx_from_worker.recv().await.unwrap().update { @@ -2121,8 +2147,8 @@ async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> { /// Regression test for: https://github.com/TraceMachina/nativelink/issues/257. #[nativelink_test] async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), Error> { - let worker_id1: WorkerId = WorkerId(Uuid::new_v4()); - let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); + let worker_id1 = WorkerId("worker1".to_string()); + let worker_id2 = WorkerId("worker2".to_string()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( @@ -2139,8 +2165,12 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), ); let action_digest = DigestInfo::new([99u8; 32], 512); - let mut rx_from_worker1 = - setup_new_worker(&scheduler, worker_id1, PlatformProperties::default()).await?; + let mut rx_from_worker1 = setup_new_worker( + &scheduler, + worker_id1.clone(), + PlatformProperties::default(), + ) + .await?; let mut action_listener = setup_action( &scheduler, action_digest, @@ -2149,8 +2179,12 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), ) .await?; - let mut rx_from_worker2 = - setup_new_worker(&scheduler, worker_id2, PlatformProperties::default()).await?; + let mut rx_from_worker2 = setup_new_worker( + &scheduler, + worker_id2.clone(), + PlatformProperties::default(), + ) + .await?; let operation_id = { // Other tests check full data. We only care if we got StartAction. 
diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel
index b8b97308f..35986016c 100644
--- a/nativelink-service/BUILD.bazel
+++ b/nativelink-service/BUILD.bazel
@@ -35,6 +35,7 @@ rust_library(
        "@crates//:hyper-1.5.2",
        "@crates//:parking_lot",
        "@crates//:prost",
+       "@crates//:rand",
        "@crates//:serde_json5",
        "@crates//:tokio",
        "@crates//:tonic",
diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml
index 2c5b83902..37b9cc997 100644
--- a/nativelink-service/Cargo.toml
+++ b/nativelink-service/Cargo.toml
@@ -16,6 +16,7 @@ futures = { version = "0.3.31", default-features = false }
 http-body = "1.0.1"
 http-body-util = "0.1.2"
 hyper = { version = "1.5.2" }
+rand = { version = "0.8.5", default-features = false }
 serde_json5 = "0.1.0"
 parking_lot = "0.12.3"
 prost = { version = "0.13.4", default-features = false }
diff --git a/nativelink-service/src/worker_api_server.rs b/nativelink-service/src/worker_api_server.rs
index 4f2c8c5fa..5ffa00ca8 100644
--- a/nativelink-service/src/worker_api_server.rs
+++ b/nativelink-service/src/worker_api_server.rs
@@ -26,7 +26,7 @@ use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::
     WorkerApi, WorkerApiServer as Server,
 };
 use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
-    execute_result, ExecuteResult, GoingAwayRequest, KeepAliveRequest, SupportedProperties, UpdateForWorker,
+    execute_result, ConnectWorkerRequest, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker
 };
 use nativelink_scheduler::worker::Worker;
 use nativelink_scheduler::worker_scheduler::WorkerScheduler;
@@ -34,6 +34,7 @@ use nativelink_util::background_spawn;
 use nativelink_util::action_messages::{OperationId, WorkerId};
 use nativelink_util::operation_state_manager::UpdateOperationType;
 use nativelink_util::platform_properties::PlatformProperties;
+use rand::RngCore;
 use tokio::sync::mpsc;
 use tokio::time::interval;
 use tonic::{Request, Response, Status};
@@ -48,6 +49,7 @@ pub type NowFn = Box<dyn Fn() -> Result<Duration, Error> + Send + Sync>;
 pub struct WorkerApiServer {
     scheduler: Arc<dyn WorkerScheduler>,
     now_fn: NowFn,
+    node_id: [u8; 6],
 }

 impl WorkerApiServer {
@@ -55,6 +57,12 @@ impl WorkerApiServer {
         config: &WorkerApiConfig,
         schedulers: &HashMap<String, Arc<dyn WorkerScheduler>>,
     ) -> Result<Self, Error> {
+        let node_id = {
+            let mut rng = rand::thread_rng();
+            let mut out = [0; 6];
+            rng.fill_bytes(&mut out);
+            out
+        };
         for scheduler in schedulers.values() {
             // This will protect us from holding a reference to the scheduler forever in the
             // event our ExecutionServer dies. Our scheduler is a weak ref, so the spawn will
@@ -90,6 +98,7 @@ impl WorkerApiServer {
                     .duration_since(UNIX_EPOCH)
                     .map_err(|_| make_err!(Code::Internal, "System time is now behind unix epoch"))
             }),
+            node_id,
         )
     }

@@ -99,6 +108,7 @@
         config: &WorkerApiConfig,
         schedulers: &HashMap<String, Arc<dyn WorkerScheduler>>,
         now_fn: NowFn,
+        node_id: [u8; 6],
     ) -> Result<Self, Error> {
         let scheduler = schedulers
             .get(&config.scheduler)
             .err_tip(|| {
                 format!(
                 )
             })?
             .clone();
-        Ok(Self { scheduler, now_fn })
+        Ok(Self {
+            scheduler,
+            now_fn,
+            node_id,
+        })
     }

     pub fn into_service(self) -> Server<WorkerApiServer> {

     async fn inner_connect_worker(
         &self,
-        supported_properties: SupportedProperties,
+        connect_worker_request: ConnectWorkerRequest,
     ) -> Result<Response<ConnectWorkerStream>, Error> {
         let (tx, rx) = mpsc::unbounded_channel();

         // First convert our proto platform properties into one our scheduler understands.
let platform_properties = { let mut platform_properties = PlatformProperties::default(); - for property in supported_properties.properties { + for property in connect_worker_request.properties { let platform_property_value = self .scheduler .get_platform_property_manager() @@ -140,9 +154,13 @@ impl WorkerApiServer { // Now register the worker with the scheduler. let worker_id = { - let worker_id = Uuid::new_v4(); + let worker_id = WorkerId(format!( + "{}{}", + connect_worker_request.worker_id_prefix, + Uuid::now_v6(&self.node_id).hyphenated() + )); let worker = Worker::new( - WorkerId(worker_id), + worker_id.clone(), platform_properties, tx, (self.now_fn)()?.as_secs(), @@ -176,7 +194,7 @@ impl WorkerApiServer { &self, keep_alive_request: KeepAliveRequest, ) -> Result, Error> { - let worker_id: WorkerId = keep_alive_request.worker_id.try_into()?; + let worker_id: WorkerId = keep_alive_request.worker_id.into(); self.scheduler .worker_keep_alive_received(&worker_id, (self.now_fn)()?.as_secs()) .await @@ -188,7 +206,7 @@ impl WorkerApiServer { &self, going_away_request: GoingAwayRequest, ) -> Result, Error> { - let worker_id: WorkerId = going_away_request.worker_id.try_into()?; + let worker_id: WorkerId = going_away_request.worker_id.into(); self.scheduler .remove_worker(&worker_id) .await @@ -200,7 +218,7 @@ impl WorkerApiServer { &self, execute_result: ExecuteResult, ) -> Result, Error> { - let worker_id: WorkerId = execute_result.worker_id.try_into()?; + let worker_id: WorkerId = execute_result.worker_id.into(); let operation_id = OperationId::from(execute_result.operation_id); match execute_result @@ -248,7 +266,7 @@ impl WorkerApi for WorkerApiServer { )] async fn connect_worker( &self, - grpc_request: Request, + grpc_request: Request, ) -> Result, Status> { let resp = self .inner_connect_worker(grpc_request.into_inner()) diff --git a/nativelink-service/tests/worker_api_server_test.rs b/nativelink-service/tests/worker_api_server_test.rs index 7c94d4924..b24b8a9d9 100644 --- a/nativelink-service/tests/worker_api_server_test.rs +++ b/nativelink-service/tests/worker_api_server_test.rs @@ -30,7 +30,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ }; use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_server::WorkerApi; use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{ - execute_result, update_for_worker, ExecuteResult, KeepAliveRequest, SupportedProperties, + execute_result, update_for_worker, ConnectWorkerRequest, ExecuteResult, KeepAliveRequest, }; use nativelink_proto::google::rpc::Status as ProtoStatus; use nativelink_scheduler::api_worker_scheduler::ApiWorkerScheduler; @@ -109,7 +109,7 @@ impl WorkerStateManager for MockWorkerStateManager { self.tx_call .send(WorkerStateManagerCalls::UpdateOperation(( operation_id.clone(), - *worker_id, + worker_id.clone(), update, ))) .expect("Could not send request to mpsc"); @@ -161,12 +161,13 @@ async fn setup_api_server(worker_timeout: u64, now_fn: NowFn) -> Result Result Result<(), Box String { - uuid.hyphenated().to_string() -} - impl From<&str> for OperationId { fn from(value: &str) -> Self { match Uuid::parse_str(value) { @@ -140,8 +136,8 @@ impl TryFrom for OperationId { } /// Unique id of worker. 
-#[derive(Default, Eq, PartialEq, Hash, Copy, Clone, Serialize, Deserialize)]
-pub struct WorkerId(pub Uuid);
+#[derive(Default, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
+pub struct WorkerId(pub String);

 impl MetricsComponent for WorkerId {
     fn publish(
         &self,
         _kind: MetricKind,
         _field_metadata: MetricFieldData,
     ) -> Result<MetricPublishKnownKindData, Error> {
-        Ok(MetricPublishKnownKindData::String(uuid_to_string(&self.0)))
+        Ok(MetricPublishKnownKindData::String(self.0.clone()))
     }
 }

 impl std::fmt::Display for WorkerId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let mut buf = Uuid::encode_buffer();
-        let worker_id_str = self.0.hyphenated().encode_lower(&mut buf);
-        f.write_fmt(format_args!("{worker_id_str}"))
+        f.write_fmt(format_args!("{}", self.0))
     }
 }

@@ -167,15 +161,15 @@ impl std::fmt::Debug for WorkerId {
     }
 }

-impl TryFrom<String> for WorkerId {
-    type Error = Error;
-    fn try_from(s: String) -> Result<Self, Self::Error> {
-        match Uuid::parse_str(&s) {
-            Err(e) => Err(make_input_err!(
-                "Failed to convert string to WorkerId : {s} : {e:?}",
-            )),
-            Ok(my_uuid) => Ok(WorkerId(my_uuid)),
-        }
+impl From<WorkerId> for String {
+    fn from(val: WorkerId) -> Self {
+        val.0
+    }
+}
+
+impl From<String> for WorkerId {
+    fn from(s: String) -> Self {
+        WorkerId(s)
+    }
 }
diff --git a/nativelink-worker/src/local_worker.rs b/nativelink-worker/src/local_worker.rs
index d3ecdafc2..f39e2de45 100644
--- a/nativelink-worker/src/local_worker.rs
+++ b/nativelink-worker/src/local_worker.rs
@@ -51,7 +51,7 @@ use crate::running_actions_manager::{
     RunningActionsManager, RunningActionsManagerArgs, RunningActionsManagerImpl,
 };
 use crate::worker_api_client_wrapper::{WorkerApiClientTrait, WorkerApiClientWrapper};
-use crate::worker_utils::make_supported_properties;
+use crate::worker_utils::make_connect_worker_request;

 /// Amount of time to wait if we have actions in transit before we try to
 /// consider an error to have occurred.
@@ -499,10 +499,11 @@ impl LocalWorker {
         &self,
         client: &mut T,
     ) -> Result<(String, Streaming<UpdateForWorker>), Error> {
-        let supported_properties =
-            make_supported_properties(&self.config.platform_properties).await?;
+        let connect_worker_request =
+            make_connect_worker_request(self.config.name.clone(), &self.config.platform_properties)
+                .await?;
         let mut update_for_worker_stream = client
-            .connect_worker(supported_properties)
+            .connect_worker(connect_worker_request)
             .await
             .err_tip(|| "Could not call connect_worker() in worker")?
.into_inner();
diff --git a/nativelink-worker/src/worker_api_client_wrapper.rs b/nativelink-worker/src/worker_api_client_wrapper.rs
index a54b662cb..8c814163f 100644
--- a/nativelink-worker/src/worker_api_client_wrapper.rs
+++ b/nativelink-worker/src/worker_api_client_wrapper.rs
@@ -16,7 +16,7 @@ use std::future::Future;
 use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::worker_api_client::WorkerApiClient;
 use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
-    ExecuteResult, GoingAwayRequest, KeepAliveRequest, SupportedProperties, UpdateForWorker,
+    ConnectWorkerRequest, ExecuteResult, GoingAwayRequest, KeepAliveRequest, UpdateForWorker,
 };
 use tonic::codec::Streaming;
 use tonic::transport::Channel;
 use tonic::{Response, Status};
@@ -27,7 +27,7 @@ pub trait WorkerApiClientTrait: Clone + Sync + Send + Sized + Unpin {
     fn connect_worker(
         &mut self,
-        request: SupportedProperties,
+        request: ConnectWorkerRequest,
     ) -> impl Future<Output = Result<Response<Streaming<UpdateForWorker>>, Status>> + Send;

     fn keep_alive(
@@ -60,7 +60,7 @@ impl From<WorkerApiClient<Channel>> for WorkerApiClientWrapper {
 impl WorkerApiClientTrait for WorkerApiClientWrapper {
     async fn connect_worker(
         &mut self,
-        request: SupportedProperties,
+        request: ConnectWorkerRequest,
     ) -> Result<Response<Streaming<UpdateForWorker>>, Status> {
         self.inner.connect_worker(request).await
     }
diff --git a/nativelink-worker/src/worker_utils.rs b/nativelink-worker/src/worker_utils.rs
index 06741037e..0a14b9e1a 100644
--- a/nativelink-worker/src/worker_utils.rs
+++ b/nativelink-worker/src/worker_utils.rs
@@ -22,13 +22,14 @@ use futures::future::try_join_all;
 use nativelink_config::cas_server::WorkerProperty;
 use nativelink_error::{make_err, make_input_err, Error, ResultExt};
 use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
-use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::SupportedProperties;
+use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::ConnectWorkerRequest;
 use tokio::process;
 use tracing::{event, Level};

-pub async fn make_supported_properties(
+pub async fn make_connect_worker_request(
+    worker_id_prefix: String,
     worker_properties: &HashMap<String, WorkerProperty>,
-) -> Result<SupportedProperties, Error> {
+) -> Result<ConnectWorkerRequest, Error> {
     let mut futures = vec![];
     for (property_name, worker_property) in worker_properties {
         futures.push(async move {
@@ -97,7 +98,8 @@
         });
     }

-    Ok(SupportedProperties {
+    Ok(ConnectWorkerRequest {
+        worker_id_prefix,
         properties: try_join_all(futures).await?.into_iter().flatten().collect(),
     })
 }
diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs
index 7dcc6d44a..7d0350c94 100644
--- a/nativelink-worker/tests/local_worker_test.rs
+++ b/nativelink-worker/tests/local_worker_test.rs
@@ -38,8 +38,8 @@ use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
 use nativelink_proto::build::bazel::remote::execution::v2::Platform;
 use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::update_for_worker::Update;
 use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::{
-    execute_result, ConnectionResult, ExecuteResult, KillOperationRequest, StartExecute,
-    SupportedProperties, UpdateForWorker,
+    execute_result, ConnectWorkerRequest, ConnectionResult, ExecuteResult, KillOperationRequest,
+    StartExecute, UpdateForWorker,
 };
 use nativelink_store::fast_slow_store::FastSlowStore;
 use nativelink_store::filesystem_store::FilesystemStore;
@@ -95,17 +95,18 @@ async fn
platform_properties_smoke_test() -> Result<(), Error> { let streaming_response = test_context.maybe_streaming_response.take().unwrap(); // Now wait for our client to send `.connect_worker()` (which has our platform properties). - let mut supported_properties = test_context + let mut connect_worker_request = test_context .client .expect_connect_worker(Ok(streaming_response)) .await; // It is undefined which order these will be returned in, so we sort it. - supported_properties + connect_worker_request .properties .sort_by_key(Message::encode_to_vec); assert_eq!( - supported_properties, - SupportedProperties { + connect_worker_request, + ConnectWorkerRequest { + worker_id_prefix: String::new(), properties: vec![ Property { name: "baz".to_string(), @@ -141,7 +142,7 @@ async fn reconnect_on_server_disconnect_test() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box>, Status>, - ) -> SupportedProperties { + ) -> ConnectWorkerRequest { let mut rx_call_lock = self.rx_call.lock().await; let req = match rx_call_lock .recv() @@ -131,7 +131,7 @@ impl MockWorkerApiClient { impl WorkerApiClientTrait for MockWorkerApiClient { async fn connect_worker( &mut self, - request: SupportedProperties, + request: ConnectWorkerRequest, ) -> Result>, Status> { self.tx_call .send(WorkerClientApiCalls::ConnectWorker(request)) diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 8e20d10e3..f3d1dc931 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -48,7 +48,6 @@ use nativelink_service::health_server::HealthServer; use nativelink_service::worker_api_server::WorkerApiServer; use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; -use nativelink_util::action_messages::WorkerId; use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit}; use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc}; use nativelink_util::health_utils::HealthRegistryBuilder; @@ -663,10 +662,7 @@ async fn inner_main( ) })? .clone() - .set_drain_worker( - &WorkerId::try_from(worker_id.clone())?, - is_draining, - ) + .set_drain_worker(&worker_id.clone().into(), is_draining) .await?; Ok::<_, Error>(format!("Draining worker {worker_id}")) })
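
Note on the resulting IDs: the scheduler builds each worker's ID from the
prefix sent in ConnectWorkerRequest plus a time-ordered UUIDv6 seeded with a
random per-server node ID, as in inner_connect_worker() above. Below is a
minimal standalone sketch of that scheme, assuming the `uuid` crate with the
`v6` feature and `rand` 0.8; the helper name `make_worker_id` is illustrative
and not part of this patch.

    use rand::RngCore;
    use uuid::Uuid;

    // Mirrors inner_connect_worker(): the assigned ID is "{prefix}{uuidv6}".
    // UUIDv6 is time-ordered, so IDs issued by one scheduler instance sort
    // roughly by connection time.
    fn make_worker_id(worker_id_prefix: &str, node_id: &[u8; 6]) -> String {
        format!("{}{}", worker_id_prefix, Uuid::now_v6(node_id).hyphenated())
    }

    fn main() {
        // The patch generates the 6-byte node ID once per WorkerApiServer.
        let mut node_id = [0u8; 6];
        rand::thread_rng().fill_bytes(&mut node_id);
        // Prints something like "my-worker-pool-1efa...".
        println!("{}", make_worker_id("my-worker-pool-", &node_id));
    }

Since the worker sends its LocalWorkerConfig `name` as the prefix (see
make_connect_worker_request), setting `name` to "my-worker-pool-" yields
worker IDs of the form "my-worker-pool-<uuidv6>".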
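
For reference, a sketch of how the worker side now populates the renamed
request message, mirroring make_connect_worker_request() in worker_utils.rs
but without the property-resolution logic; the property name and value below
are made up for illustration.

    use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
    use nativelink_proto::com::github::trace_machina::nativelink::remote_execution::ConnectWorkerRequest;

    fn example_request(worker_name: String) -> ConnectWorkerRequest {
        ConnectWorkerRequest {
            // The LocalWorkerConfig `name` field doubles as the worker ID prefix.
            worker_id_prefix: worker_name,
            // Pre-resolved platform properties; the real code builds these from
            // the worker's config (and may shell out to compute values).
            properties: vec![Property {
                name: "cpu_count".to_string(),
                value: "16".to_string(),
            }],
        }
    }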
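
Because WorkerId now wraps an arbitrary String rather than a parsed Uuid, the
old fallible TryFrom<String> conversion becomes the infallible From<String>
shown in action_messages.rs, and the type loses Copy, which is why call sites
throughout the schedulers gained .clone() calls. A trimmed, self-contained
sketch of the new conversions:

    #[derive(Clone, Debug, Default, Eq, Hash, PartialEq)]
    pub struct WorkerId(pub String);

    impl From<String> for WorkerId {
        fn from(s: String) -> Self {
            WorkerId(s)
        }
    }

    impl From<WorkerId> for String {
        fn from(id: WorkerId) -> Self {
            id.0
        }
    }

    fn main() {
        // Any string is now a valid WorkerId; no UUID parsing can fail.
        let id: WorkerId = "my-worker-pool-123".to_string().into();
        let back: String = id.into();
        assert_eq!(back, "my-worker-pool-123");
    }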