diff --git a/crates/hyperqueue/src/client/commands/journal/output.rs b/crates/hyperqueue/src/client/commands/journal/output.rs
index 1f2698e1f..8898db1bf 100644
--- a/crates/hyperqueue/src/client/commands/journal/output.rs
+++ b/crates/hyperqueue/src/client/commands/journal/output.rs
@@ -76,32 +76,26 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
                 "allocation-id": allocation_id,
             })
         }
-        EventPayload::TaskStarted {
-            job_id, task_id, ..
-        } => json!({
+        EventPayload::TaskStarted { task_id, .. } => json!({
             "type": "task-started",
-            "job": job_id,
-            "task": task_id,
+            "job": task_id.job_id(),
+            "task": task_id.job_task_id(),
             "worker": -1
         }),
-        EventPayload::TaskFinished { job_id, task_id } => json!({
+        EventPayload::TaskFinished { task_id } => json!({
             "type": "task-finished",
-            "job": job_id,
-            "task": task_id
+            "job": task_id.job_id(),
+            "task": task_id.job_task_id(),
         }),
-        EventPayload::TaskCanceled { job_id, task_id } => json!({
+        EventPayload::TaskCanceled { task_id } => json!({
            "type": "task-canceled",
-            "job": job_id,
-            "task": task_id
+            "job": task_id.job_id(),
+            "task": task_id.job_task_id(),
         }),
-        EventPayload::TaskFailed {
-            job_id,
-            task_id,
-            error,
-        } => json!({
+        EventPayload::TaskFailed { task_id, error } => json!({
             "type": "task-failed",
-            "job": job_id,
-            "task": task_id,
+            "job": task_id.job_id(),
+            "task": task_id.job_task_id(),
             "error": error
         }),
         EventPayload::Submit {
diff --git a/crates/hyperqueue/src/client/commands/outputlog.rs b/crates/hyperqueue/src/client/commands/outputlog.rs
index dbb78214f..0481f96c9 100644
--- a/crates/hyperqueue/src/client/commands/outputlog.rs
+++ b/crates/hyperqueue/src/client/commands/outputlog.rs
@@ -1,9 +1,9 @@
-use crate::JobId;
 use crate::client::globalsettings::GlobalSettings;
 use crate::common::arraydef::IntArray;
 use crate::stream::reader::outputlog::OutputLog;
 use clap::Parser;
 use std::path::PathBuf;
+use tako::JobId;
 
 #[derive(Parser)]
 pub struct OutputLogOpts {
diff --git a/crates/hyperqueue/src/client/commands/submit/command.rs b/crates/hyperqueue/src/client/commands/submit/command.rs
index ffa9f27ad..9d5fd644b 100644
--- a/crates/hyperqueue/src/client/commands/submit/command.rs
+++ b/crates/hyperqueue/src/client/commands/submit/command.rs
@@ -21,13 +21,13 @@ use crate::common::placeholders::{
 use crate::common::utils::fs::get_current_dir;
 use crate::common::utils::str::pluralize;
 use crate::common::utils::time::parse_hms_or_human_time;
+use crate::rpc_call;
 use crate::transfer::connection::ClientSession;
 use crate::transfer::messages::{
     FromClientMessage, IdSelector, JobDescription, JobSubmitDescription, JobTaskDescription,
     PinMode, SubmitRequest, SubmitResponse, TaskDescription, TaskKind, TaskKindProgram,
     ToClientMessage,
 };
-use crate::{JobId, JobTaskCount, Map, rpc_call};
 use anyhow::{anyhow, bail};
 use bstr::BString;
 use chumsky::Parser as ChumskyParser;
@@ -40,6 +40,7 @@ use tako::gateway::{
 };
 use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
 use tako::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount};
+use tako::{JobId, JobTaskCount, Map};
 
 const SUBMIT_ARRAY_LIMIT: JobTaskCount = 999;
 pub const DEFAULT_CRASH_LIMIT: u32 = 5;
diff --git a/crates/hyperqueue/src/client/commands/submit/defs.rs b/crates/hyperqueue/src/client/commands/submit/defs.rs
index fdeccf5ba..60d1a8625 100644
--- a/crates/hyperqueue/src/client/commands/submit/defs.rs
+++ b/crates/hyperqueue/src/client/commands/submit/defs.rs
@@ -1,9 +1,9 @@
+use crate::JobDataObjectId;
 use crate::client::resources::parse_allocation_request;
 use crate::common::arraydef::IntArray;
 use crate::common::arrayparser::parse_array;
 use crate::common::error::HqError;
 use crate::common::utils::time::parse_human_time;
-use crate::{JobDataObjectId, JobTaskCount, JobTaskId};
 use bstr::BString;
 use serde::de::MapAccess;
 use serde::{Deserialize, Deserializer};
@@ -13,7 +13,7 @@ use std::time::Duration;
 use tako::gateway::{ResourceRequest, ResourceRequestEntries, ResourceRequestEntry};
 use tako::program::FileOnCloseBehavior;
 use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount};
-use tako::{Map, Priority};
+use tako::{JobTaskCount, JobTaskId, Map, Priority};
 
 #[derive(Deserialize)]
 #[serde(untagged)]
diff --git a/crates/hyperqueue/src/client/commands/submit/jobfile.rs b/crates/hyperqueue/src/client/commands/submit/jobfile.rs
index 4ada61512..54ac6f02e 100644
--- a/crates/hyperqueue/src/client/commands/submit/jobfile.rs
+++ b/crates/hyperqueue/src/client/commands/submit/jobfile.rs
@@ -13,13 +13,13 @@ use crate::transfer::messages::{
     JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, SubmitRequest,
     TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
 };
-use crate::{JobId, JobTaskCount, JobTaskId};
 use clap::Parser;
 use smallvec::smallvec;
 use std::path::PathBuf;
 use tako::Map;
 use tako::gateway::{ResourceRequest, ResourceRequestVariants, TaskDataFlags};
 use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
+use tako::{JobId, JobTaskCount, JobTaskId};
 
 #[derive(Parser)]
 pub struct JobSubmitFileOpts {
@@ -128,7 +128,7 @@ fn build_job_desc_individual_tasks(
         .map(|t| t.id)
         .max()
         .flatten()
-        .unwrap_or(JobTaskId(0));
+        .unwrap_or(JobTaskId::new(0));
 
     /* Topological sort */
     let original_len = tasks.len();
diff --git a/crates/hyperqueue/src/client/commands/wait.rs b/crates/hyperqueue/src/client/commands/wait.rs
index 8cad626fa..1e0808722 100644
--- a/crates/hyperqueue/src/client/commands/wait.rs
+++ b/crates/hyperqueue/src/client/commands/wait.rs
@@ -12,14 +12,15 @@ use crate::client::output::cli::{
 use crate::client::status::{Status, is_terminated};
 use crate::common::arraydef::IntArray;
 use crate::common::utils::str::pluralize;
+use crate::rpc_call;
 use crate::server::job::JobTaskCounters;
 use crate::transfer::connection::ClientSession;
 use crate::transfer::messages::{
     FromClientMessage, IdSelector, JobDetailRequest, JobInfo, JobInfoRequest, TaskIdSelector,
     TaskSelector, TaskStatusSelector, ToClientMessage, WaitForJobsRequest,
 };
-use crate::{JobId, JobTaskCount, rpc_call};
 use colored::Colorize;
+use tako::{JobId, JobTaskCount};
 
 pub async fn wait_for_jobs(
     gsettings: &GlobalSettings,
diff --git a/crates/hyperqueue/src/client/commands/worker.rs b/crates/hyperqueue/src/client/commands/worker.rs
index d3ea2be3e..c679b5916 100644
--- a/crates/hyperqueue/src/client/commands/worker.rs
+++ b/crates/hyperqueue/src/client/commands/worker.rs
@@ -20,7 +20,6 @@ use tokio::signal::ctrl_c;
 use tokio::task::JoinSet;
 use tokio::time::sleep;
 
-use crate::WorkerId;
 use crate::client::globalsettings::GlobalSettings;
 use crate::client::utils::{PassThroughArgument, passthrough_parser};
 use crate::common::cli::DeploySshOpts;
@@ -42,6 +41,7 @@ use crate::worker::hwdetect::{
 };
 use crate::worker::parser::{parse_cpu_definition, parse_resource_definition};
 use crate::{DEFAULT_WORKER_GROUP_NAME, rpc_call};
+use tako::WorkerId;
 
 #[derive(clap::ValueEnum, Clone)]
 pub enum WorkerFilter {
diff --git a/crates/hyperqueue/src/client/job.rs b/crates/hyperqueue/src/client/job.rs
index 91fa90faa..8c333119d 100644
--- a/crates/hyperqueue/src/client/job.rs
+++ b/crates/hyperqueue/src/client/job.rs
@@ -1,7 +1,7 @@
 use crate::rpc_call;
 use crate::transfer::connection::ClientSession;
 use crate::transfer::messages::{FromClientMessage, ToClientMessage};
-use crate::{Map, WorkerId};
+use tako::{Map, WorkerId};
 
 /// Maps worker IDs to hostnames.
 pub type WorkerMap = Map<WorkerId, String>;
diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs
index f272163ca..f494a6818 100644
--- a/crates/hyperqueue/src/client/output/cli.rs
+++ b/crates/hyperqueue/src/client/output/cli.rs
@@ -18,7 +18,7 @@ use crate::transfer::messages::{
     ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerExitInfo,
     WorkerInfo,
 };
-use crate::{JobId, JobTaskCount, JobTaskId, WorkerId};
+use tako::{JobId, JobTaskCount, JobTaskId, WorkerId};
 
 use chrono::{DateTime, Local, SubsecRound, Utc};
 use core::time::Duration;
@@ -366,15 +366,7 @@ impl Output for CliOutput {
             vec![
                 "Last task started".cell().bold(true),
                 last_task_started
-                    .map(|t| {
-                        format!(
-                            "Job: {}; Task: {}; Time: {}",
-                            t.job_id,
-                            t.task_id,
-                            format_datetime(t.time)
-                        )
-                        .cell()
-                    })
+                    .map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
                     .unwrap_or_else(|| "".cell()),
             ],
         ];
diff --git a/crates/hyperqueue/src/client/output/common.rs b/crates/hyperqueue/src/client/output/common.rs
index c4f3f151c..c6e46ff98 100644
--- a/crates/hyperqueue/src/client/output/common.rs
+++ b/crates/hyperqueue/src/client/output/common.rs
@@ -1,4 +1,3 @@
-use crate::JobTaskId;
 use crate::client::status::{Status, job_status};
 use crate::common::placeholders::{
     CompletePlaceholderCtx, ResolvablePaths, fill_placeholders_in_paths,
@@ -10,6 +9,7 @@ use crate::transfer::messages::{
 use std::path::PathBuf;
 use tako::Map;
 use tako::program::StdioDef;
+use tako::{JobTaskId, TaskId};
 
 pub struct ResolvedTaskPaths {
     pub cwd: PathBuf,
@@ -29,7 +29,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
             JobTaskDescription::Array { task_desc, ids, .. } => {
                 for id in ids.iter() {
                     task_to_desc_map.insert(
-                        JobTaskId(id),
+                        JobTaskId::new(id),
                         (submit_desc.description().submit_dir.as_path(), task_desc),
                     );
                 }
@@ -47,8 +47,8 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
 
     job.tasks
         .iter()
-        .map(|(task_id, task)| {
-            let (submit_dir, task_desc) = task_to_desc_map.get(task_id).unwrap();
+        .map(|(job_task_id, task)| {
+            let (submit_dir, task_desc) = task_to_desc_map.get(job_task_id).unwrap();
             let paths = match &task_desc.kind {
                 TaskKind::ExternalProgram(TaskKindProgram { program, .. }) => match &task.state {
                     JobTaskState::Canceled {
@@ -62,8 +62,7 @@
                         ..
                     } => {
                         let ctx = CompletePlaceholderCtx {
-                            job_id: job.info.id,
-                            task_id: *task_id,
+                            task_id: TaskId::new(job.info.id, *job_task_id),
                             instance_id: started_data.context.instance_id,
                             submit_dir,
                             server_uid,
@@ -85,7 +84,7 @@
                     _ => None,
                 },
             };
-            (*task_id, paths)
+            (*job_task_id, paths)
         })
         .collect()
 }
diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs
index b947754f4..30677a72a 100644
--- a/crates/hyperqueue/src/client/output/json.rs
+++ b/crates/hyperqueue/src/client/output/json.rs
@@ -27,7 +27,7 @@ use crate::transfer::messages::{
     AutoAllocListResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData, ServerInfo,
     TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerInfo,
 };
-use crate::{JobId, JobTaskId};
+use tako::{JobId, JobTaskId};
 
 #[derive(Default)]
 pub struct JsonOutput;
diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs
index e9c8f32c1..1cf766807 100644
--- a/crates/hyperqueue/src/client/output/outputs.rs
+++ b/crates/hyperqueue/src/client/output/outputs.rs
@@ -11,9 +11,9 @@ use crate::client::output::Verbosity;
 use crate::client::output::common::TaskToPathsMap;
 use crate::common::arraydef::IntArray;
 use crate::server::job::JobTaskInfo;
-use crate::{JobId, JobTaskId};
 use core::time::Duration;
 use tako::resources::ResourceDescriptor;
+use tako::{JobId, JobTaskId};
 
 pub const MAX_DISPLAYED_WORKERS: usize = 2;
diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs
index 4d76e186c..9ab3d527e 100644
--- a/crates/hyperqueue/src/client/output/quiet.rs
+++ b/crates/hyperqueue/src/client/output/quiet.rs
@@ -21,7 +21,7 @@ use crate::transfer::messages::{
     AutoAllocListResponse, JobDetail, JobInfo, ServerInfo, WaitForJobsResponse, WorkerExitInfo,
     WorkerInfo,
 };
-use crate::{JobId, JobTaskId};
+use tako::{JobId, JobTaskId};
 
 #[derive(Default)]
 pub struct Quiet;
diff --git a/crates/hyperqueue/src/client/task.rs b/crates/hyperqueue/src/client/task.rs
index 379ead669..ada7bb12e 100644
--- a/crates/hyperqueue/src/client/task.rs
+++ b/crates/hyperqueue/src/client/task.rs
@@ -5,12 +5,13 @@ use crate::client::output::{Verbosity, VerbosityFlag};
 use crate::common::arraydef::IntArray;
 use crate::common::cli::{TaskSelectorArg, parse_last_range, parse_last_single_id};
 use crate::common::error::HqError;
+use crate::rpc_call;
 use crate::transfer::connection::ClientSession;
 use crate::transfer::messages::{
     FromClientMessage, IdSelector, JobDetailRequest, SingleIdSelector, TaskIdSelector,
     TaskSelector, TaskStatusSelector, ToClientMessage,
 };
-use crate::{JobId, rpc_call};
+use tako::JobId;
 
 #[derive(clap::Parser)]
 pub struct TaskOpts {
diff --git a/crates/hyperqueue/src/common/arrayparser.rs b/crates/hyperqueue/src/common/arrayparser.rs
index 62049e466..adc8099d0 100644
--- a/crates/hyperqueue/src/common/arrayparser.rs
+++ b/crates/hyperqueue/src/common/arrayparser.rs
@@ -1,9 +1,9 @@
-use crate::Set;
 use crate::common::arraydef::{IntArray, IntRange};
 use crate::common::parser2::{CharParser, ParseError, all_consuming, parse_u32};
 use chumsky::Parser;
 use chumsky::primitive::just;
 use chumsky::text::TextParser;
+use tako::Set;
 
 /// Parse integer range in the format n[-end][:step].
 fn parse_range() -> impl CharParser<IntRange> {
diff --git a/crates/hyperqueue/src/common/manager/slurm.rs b/crates/hyperqueue/src/common/manager/slurm.rs
index 22b239be3..fae51650b 100644
--- a/crates/hyperqueue/src/common/manager/slurm.rs
+++ b/crates/hyperqueue/src/common/manager/slurm.rs
@@ -1,8 +1,8 @@
-use crate::Map;
 use crate::common::manager::common::format_duration;
 use crate::common::utils::time::parse_hms_time;
 use std::process::Command;
 use std::time::Duration;
+use tako::Map;
 
 /// Format a duration as a SLURM time string, e.g. 01:05:02
 pub fn format_slurm_duration(duration: &Duration) -> String {
diff --git a/crates/hyperqueue/src/common/placeholders.rs b/crates/hyperqueue/src/common/placeholders.rs
index 399222f1b..9daffe0a3 100644
--- a/crates/hyperqueue/src/common/placeholders.rs
+++ b/crates/hyperqueue/src/common/placeholders.rs
@@ -6,12 +6,12 @@ use std::path::{Path, PathBuf};
 use nom::bytes::complete::take_until;
 use nom::sequence::delimited;
 use nom_supreme::tag::complete::tag;
-use tako::InstanceId;
+use tako::{InstanceId, TaskId};
 use tako::program::{ProgramDefinition, StdioDef};
 
 use crate::common::parser::NomResult;
-use crate::{JobId, JobTaskId, Map};
+use tako::{JobId, Map};
 
 pub const SERVER_UID_PLACEHOLDER: &str = "SERVER_UID";
 pub const TASK_ID_PLACEHOLDER: &str = "TASK_ID";
@@ -32,8 +32,7 @@ const KNOWN_PLACEHOLDERS: [&str; 6] = [
 type PlaceholderMap<'a> = Map<&'static str, Cow<'a, str>>;
 
 pub struct CompletePlaceholderCtx<'a> {
-    pub job_id: JobId,
-    pub task_id: JobTaskId,
+    pub task_id: TaskId,
     pub instance_id: InstanceId,
     pub submit_dir: &'a Path,
     pub server_uid: &'a str,
@@ -58,8 +57,11 @@ impl<'a> ResolvablePaths<'a> {
 /// Fills all known placeholders in the given paths.
 pub fn fill_placeholders_in_paths(paths: ResolvablePaths, ctx: CompletePlaceholderCtx) {
     let mut placeholders = PlaceholderMap::new();
-    placeholders.insert(JOB_ID_PLACEHOLDER, ctx.job_id.to_string().into());
-    placeholders.insert(TASK_ID_PLACEHOLDER, ctx.task_id.to_string().into());
+    placeholders.insert(JOB_ID_PLACEHOLDER, ctx.task_id.job_id().to_string().into());
+    placeholders.insert(
+        TASK_ID_PLACEHOLDER,
+        ctx.task_id.job_task_id().to_string().into(),
+    );
     placeholders.insert(INSTANCE_ID_PLACEHOLDER, ctx.instance_id.to_string().into());
     placeholders.insert(
         SUBMIT_DIR_PLACEHOLDER,
@@ -249,12 +251,12 @@ mod tests {
     use std::path::{Path, PathBuf};
     use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
 
-    use crate::Map;
     use crate::common::env::{HQ_INSTANCE_ID, HQ_JOB_ID, HQ_SUBMIT_DIR, HQ_TASK_ID};
     use crate::common::placeholders::{
         CompletePlaceholderCtx, ResolvablePaths, StringPart, fill_placeholders_after_submit,
         fill_placeholders_in_paths, parse_resolvable_string,
     };
+    use tako::{Map, TaskId};
 
     #[test]
     fn test_parse_empty_string() {
@@ -443,14 +445,13 @@ mod tests {
 
     fn ctx<'a>(
         job_id: u32,
-        task_id: u32,
+        job_task_id: u32,
         instance_id: u32,
         submit_dir: &'a Path,
         server_uid: &'a str,
     ) -> CompletePlaceholderCtx<'a> {
         CompletePlaceholderCtx {
-            job_id: job_id.into(),
-            task_id: task_id.into(),
+            task_id: TaskId::new(job_id.into(), job_task_id.into()),
             instance_id: instance_id.into(),
             submit_dir,
             server_uid,
diff --git a/crates/hyperqueue/src/dashboard/data/data.rs b/crates/hyperqueue/src/dashboard/data/data.rs
index b2ddd6ff5..c31118e1c 100644
--- a/crates/hyperqueue/src/dashboard/data/data.rs
+++ b/crates/hyperqueue/src/dashboard/data/data.rs
@@ -8,9 +8,9 @@ use crate::dashboard::data::{Time, TimeRange};
 use crate::server::autoalloc::{AllocationId, QueueId};
 use crate::server::event::Event;
 use crate::transfer::messages::AllocationQueueParams;
-use crate::{JobId, JobTaskId};
 use std::time::{Duration, SystemTime};
 use tako::WorkerId;
+use tako::{JobId, JobTaskId};
 
 pub struct DashboardData {
     /// Tracks worker connection and loss events
diff --git a/crates/hyperqueue/src/dashboard/data/timelines/alloc_timeline.rs b/crates/hyperqueue/src/dashboard/data/timelines/alloc_timeline.rs
index 503869134..e336f8046 100644
--- a/crates/hyperqueue/src/dashboard/data/timelines/alloc_timeline.rs
+++ b/crates/hyperqueue/src/dashboard/data/timelines/alloc_timeline.rs
@@ -1,9 +1,9 @@
-use crate::Map;
 use crate::server::autoalloc::{AllocationId, QueueId};
 use crate::server::event::Event;
 use crate::server::event::payload::EventPayload;
 use crate::transfer::messages::AllocationQueueParams;
 use std::time::SystemTime;
+use tako::Map;
 
 pub struct AllocationQueueInfo {
     pub queue_params: AllocationQueueParams,
diff --git a/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs b/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs
index 7dd4480aa..b83a23ce9 100644
--- a/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs
+++ b/crates/hyperqueue/src/dashboard/data/timelines/job_timeline.rs
@@ -1,10 +1,9 @@
 use crate::server::event::Event;
 use crate::server::event::payload::EventPayload;
 use crate::transfer::messages::{JobDescription, JobSubmitDescription};
-use crate::{JobId, JobTaskId, WorkerId};
 use chrono::{DateTime, Utc};
 use std::time::SystemTime;
-use tako::Map;
+use tako::{JobId, JobTaskId, Map, TaskId, WorkerId};
 
 pub struct DashboardJobInfo {
     pub job: JobDescription,
@@ -82,14 +81,13 @@ impl JobTimeline {
             }
 
             EventPayload::TaskStarted {
-                job_id,
                 task_id,
                 instance_id: _,
                 workers,
             } => {
-                if let Some(info) = self.job_timeline.get_mut(job_id) {
+                if let Some(info) = self.job_timeline.get_mut(&task_id.job_id()) {
                     info.job_tasks_info.insert(
-                        *task_id,
+                        task_id.job_task_id(),
                         TaskInfo {
                            worker_id: workers[0],
                            start_time: event.time.into(),
@@ -99,23 +97,17 @@ impl JobTimeline {
                     );
                 }
             }
-            EventPayload::TaskFinished { job_id, task_id } => {
+            EventPayload::TaskFinished { task_id } => {
                 update_task_status(
                     &mut self.job_timeline,
-                    *job_id,
                     *task_id,
                     DashboardTaskState::Finished,
                     event.time,
                 );
             }
-            EventPayload::TaskFailed {
-                job_id,
-                task_id,
-                error: _,
-            } => {
+            EventPayload::TaskFailed { task_id, error: _ } => {
                 update_task_status(
                     &mut self.job_timeline,
-                    *job_id,
                     *task_id,
                     DashboardTaskState::Failed,
                     event.time,
@@ -169,13 +161,12 @@ impl JobTimeline {
 
 fn update_task_status(
     job_timeline: &mut Map<JobId, DashboardJobInfo>,
-    job_id: JobId,
-    task_id: JobTaskId,
+    task_id: TaskId,
     task_status: DashboardTaskState,
     at_time: DateTime<Utc>,
 ) {
-    if let Some(job_info) = job_timeline.get_mut(&job_id) {
-        if let Some(task_info) = job_info.job_tasks_info.get_mut(&task_id) {
+    if let Some(job_info) = job_timeline.get_mut(&task_id.job_id()) {
+        if let Some(task_info) = job_info.job_tasks_info.get_mut(&task_id.job_task_id()) {
             task_info.set_end_time_and_status(&at_time.into(), task_status);
         }
     }
diff --git a/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs b/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs
index 7060a03db..f4ee64381 100644
--- a/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs
+++ b/crates/hyperqueue/src/dashboard/data/timelines/worker_timeline.rs
@@ -1,13 +1,12 @@
-use crate::WorkerId;
 use crate::dashboard::data::Time;
 use crate::dashboard::data::time_based_vec::{ItemWithTime, TimeBasedVec};
 use crate::dashboard::data::time_interval::TimeRange;
 use crate::server::event::Event;
 use crate::server::event::payload::EventPayload;
 use std::time::SystemTime;
-use tako::Map;
 use tako::gateway::LostWorkerReason;
 use tako::worker::{WorkerConfiguration, WorkerOverview};
+use tako::{Map, WorkerId};
 
 #[derive(Clone)]
 pub struct WorkerDisconnectInfo {
diff --git a/crates/hyperqueue/src/dashboard/ui/screens/cluster/worker/mod.rs b/crates/hyperqueue/src/dashboard/ui/screens/cluster/worker/mod.rs
index 07585aca7..4426875bb 100644
--- a/crates/hyperqueue/src/dashboard/ui/screens/cluster/worker/mod.rs
+++ b/crates/hyperqueue/src/dashboard/ui/screens/cluster/worker/mod.rs
@@ -1,4 +1,3 @@
-use crate::JobTaskId;
 use crate::dashboard::data::DashboardData;
 use crate::dashboard::data::timelines::job_timeline::TaskInfo;
 use crate::dashboard::ui::screens::cluster::worker::cpu_util_table::render_cpu_util_table;
@@ -12,8 +11,8 @@ use crate::dashboard::ui::widgets::tasks_table::TasksTable;
 use crate::dashboard::ui::widgets::text::draw_text;
 use crossterm::event::{KeyCode, KeyEvent};
 use ratatui::layout::{Constraint, Direction, Layout, Rect};
-use tako::WorkerId;
 use tako::hwstats::MemoryStats;
+use tako::{JobTaskId, WorkerId};
 
 mod cpu_util_table;
 mod worker_config_table;
diff --git a/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs b/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs
index 1d0267376..727cf842a 100644
--- a/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs
+++ b/crates/hyperqueue/src/dashboard/ui/screens/jobs/job_tasks_chart.rs
@@ -1,4 +1,3 @@
-use crate::JobId;
 use crate::dashboard::data::timelines::job_timeline::DashboardTaskState;
 use crate::dashboard::data::{DashboardData, ItemWithTime, TimeRange};
 use crate::dashboard::ui::terminal::DashboardFrame;
@@ -8,6 +7,7 @@ use crate::dashboard::ui::widgets::chart::{
 use ratatui::layout::Rect;
 use ratatui::style::Color;
 use std::time::SystemTime;
+use tako::JobId;
 
 #[derive(Default)]
 pub struct JobTaskChart {
diff --git a/crates/hyperqueue/src/dashboard/ui/screens/jobs/jobs_table.rs b/crates/hyperqueue/src/dashboard/ui/screens/jobs/jobs_table.rs
index b6e9d24f0..e1585cfa7 100644
--- a/crates/hyperqueue/src/dashboard/ui/screens/jobs/jobs_table.rs
+++ b/crates/hyperqueue/src/dashboard/ui/screens/jobs/jobs_table.rs
@@ -5,12 +5,12 @@ use crate::dashboard::ui::terminal::DashboardFrame;
 use crate::dashboard::ui::widgets::table::{StatefulTable, TableColumnHeaders};
 
-use crate::JobId;
 use crossterm::event::{KeyCode, KeyEvent};
 use ratatui::layout::{Alignment, Constraint, Rect};
 use ratatui::style::Style;
 use ratatui::text::Text;
 use ratatui::widgets::{Cell, Row};
+use tako::JobId;
 
 #[derive(Default)]
 pub struct JobsTable {
diff --git a/crates/hyperqueue/src/dashboard/ui/screens/jobs/overview.rs b/crates/hyperqueue/src/dashboard/ui/screens/jobs/overview.rs
index 9efd1b117..fb1d26529 100644
--- a/crates/hyperqueue/src/dashboard/ui/screens/jobs/overview.rs
+++ b/crates/hyperqueue/src/dashboard/ui/screens/jobs/overview.rs
@@ -11,10 +11,10 @@ use crate::dashboard::data::timelines::job_timeline::TaskInfo;
 use crate::dashboard::ui::screens::jobs::jobs_table::JobsTable;
 use crate::dashboard::ui::widgets::tasks_table::TasksTable;
 
-use crate::JobTaskId;
 use crate::dashboard::ui::screens::jobs::job_info_display::JobInfoTable;
 use crate::dashboard::ui::screens::jobs::job_tasks_chart::JobTaskChart;
 use ratatui::layout::{Constraint, Direction, Layout, Rect};
+use tako::JobTaskId;
 use tako::WorkerId;
 
 pub struct JobOverview {
diff --git a/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs b/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs
index e8d09b367..882fbcc72 100644
--- a/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs
+++ b/crates/hyperqueue/src/dashboard/ui/widgets/tasks_table.rs
@@ -1,7 +1,6 @@
 use crate::common::format::human_duration;
 use crate::dashboard::data::timelines::job_timeline::{DashboardTaskState, TaskInfo};
 
-use crate::JobTaskId;
 use crate::dashboard::ui::terminal::DashboardFrame;
 use crate::dashboard::ui::widgets::table::{StatefulTable, TableColumnHeaders};
 use chrono::{DateTime, Local};
@@ -10,7 +9,7 @@ use ratatui::layout::{Constraint, Rect};
 use ratatui::style::{Color, Modifier, Style};
 use ratatui::widgets::{Cell, Row};
 use std::time::SystemTime;
-use tako::WorkerId;
+use tako::{JobTaskId, WorkerId};
 
 // Task State Strings
 const RUNNING: &str = "RUNNING";
diff --git a/crates/hyperqueue/src/lib.rs b/crates/hyperqueue/src/lib.rs
index 1ef71659e..296954363 100644
--- a/crates/hyperqueue/src/lib.rs
+++ b/crates/hyperqueue/src/lib.rs
@@ -13,20 +13,12 @@ pub mod worker;
 pub(crate) mod tests;
 
 use serde::{Deserialize, Serialize};
-pub use tako::{Map, Set};
 
 pub type Error = crate::common::error::HqError;
 pub type Result<T> = std::result::Result<T, Error>;
 
 // ID types
-use tako::define_id_type;
-
-pub use tako::WorkerId;
-pub type TakoTaskId = tako::TaskId;
-pub type Priority = tako::Priority;
-
-define_id_type!(JobId, u32);
-define_id_type!(JobTaskId, u32);
+use tako::{JobId, JobTaskId};
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct JobDataObjectId {
@@ -36,13 +28,10 @@ pub struct JobDataObjectId {
 
 impl JobDataObjectId {
     pub fn to_dataobj_id(&self, job_id: JobId) -> DataObjectId {
-        DataObjectId::new(make_tako_id(job_id, self.task_id), self.output_id)
+        DataObjectId::new(tako::TaskId::new(job_id, self.task_id), self.output_id)
     }
 }
 
-pub type JobTaskCount = u32;
-pub type JobTaskStep = u32;
-
 pub const DEFAULT_WORKER_GROUP_NAME: &str = "default";
 
 // Reexports
@@ -56,28 +45,3 @@ pub const HQ_VERSION: &str = {
         None => const_format::concatcp!(env!("CARGO_PKG_VERSION"), "-dev"),
     }
 };
-
-pub fn make_tako_id(job_id: JobId, task_id: JobTaskId) -> TakoTaskId {
-    TakoTaskId::new(((job_id.as_num() as u64) << 32) + task_id.as_num() as u64)
-}
-
-pub fn unwrap_tako_id(tako_task_id: TakoTaskId) -> (JobId, JobTaskId) {
-    let num = tako_task_id.as_num();
-    (
-        JobId::new((num >> 32) as u32),
-        JobTaskId::new((num & 0xffffffff) as u32),
-    )
-}
-
-#[cfg(test)]
-mod test {
-    use crate::{JobId, JobTaskId, make_tako_id, unwrap_tako_id};
-
-    #[test]
-    fn test_make_tako_id() {
-        assert_eq!(
-            unwrap_tako_id(make_tako_id(JobId(123), JobTaskId(5))),
-            (JobId(123), JobTaskId(5))
-        );
-    }
-}
diff --git a/crates/hyperqueue/src/server/autoalloc/estimator.rs b/crates/hyperqueue/src/server/autoalloc/estimator.rs
index e9dd723fe..e7f13139e 100644
--- a/crates/hyperqueue/src/server/autoalloc/estimator.rs
+++ b/crates/hyperqueue/src/server/autoalloc/estimator.rs
@@ -1,4 +1,3 @@
-use crate::JobId;
 use crate::server::autoalloc::QueueInfo;
 use crate::server::autoalloc::state::AllocationQueue;
 use crate::server::job::Job;
@@ -6,6 +5,7 @@ use crate::server::state::State;
 use crate::server::worker::Worker;
 use crate::transfer::messages::{JobTaskDescription, TaskDescription};
 use std::time::Duration;
+use tako::JobId;
 use tako::Map;
 
 pub type WaitingTaskCount = u64;
diff --git a/crates/hyperqueue/src/server/autoalloc/process.rs b/crates/hyperqueue/src/server/autoalloc/process.rs
index 96197adf6..cf2a135f9 100644
--- a/crates/hyperqueue/src/server/autoalloc/process.rs
+++ b/crates/hyperqueue/src/server/autoalloc/process.rs
@@ -9,6 +9,7 @@ use tempfile::TempDir;
 
 use crate::common::manager::info::{ManagerInfo, ManagerType};
 use crate::common::rpc::RpcReceiver;
+use crate::get_or_return;
 use crate::server::autoalloc::config::{
     MAX_QUEUED_STATUS_ERROR_COUNT, MAX_RUNNING_STATUS_ERROR_COUNT, MAX_SUBMISSION_FAILS,
     SUBMISSION_DELAYS, get_refresh_timeout, get_status_check_interval, max_allocation_fails,
@@ -28,7 +29,7 @@ use crate::server::autoalloc::{Allocation, AllocationId, AutoAllocResult, QueueI
 use crate::server::event::streamer::EventStreamer;
 use crate::server::state::StateRef;
 use crate::transfer::messages::{AllocationQueueParams, QueueData, QueueState};
-use crate::{JobId, get_or_return};
+use tako::JobId;
 
 #[derive(Copy, Clone)]
 enum RefreshReason {
diff --git a/crates/hyperqueue/src/server/autoalloc/service.rs b/crates/hyperqueue/src/server/autoalloc/service.rs
index 86fefb7d6..0163ffe92 100644
--- a/crates/hyperqueue/src/server/autoalloc/service.rs
+++ b/crates/hyperqueue/src/server/autoalloc/service.rs
@@ -7,7 +7,6 @@ use tako::WorkerId;
 use tako::gateway::LostWorkerReason;
 use tako::worker::WorkerConfiguration;
 
-use crate::JobId;
 use crate::common::manager::info::{GetManagerInfo, ManagerInfo};
 use crate::common::rpc::{ResponseToken, RpcSender, initiate_request, make_rpc_queue};
 use crate::server::autoalloc::process::autoalloc_process;
@@ -16,6 +15,7 @@ use crate::server::autoalloc::{Allocation, QueueId};
 use crate::server::event::streamer::EventStreamer;
 use crate::server::state::StateRef;
 use crate::transfer::messages::{AllocationQueueParams, QueueData};
+use tako::JobId;
 
 #[derive(Debug)]
 pub enum AutoAllocMessage {
diff --git a/crates/hyperqueue/src/server/autoalloc/state.rs b/crates/hyperqueue/src/server/autoalloc/state.rs
index bb86e23da..bb69d2a00 100644
--- a/crates/hyperqueue/src/server/autoalloc/state.rs
+++ b/crates/hyperqueue/src/server/autoalloc/state.rs
@@ -8,13 +8,13 @@ use tako::Set;
 use tako::WorkerId;
 use tako::gateway::LostWorkerReason;
 
-use crate::Map;
 use crate::common::idcounter::IdCounter;
 use crate::common::manager::info::ManagerType;
 use crate::common::utils::time::now_monotonic;
 use crate::server::autoalloc::config::MAX_KEPT_DIRECTORIES;
 use crate::server::autoalloc::queue::QueueHandler;
 use crate::server::autoalloc::{LostWorkerDetails, QueueInfo};
+use tako::Map;
 
 // Main state holder
 pub struct AutoAllocState {
diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs
index dd892a465..8507447a9 100644
--- a/crates/hyperqueue/src/server/client/mod.rs
+++ b/crates/hyperqueue/src/server/client/mod.rs
@@ -7,7 +7,7 @@ use orion::kdf::SecretKey;
 use tako::gateway::{
     CancelTasks, FromGatewayMessage, StopWorkerRequest, ToGatewayMessage, WorkerOverviewListenerOp,
 };
-use tako::{Set, TaskGroup};
+use tako::{Set, TaskGroup, TaskId};
 use tokio::net::{TcpListener, TcpStream};
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::sync::{Notify, mpsc};
@@ -24,7 +24,7 @@ use crate::transfer::messages::{
     TaskSelector, ToClientMessage, WorkerListResponse,
 };
 use crate::transfer::messages::{ForgetJobResponse, WaitForJobsResponse};
-use crate::{JobId, JobTaskCount, WorkerId, unwrap_tako_id};
+use tako::{JobId, JobTaskCount, WorkerId};
 
 pub mod autoalloc;
 mod submit;
@@ -337,8 +337,7 @@ fn reconstruct_historical_events(
                     events.push(Event::at(
                         started_data.start_date,
                         EventPayload::TaskStarted {
-                            job_id: job.job_id,
-                            task_id: *id,
+                            task_id: TaskId::new(job.job_id, *id),
                             instance_id: started_data.context.instance_id,
                             workers: started_data.worker_ids.clone(),
                         },
@@ -352,8 +351,7 @@
                     events.push(Event::at(
                         *end_date,
                         EventPayload::TaskFinished {
-                            job_id: job.job_id,
-                            task_id: *id,
+                            task_id: TaskId::new(job.job_id, *id),
                         },
                     ));
                 }
@@ -363,8 +361,7 @@
                     events.push(Event::at(
                         *end_date,
                         EventPayload::TaskFailed {
-                            job_id: job.job_id,
-                            task_id: *id,
+                            task_id: TaskId::new(job.job_id, *id),
                             error: error.clone(),
                         },
                     ));
@@ -373,8 +370,7 @@
                     events.push(Event::at(
                         *cancelled_date,
                         EventPayload::TaskCanceled {
-                            job_id: job.job_id,
-                            task_id: *id,
+                            task_id: TaskId::new(job.job_id, *id),
                         },
                     ));
                 }
@@ -718,8 +714,8 @@ async fn cancel_job(state_ref: &StateRef, senders: &Senders, job_id: JobId) -> C
     let canceled_ids: Vec<_> = canceled_tasks
         .iter()
         .map(|tako_id| {
-            let (j_id, task_id) = unwrap_tako_id(*tako_id);
-            assert_eq!(j_id, job_id);
+            assert_eq!(tako_id.job_id(), job_id);
+            let task_id = tako_id.job_task_id();
             job.set_cancel_state(task_id, senders);
             task_id
         })
diff --git a/crates/hyperqueue/src/server/client/submit.rs b/crates/hyperqueue/src/server/client/submit.rs
index 0776c8a04..c2a10e9a9 100644
--- a/crates/hyperqueue/src/server/client/submit.rs
+++ b/crates/hyperqueue/src/server/client/submit.rs
@@ -4,12 +4,12 @@ use std::time::Duration;
 
 use bstr::BString;
 use chrono::{DateTime, Utc};
-use tako::Map;
 use tako::Set;
 use tako::gateway::{
     FromGatewayMessage, NewTasksMessage, ResourceRequestVariants, SharedTaskConfiguration,
     TaskConfiguration, TaskDataFlags, ToGatewayMessage,
 };
+use tako::{Map, TaskId};
 use thin_vec::ThinVec;
 
 use crate::common::arraydef::IntArray;
@@ -24,7 +24,7 @@ use crate::transfer::messages::{
     SubmitResponse, TaskBuildDescription, TaskDescription, TaskIdSelector, TaskKind,
     TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies, ToClientMessage,
 };
-use crate::{JobId, JobTaskCount, JobTaskId, Priority, TakoTaskId, make_tako_id};
+use tako::{JobId, JobTaskCount, JobTaskId, Priority};
 
 fn create_new_task_message(
     job_id: JobId,
@@ -239,8 +239,7 @@ fn prepare_job(job_id: JobId, submit_desc: &mut JobSubmitDescription, state: &mu
 }
 
 fn serialize_task_body(
-    job_id: JobId,
-    task_id: JobTaskId,
+    task_id: TaskId,
     entry: Option<BString>,
     task_desc: &TaskDescription,
     submit_dir: &PathBuf,
@@ -248,7 +247,6 @@
 ) -> Box<[u8]> {
     let body_msg = TaskBuildDescription {
         task_kind: Cow::Borrowed(&task_desc.kind),
-        job_id,
         task_id,
         submit_dir: Cow::Borrowed(submit_dir),
         stream_path: stream_path.map(Cow::Borrowed),
@@ -268,7 +266,7 @@ fn build_tasks_array(
     submit_dir: &PathBuf,
     stream_path: Option<&PathBuf>,
 ) -> NewTasksMessage {
-    let build_task_conf = |body: Box<[u8]>, tako_id: TakoTaskId| TaskConfiguration {
+    let build_task_conf = |body: Box<[u8]>, tako_id: TaskId| TaskConfiguration {
         id: tako_id,
         shared_data_index: 0,
         task_deps: ThinVec::new(),
@@ -279,34 +277,22 @@
     let tasks = match entries {
         None => ids
             .iter()
-            .map(|task_id| {
+            .map(|job_task_id| {
+                let task_id = TaskId::new(job_id, job_task_id.into());
                 build_task_conf(
-                    serialize_task_body(
-                        job_id,
-                        task_id.into(),
-                        None,
-                        task_desc,
-                        submit_dir,
-                        stream_path,
-                    ),
-                    make_tako_id(job_id, task_id.into()),
+                    serialize_task_body(task_id, None, task_desc, submit_dir, stream_path),
+                    task_id,
                 )
             })
            .collect(),
         Some(entries) => ids
            .iter()
            .zip(entries)
-            .map(|(task_id, entry)| {
+            .map(|(job_task_id, entry)| {
+                let task_id = TaskId::new(job_id, job_task_id.into());
                 build_task_conf(
-                    serialize_task_body(
-                        job_id,
-                        task_id.into(),
-                        Some(entry),
-                        task_desc,
-                        submit_dir,
-                        stream_path,
-                    ),
-                    make_tako_id(job_id, task_id.into()),
+                    serialize_task_body(task_id, Some(entry), task_desc, submit_dir, stream_path),
+                    task_id,
                 )
            })
            .collect(),
@@ -366,8 +352,7 @@
     let mut task_configs = Vec::with_capacity(tasks.len());
     for task in tasks {
         let body = serialize_task_body(
-            job_id,
-            task.id,
+            TaskId::new(job_id, task.id),
             None,
             &task.task_desc,
             submit_dir,
@@ -389,11 +374,11 @@
 
         let task_deps = task_dep_ids
             .into_iter()
-            .map(|task_id| make_tako_id(job_id, task_id))
+            .map(|task_id| TaskId::new(job_id, task_id))
             .collect();
 
         task_configs.push(TaskConfiguration {
-            id: make_tako_id(job_id, task.id),
+            id: TaskId::new(job_id, task.id),
             shared_data_index,
             task_deps,
             dataobj_deps,
@@ -425,7 +410,6 @@ pub(crate) fn handle_open_job(
 #[cfg(test)]
 mod tests {
     use crate::common::arraydef::IntArray;
-    use crate::make_tako_id;
     use crate::server::client::submit::build_tasks_graph;
     use crate::server::client::validate_submit;
     use crate::server::job::{Job, SubmittedJobDescription};
@@ -437,7 +421,6 @@ mod tests {
     use smallvec::smallvec;
     use std::path::PathBuf;
     use std::time::Duration;
-    use tako::Priority;
     use tako::gateway::{
         NewTasksMessage, ResourceRequest, ResourceRequestEntry, ResourceRequestVariants,
         TaskDataFlags,
@@ -445,6 +428,7 @@ mod tests {
     use tako::internal::tests::utils::sorted_vec;
     use tako::program::ProgramDefinition;
     use tako::resources::{AllocationRequest, CPU_RESOURCE_NAME, ResourceAmount};
+    use tako::{Priority, TaskId};
 
     #[test]
     fn test_build_graph_deduplicate_shared_confs() {
@@ -584,25 +568,25 @@ mod tests {
         assert_eq!(
             sorted_vec(msg.tasks[0].task_deps.to_vec()),
             vec![
-                make_tako_id(1.into(), 1.into()),
-                make_tako_id(1.into(), 2.into())
+                TaskId::new(1.into(), 1.into()),
+                TaskId::new(1.into(), 2.into())
             ]
         );
         assert_eq!(
             msg.tasks[1].task_deps,
-            vec![make_tako_id(1.into(), 0.into())]
+            vec![TaskId::new(1.into(), 0.into())]
         );
         assert_eq!(
             sorted_vec(msg.tasks[2].task_deps.to_vec()),
             vec![
-                make_tako_id(1.into(), 3.into()),
-                make_tako_id(1.into(), 4.into())
+                TaskId::new(1.into(), 3.into()),
+                TaskId::new(1.into(), 4.into())
             ]
         );
         assert_eq!(msg.tasks[3].task_deps, vec![]);
         assert_eq!(
             msg.tasks[4].task_deps,
-            vec![make_tako_id(1.into(), 0.into()),]
+            vec![TaskId::new(1.into(), 0.into()),]
         );
     }
diff --git a/crates/hyperqueue/src/server/event/journal/prune.rs b/crates/hyperqueue/src/server/event/journal/prune.rs
index cc5191fc6..46246ad60 100644
--- a/crates/hyperqueue/src/server/event/journal/prune.rs
+++ b/crates/hyperqueue/src/server/event/journal/prune.rs
@@ -1,6 +1,6 @@
-use crate::JobId;
 use crate::server::event::journal::{JournalReader, JournalWriter};
 use crate::server::event::payload::EventPayload;
+use tako::JobId;
 use tako::{Set, WorkerId};
 
 pub(crate) fn prune_journal(
@@ -20,11 +20,13 @@ pub(crate) fn prune_journal(
             EventPayload::Submit { job_id, .. }
             | EventPayload::JobCompleted(job_id)
             | EventPayload::JobOpen(job_id, _)
-            | EventPayload::JobClose(job_id)
-            | EventPayload::TaskStarted { job_id, .. }
-            | EventPayload::TaskFinished { job_id, .. }
-            | EventPayload::TaskFailed { job_id, .. }
-            | EventPayload::TaskCanceled { job_id, .. } => live_job_ids.contains(job_id),
+            | EventPayload::JobClose(job_id) => live_job_ids.contains(job_id),
+            EventPayload::TaskStarted { task_id, .. }
+            | EventPayload::TaskFinished { task_id, .. }
+            | EventPayload::TaskFailed { task_id, .. }
+            | EventPayload::TaskCanceled { task_id, .. } => {
+                live_job_ids.contains(&task_id.job_id())
+            }
             EventPayload::AllocationQueueCreated(_, _)
             | EventPayload::AllocationQueueRemoved(_)
             | EventPayload::AllocationQueued { .. }
diff --git a/crates/hyperqueue/src/server/event/journal/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs
index 67187d900..928894a61 100644
--- a/crates/hyperqueue/src/server/event/journal/stream.rs
+++ b/crates/hyperqueue/src/server/event/journal/stream.rs
@@ -1,4 +1,3 @@
-use crate::JobId;
 use crate::common::utils::str::pluralize;
 use crate::server::event::Event;
 use crate::server::event::journal::JournalReader;
@@ -10,6 +9,7 @@ use std::fs::{remove_file, rename};
 use std::future::Future;
 use std::path::{Path, PathBuf};
 use std::time::Duration;
+use tako::JobId;
 use tako::{Set, WorkerId};
 use tokio::sync::mpsc;
 
diff --git a/crates/hyperqueue/src/server/event/payload.rs b/crates/hyperqueue/src/server/event/payload.rs
index 26fc73b90..cdd60a356 100644
--- a/crates/hyperqueue/src/server/event/payload.rs
+++ b/crates/hyperqueue/src/server/event/payload.rs
@@ -1,14 +1,13 @@
-use crate::JobId;
 use crate::common::serialization::Serialized;
 use crate::server::autoalloc::AllocationId;
 use crate::server::autoalloc::QueueId;
 use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest};
-use crate::{JobTaskId, WorkerId};
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
 use tako::gateway::LostWorkerReason;
 use tako::worker::{WorkerConfiguration, WorkerOverview};
-use tako::{InstanceId, static_assert_size};
+use tako::{InstanceId, TaskId, static_assert_size};
+use tako::{JobId, WorkerId};
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub enum EventPayload {
@@ -34,26 +33,22 @@ pub enum EventPayload {
     JobClose(JobId),
     /// Task has started to execute on some worker
     TaskStarted {
-        job_id: JobId,
-        task_id: JobTaskId,
+        task_id: TaskId,
         instance_id: InstanceId,
         workers: SmallVec<[WorkerId; 1]>,
     },
     /// Task has been finished
     TaskFinished {
-        job_id: JobId,
-        task_id: JobTaskId,
+        task_id: TaskId,
     },
     /// Task has failed to execute
     TaskFailed {
-        job_id: JobId,
-        task_id: JobTaskId,
+        task_id: TaskId,
         error: String,
     },
     /// Task has been canceled
     TaskCanceled {
-        job_id: JobId,
-        task_id: JobTaskId,
+        task_id: TaskId,
     },
     /// New allocation queue has been created
     AllocationQueueCreated(QueueId, Box<AllocationQueueParams>),
diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs
index ab7a44a1a..bddfbf3b2 100644
--- a/crates/hyperqueue/src/server/event/streamer.rs
+++ b/crates/hyperqueue/src/server/event/streamer.rs
@@ -4,12 +4,11 @@ use crate::server::event::Event;
 use crate::server::event::journal::{EventStreamMessage, EventStreamSender};
 use crate::server::event::payload::EventPayload;
 use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest};
-use crate::{JobId, JobTaskId, WorkerId};
 use chrono::{DateTime, Utc};
 use smallvec::SmallVec;
 use tako::gateway::LostWorkerReason;
 use tako::worker::{WorkerConfiguration, WorkerOverview};
-use tako::{InstanceId, Set, WrappedRcRefCell};
+use tako::{InstanceId, JobId, Set, TaskId, WorkerId, WrappedRcRefCell};
 use tokio::sync::{mpsc, oneshot};
 
 struct Inner {
@@ -112,15 +111,13 @@ impl EventStreamer {
     #[inline]
     pub fn on_task_started(
         &self,
-        job_id: JobId,
-        task_id: JobTaskId,
+        task_id: TaskId,
         instance_id: InstanceId,
         worker_ids: SmallVec<[WorkerId; 1]>,
         now: DateTime<Utc>,
     ) {
         self.send_event(
EventPayload::TaskStarted { - job_id, task_id, instance_id, workers: worker_ids, @@ -131,36 +128,26 @@ impl EventStreamer { } #[inline] - pub fn on_task_finished(&self, job_id: JobId, task_id: JobTaskId, now: DateTime) { + pub fn on_task_finished(&self, task_id: TaskId, now: DateTime) { self.send_event( - EventPayload::TaskFinished { job_id, task_id }, + EventPayload::TaskFinished { task_id }, Some(now), ForwardMode::StreamAndPersist, ); } - pub fn on_task_canceled(&self, job_id: JobId, task_id: JobTaskId, now: DateTime) { + pub fn on_task_canceled(&self, task_id: TaskId, now: DateTime) { self.send_event( - EventPayload::TaskCanceled { job_id, task_id }, + EventPayload::TaskCanceled { task_id }, Some(now), ForwardMode::StreamAndPersist, ); } #[inline] - pub fn on_task_failed( - &self, - job_id: JobId, - task_id: JobTaskId, - error: String, - now: DateTime, - ) { + pub fn on_task_failed(&self, task_id: TaskId, error: String, now: DateTime) { self.send_event( - EventPayload::TaskFailed { - job_id, - task_id, - error, - }, + EventPayload::TaskFailed { task_id, error }, Some(now), ForwardMode::StreamAndPersist, ); diff --git a/crates/hyperqueue/src/server/job.rs b/crates/hyperqueue/src/server/job.rs index c35a8db12..6b193f5ff 100644 --- a/crates/hyperqueue/src/server/job.rs +++ b/crates/hyperqueue/src/server/job.rs @@ -7,12 +7,12 @@ use crate::transfer::messages::{ TaskSelector, TaskStatusSelector, }; use crate::worker::start::RunningTaskContext; -use crate::{JobId, JobTaskCount, JobTaskId, Map, TakoTaskId, WorkerId, make_tako_id}; use chrono::{DateTime, Utc}; use smallvec::SmallVec; use std::sync::Arc; use tako::comm::deserialize; use tako::task::SerializedTaskContext; +use tako::{JobId, JobTaskCount, JobTaskId, Map, TaskId, WorkerId}; use tokio::sync::oneshot; /// State of a task that has been started at least once. @@ -271,11 +271,11 @@ impl Job { .map(|(task_id, task_info)| (*task_id, &task_info.state)) } - pub fn non_finished_task_ids(&self) -> Vec { + pub fn non_finished_task_ids(&self) -> Vec { self.iter_task_states() .filter_map(|(task_id, state)| match state { JobTaskState::Waiting | JobTaskState::Running { .. } => { - Some(make_tako_id(self.job_id, task_id)) + Some(TaskId::new(self.job_id, task_id)) } JobTaskState::Finished { .. } | JobTaskState::Failed { .. 
} @@ -354,7 +354,9 @@ impl Job { task.state ), }; - senders.events.on_task_finished(self.job_id, task_id, now); + senders + .events + .on_task_finished(TaskId::new(self.job_id, task_id), now); self.check_termination(senders, now); } @@ -399,7 +401,7 @@ impl Job { senders .events - .on_task_failed(self.job_id, task_id, error, now); + .on_task_failed(TaskId::new(self.job_id, task_id), error, now); self.check_termination(senders, now); task_id } @@ -424,7 +426,9 @@ impl Job { state => panic!("Invalid job state that is being canceled: {task_id:?} {state:?}"), } - senders.events.on_task_canceled(self.job_id, task_id, now); + senders + .events + .on_task_canceled(TaskId::new(self.job_id, task_id), now); self.counters.n_canceled_tasks += 1; self.check_termination(senders, now); task_id diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index a753a7278..9cb1171bc 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -7,10 +7,9 @@ use crate::server::job::{Job, JobTaskState, StartedTaskData, SubmittedJobDescrip use crate::server::state::State; use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; use crate::worker::start::RunningTaskContext; -use crate::{JobId, JobTaskId, Map, make_tako_id, unwrap_tako_id}; use std::path::Path; use tako::gateway::NewTasksMessage; -use tako::{InstanceId, ItemId, WorkerId}; +use tako::{InstanceId, ItemId, JobId, JobTaskId, Map, TaskId, WorkerId}; struct RestorerTaskInfo { state: JobTaskState, @@ -71,17 +70,16 @@ impl RestorerJob { let job = state.get_job_mut(job_id).unwrap(); new_tasks.tasks.retain_mut(|t| { - let (_, task_id) = unwrap_tako_id(t.id); t.task_deps - .retain(|d| !is_task_completed(&self.tasks, unwrap_tako_id(*d).1)); - !is_task_completed(&self.tasks, task_id) + .retain(|d| !is_task_completed(&self.tasks, d.job_task_id())); + !is_task_completed(&self.tasks, t.id.job_task_id()) }); for (task_id, job_task) in job.tasks.iter_mut() { if let Some(task) = self.tasks.get_mut(task_id) { if task.crash_counter > 0 || task.instance_id.is_some() { new_tasks.adjust_instance_id_and_crash_counters.insert( - make_tako_id(job_id, *task_id), + TaskId::new(job_id, *task_id), ( task.instance_id.map(|x| x.as_num() + 1).unwrap_or(0).into(), task.crash_counter, @@ -236,17 +234,14 @@ impl StateRestorer { self.jobs.remove(&job_id); } EventPayload::TaskStarted { - job_id, task_id, instance_id, workers, } => { - log::debug!( - "Replaying: TaskStarted {job_id} {task_id} {instance_id} {workers:?}" - ); - if let Some(job) = self.jobs.get_mut(&job_id) { + log::debug!("Replaying: TaskStarted {task_id} {instance_id} {workers:?}"); + if let Some(job) = self.jobs.get_mut(&task_id.job_id()) { job.tasks.insert( - task_id, + task_id.job_task_id(), RestorerTaskInfo { state: JobTaskState::Running { started_data: StartedTaskData { @@ -261,10 +256,10 @@ impl StateRestorer { ); } } - EventPayload::TaskFinished { job_id, task_id } => { - log::debug!("Replaying: TaskFinished {job_id} {task_id}"); - if let Some(job) = self.jobs.get_mut(&job_id) { - let task = job.tasks.get_mut(&task_id).unwrap(); + EventPayload::TaskFinished { task_id } => { + log::debug!("Replaying: TaskFinished {task_id}"); + if let Some(job) = self.jobs.get_mut(&task_id.job_id()) { + let task = job.tasks.get_mut(&task_id.job_task_id()).unwrap(); task.state = match std::mem::replace(&mut task.state, JobTaskState::Waiting) { JobTaskState::Running { started_data } => JobTaskState::Finished { @@ -275,14 +270,10 
@@ impl StateRestorer { } } } - EventPayload::TaskFailed { - job_id, - task_id, - error, - } => { - log::debug!("Replaying: TaskFailed {job_id} {task_id}"); - if let Some(job) = self.jobs.get_mut(&job_id) { - let task = job.tasks.get_mut(&task_id).unwrap(); + EventPayload::TaskFailed { task_id, error } => { + log::debug!("Replaying: TaskFailed {task_id}"); + if let Some(job) = self.jobs.get_mut(&task_id.job_id()) { + let task = job.tasks.get_mut(&task_id.job_task_id()).unwrap(); task.state = match std::mem::replace(&mut task.state, JobTaskState::Waiting) { JobTaskState::Waiting => JobTaskState::Failed { @@ -299,10 +290,10 @@ impl StateRestorer { } } } - EventPayload::TaskCanceled { job_id, task_id } => { - log::debug!("Replaying: TaskCanceled {job_id} {task_id}"); - if let Some(job) = self.jobs.get_mut(&job_id) { - let task = job.tasks.get_mut(&task_id); + EventPayload::TaskCanceled { task_id } => { + log::debug!("Replaying: TaskCanceled {task_id}"); + if let Some(job) = self.jobs.get_mut(&task_id.job_id()) { + let task = job.tasks.get_mut(&task_id.job_task_id()); if let Some(task) = task { task.state = match std::mem::replace(&mut task.state, JobTaskState::Waiting) { @@ -319,7 +310,7 @@ impl StateRestorer { } } else { job.tasks.insert( - task_id, + task_id.job_task_id(), RestorerTaskInfo { state: JobTaskState::Canceled { started_data: None, diff --git a/crates/hyperqueue/src/server/state.rs b/crates/hyperqueue/src/server/state.rs index 33e186abc..6f3a06bec 100644 --- a/crates/hyperqueue/src/server/state.rs +++ b/crates/hyperqueue/src/server/state.rs @@ -1,21 +1,21 @@ use std::cmp::min; use chrono::Utc; -use tako::ItemId; use tako::define_wrapped_type; use tako::gateway::{ CancelTasks, FromGatewayMessage, LostWorkerMessage, NewWorkerMessage, TaskFailedMessage, TaskState, TaskUpdate, ToGatewayMessage, }; +use tako::{ItemId, TaskId}; +use crate::WrappedRcRefCell; use crate::server::Senders; use crate::server::autoalloc::LostWorkerDetails; use crate::server::job::Job; use crate::server::restore::StateRestorer; use crate::server::worker::Worker; use crate::transfer::messages::ServerInfo; -use crate::{JobId, Map, TakoTaskId, WorkerId}; -use crate::{WrappedRcRefCell, unwrap_tako_id}; +use tako::{JobId, Map, WorkerId}; pub struct State { jobs: Map, @@ -30,7 +30,7 @@ fn cancel_tasks_from_callback( state_ref: &StateRef, senders: &Senders, job_id: JobId, - tasks: Vec, + tasks: Vec, ) { if tasks.is_empty() { return; @@ -49,9 +49,8 @@ fn cancel_tasks_from_callback( log::debug!("Tasks {:?} canceled", msg.cancelled_tasks); log::debug!("Tasks {:?} already finished", msg.already_finished); for tako_id in msg.cancelled_tasks { - let (j_id, task_id) = unwrap_tako_id(tako_id); - assert_eq!(j_id, job.job_id); - job.set_cancel_state(task_id, &senders2); + assert_eq!(tako_id.job_id(), job.job_id); + job.set_cancel_state(tako_id.job_task_id(), &senders2); } } } @@ -153,18 +152,17 @@ impl State { ) { log::debug!("Task id={} failed: {:?}", msg.id, msg.info); - let (job_id, task_id) = unwrap_tako_id(msg.id); + let job_id = msg.id.job_id(); let job = self.get_job_mut(job_id).unwrap(); for tako_id in msg.cancelled_tasks { log::debug!( "Task id={} canceled because of task dependency fails", tako_id ); - let (j_id, task_id) = unwrap_tako_id(tako_id); - assert_eq!(job_id, j_id); - job.set_cancel_state(task_id, senders); + assert_eq!(tako_id.job_id(), job_id); + job.set_cancel_state(tako_id.job_task_id(), senders); } - job.set_failed_state(task_id, msg.info.message, senders); + job.set_failed_state(msg.id.job_task_id(), 
msg.info.message, senders); if let Some(max_fails) = &job.job_desc.max_fails { if job.counters.n_failed_tasks > *max_fails { @@ -182,33 +180,26 @@ impl State { worker_ids, context, } => { - let (job_id, task_id) = unwrap_tako_id(msg.id); - let job = self.get_job_mut(job_id).unwrap(); + let job = self.get_job_mut(msg.id.job_id()).unwrap(); let now = Utc::now(); - job.set_running_state(task_id, worker_ids.clone(), context, now); + job.set_running_state(msg.id.job_task_id(), worker_ids.clone(), context, now); for worker_id in &worker_ids { if let Some(worker) = self.workers.get_mut(worker_id) { - worker.update_task_started(job_id, task_id, now); + worker.update_task_started(msg.id, now); } } - senders.events.on_task_started( - job_id, - task_id, - instance_id, - worker_ids.clone(), - now, - ); + senders + .events + .on_task_started(msg.id, instance_id, worker_ids.clone(), now); } TaskState::Finished => { let now = Utc::now(); - let (job_id, task_id) = unwrap_tako_id(msg.id); - let job = self.get_job_mut(job_id).unwrap(); - job.set_finished_state(task_id, now, senders); + let job = self.get_job_mut(msg.id.job_id()).unwrap(); + job.set_finished_state(msg.id.job_task_id(), now, senders); } TaskState::Waiting => { - let (job_id, task_id) = unwrap_tako_id(msg.id); - let job = self.get_job_mut(job_id).unwrap(); - job.set_waiting_state(task_id); + let job = self.get_job_mut(msg.id.job_id()).unwrap(); + job.set_waiting_state(msg.id.job_task_id()); } TaskState::Invalid => { unreachable!() @@ -236,9 +227,8 @@ impl State { ) { log::debug!("Worker lost id={}", msg.worker_id); for tako_id in msg.running_tasks { - let (job_id, task_id) = unwrap_tako_id(tako_id); - let job = self.get_job_mut(job_id).unwrap(); - job.set_waiting_state(task_id); + let job = self.get_job_mut(tako_id.job_id()).unwrap(); + job.set_waiting_state(tako_id.job_task_id()); } let worker = self.workers.get_mut(&msg.worker_id).unwrap(); diff --git a/crates/hyperqueue/src/server/worker.rs b/crates/hyperqueue/src/server/worker.rs index 2a92933e9..f6e9e395b 100644 --- a/crates/hyperqueue/src/server/worker.rs +++ b/crates/hyperqueue/src/server/worker.rs @@ -4,7 +4,7 @@ use tako::worker::WorkerConfiguration; use crate::server::worker::WorkerState::Offline; use crate::transfer::messages::{TaskTimestamp, WorkerExitInfo, WorkerInfo}; -use crate::{JobId, JobTaskId, WorkerId}; +use tako::{TaskId, WorkerId}; #[derive(Default)] pub struct ConnectedWorkerData { @@ -52,12 +52,8 @@ impl Worker { } } - pub fn update_task_started(&mut self, job_id: JobId, task_id: JobTaskId, now: DateTime) { - self.last_task_started = Some(TaskTimestamp { - job_id, - task_id, - time: now, - }); + pub fn update_task_started(&mut self, task_id: TaskId, now: DateTime) { + self.last_task_started = Some(TaskTimestamp { task_id, time: now }); } pub fn set_offline_state(&mut self, reason: LostWorkerReason) { diff --git a/crates/hyperqueue/src/stream/reader/outputlog.rs b/crates/hyperqueue/src/stream/reader/outputlog.rs index 1735cf46a..fe3aada83 100644 --- a/crates/hyperqueue/src/stream/reader/outputlog.rs +++ b/crates/hyperqueue/src/stream/reader/outputlog.rs @@ -5,7 +5,6 @@ use crate::common::serialization::SerializationConfig; use crate::stream::StreamSerializationConfig; use crate::transfer::stream::{ChannelId, StreamChunkHeader}; use crate::worker::streamer::{STREAM_FILE_HEADER, STREAM_FILE_SUFFIX, StreamFileHeader}; -use crate::{JobId, JobTaskId, Set}; use bincode::Options; use chrono::{DateTime, Utc}; use colored::{Color, Colorize}; @@ -19,7 +18,7 @@ use 
std::io::{BufReader, BufWriter, Read, Seek, Write}; use std::num::NonZeroUsize; use std::ops::Deref; use std::path::{Path, PathBuf}; -use tako::InstanceId; +use tako::{InstanceId, JobId, JobTaskId, Set}; #[derive(Clone)] pub struct ChunkInfo { @@ -176,8 +175,8 @@ impl OutputLog { let mut file = BufReader::new(File::open(path)?); let _header = OutputLog::check_header(&mut file)?; while let Some(chunk_header) = Self::read_chunk(&mut file)? { - let job = index.entry(chunk_header.job).or_default(); - let task = job.entry(chunk_header.task).or_default(); + let job = index.entry(chunk_header.task.job_id()).or_default(); + let task = job.entry(chunk_header.task.job_task_id()).or_default(); if task .instances .last() diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index 0407b8103..9080dbd55 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -3,19 +3,20 @@ use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; +use crate::JobDataObjectId; use crate::client::status::Status; use crate::common::arraydef::IntArray; use crate::common::manager::info::ManagerType; use crate::server::autoalloc::{Allocation, QueueId, QueueInfo}; use crate::server::event::Event; use crate::server::job::{JobTaskCounters, JobTaskInfo, SubmittedJobDescription}; -use crate::{JobDataObjectId, JobId, JobTaskCount, JobTaskId, Map, WorkerId}; use bstr::BString; use std::path::PathBuf; use std::time::Duration; use tako::gateway::{LostWorkerReason, ResourceRequestVariants, TaskDataFlags, WorkerRuntimeInfo}; use tako::program::ProgramDefinition; use tako::worker::WorkerConfiguration; +use tako::{JobId, JobTaskCount, JobTaskId, Map, TaskId, WorkerId}; // Messages client -> server #[allow(clippy::large_enum_variant)] @@ -71,8 +72,7 @@ impl PinMode { #[derive(Serialize, Deserialize, Debug)] pub struct TaskBuildDescription<'a> { pub task_kind: Cow<'a, TaskKind>, - pub job_id: JobId, - pub task_id: JobTaskId, + pub task_id: TaskId, pub submit_dir: Cow<'a, PathBuf>, pub stream_path: Option>, pub entry: Option, @@ -425,8 +425,7 @@ pub struct WorkerExitInfo { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct TaskTimestamp { - pub job_id: JobId, - pub task_id: JobTaskId, + pub task_id: TaskId, pub time: DateTime, } diff --git a/crates/hyperqueue/src/transfer/stream.rs b/crates/hyperqueue/src/transfer/stream.rs index 7aaee9af2..2dc01ecfb 100644 --- a/crates/hyperqueue/src/transfer/stream.rs +++ b/crates/hyperqueue/src/transfer/stream.rs @@ -1,42 +1,16 @@ -use crate::{JobId, JobTaskId}; use chrono::serde::ts_milliseconds; use chrono::{DateTime, Utc}; use serde::Deserialize; use serde::Serialize; -use tako::InstanceId; +use tako::{InstanceId, TaskId}; pub type ChannelId = u32; -#[derive(Serialize, Deserialize, Debug)] -pub struct StreamTaskStart { - pub job: JobId, - pub task: JobTaskId, - pub instance: InstanceId, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct StreamData { - pub job: JobId, - pub task: JobTaskId, - pub instance: InstanceId, - pub channel: ChannelId, - #[serde(with = "serde_bytes")] - pub data: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct StreamTaskEnd { - pub job: JobId, - pub task: JobTaskId, - pub instance: InstanceId, -} - #[derive(Serialize, Deserialize, Debug)] pub struct StreamChunkHeader { #[serde(with = "ts_milliseconds")] pub time: DateTime, - pub job: JobId, - pub task: JobTaskId, + pub task: TaskId, pub instance: InstanceId, pub channel: ChannelId, pub size: 
diff --git a/crates/hyperqueue/src/worker/start/mod.rs b/crates/hyperqueue/src/worker/start/mod.rs
index a656098a0..12a431956 100644
--- a/crates/hyperqueue/src/worker/start/mod.rs
+++ b/crates/hyperqueue/src/worker/start/mod.rs
@@ -4,9 +4,8 @@ use std::path::PathBuf;
 use tokio::sync::oneshot::Receiver;
 
 use crate::transfer::messages::{TaskBuildDescription, TaskKind};
-use crate::{JobId, JobTaskId};
-use tako::InstanceId;
 use tako::launcher::{StopReason, TaskBuildContext, TaskLaunchData, TaskLauncher};
+use tako::{InstanceId, TaskId};
 
 use crate::worker::start::program::build_program_task;
 use crate::worker::streamer::StreamerRef;
@@ -48,7 +47,6 @@ impl TaskLauncher for HqTaskLauncher {
         let desc: TaskBuildDescription = tako::comm::deserialize(build_ctx.body())?;
         let shared = SharedTaskDescription {
-            job_id: desc.job_id,
             task_id: desc.task_id,
             submit_dir: desc.submit_dir.into_owned(),
             stream_path: desc.stream_path.map(|x| x.into_owned()),
@@ -67,8 +65,7 @@ impl TaskLauncher for HqTaskLauncher {
 }
 
 struct SharedTaskDescription {
-    job_id: JobId,
-    task_id: JobTaskId,
+    task_id: TaskId,
     submit_dir: PathBuf,
     stream_path: Option<PathBuf>,
     entry: Option<BString>,
diff --git a/crates/hyperqueue/src/worker/start/program.rs b/crates/hyperqueue/src/worker/start/program.rs
index bc125a33f..bb8425b08 100644
--- a/crates/hyperqueue/src/worker/start/program.rs
+++ b/crates/hyperqueue/src/worker/start/program.rs
@@ -28,7 +28,6 @@ use crate::transfer::stream::ChannelId;
 use crate::worker::start::{RunningTaskContext, SharedTaskDescription};
 use crate::worker::streamer::StreamSender;
 use crate::worker::streamer::StreamerRef;
-use crate::{JobId, JobTaskId};
 use tako::comm::serialize;
 use tako::launcher::{
     StopReason, TaskBuildContext, TaskLaunchData, TaskResult, command_from_definitions,
@@ -38,7 +37,7 @@ use tako::resources::{
     AMD_GPU_RESOURCE_NAME, Allocation, CPU_RESOURCE_ID, CPU_RESOURCE_NAME,
     NVIDIA_GPU_RESOURCE_NAME, ResourceAllocation,
 };
-use tako::{InstanceId, format_comma_delimited};
+use tako::{InstanceId, TaskId, format_comma_delimited};
 
 const MAX_CUSTOM_ERROR_LENGTH: usize = 2048; // 2KiB
 
@@ -57,26 +56,24 @@ pub(super) fn build_program_task(
         task_dir,
     } = program;
     let SharedTaskDescription {
-        job_id,
         task_id,
         submit_dir,
         stream_path,
         entry,
     } = shared;
-    let (program, job_id, job_task_id, instance_id, task_dir): (
+    let (program, task_id, instance_id, task_dir): (
         ProgramDefinition,
-        JobId,
-        JobTaskId,
+        TaskId,
         InstanceId,
         Option<TempDir>,
     ) = {
         program
             .env
-            .insert(HQ_JOB_ID.into(), job_id.to_string().into());
+            .insert(HQ_JOB_ID.into(), task_id.job_id().to_string().into());
         program
             .env
-            .insert(HQ_TASK_ID.into(), task_id.to_string().into());
+            .insert(HQ_TASK_ID.into(), task_id.job_task_id().to_string().into());
         program.env.insert(
             HQ_SUBMIT_DIR.into(),
             BString::from(submit_dir.to_string_lossy().as_bytes()),
@@ -171,7 +168,6 @@ pub(super) fn build_program_task(
         }
 
         let ctx = CompletePlaceholderCtx {
-            job_id,
             task_id,
             instance_id: build_ctx.instance_id(),
             submit_dir: &submit_dir,
@@ -193,7 +189,7 @@ pub(super) fn build_program_task(
             )
         })?;
 
-        (program, job_id, task_id, build_ctx.instance_id(), task_dir)
+        (program, task_id, build_ctx.instance_id(), task_dir)
     };
 
     let context = RunningTaskContext { instance_id };
@@ -202,8 +198,7 @@ pub(super) fn build_program_task(
     let task_future = create_task_future(
         streamer_ref.clone(),
         program,
-        job_id,
-        job_task_id,
+        task_id,
         instance_id,
         stop_receiver,
         task_dir,
@@ -217,14 +212,13 @@ pub(super) fn build_program_task(
 }
 
 async fn resend_stdio(
-    job_id: JobId,
-    job_task_id: JobTaskId,
+    task_id: TaskId,
     channel: ChannelId,
     stdio: Option,
     stream: StreamSender,
 ) -> tako::Result<()> {
     if let Some(mut stdio) = stdio {
-        log::debug!("Resending stream {}/{}/{}", job_id, job_task_id, channel);
+        log::debug!("Resending stream {task_id}/{channel}");
         loop {
             let mut buffer = vec![0; STDIO_BUFFER_SIZE];
             let size = stdio.read(&mut buffer[..]).await?;
@@ -504,8 +498,7 @@ fn check_error_filename(task_dir: TempDir) -> Option {
 async fn create_task_future(
     streamer_ref: StreamerRef,
     program: ProgramDefinition,
-    job_id: JobId,
-    job_task_id: JobTaskId,
+    task_id: TaskId,
     instance_id: InstanceId,
     end_receiver: Receiver<StopReason>,
     task_dir: Option<TempDir>,
@@ -563,8 +556,7 @@ async fn create_task_future(
             .get_stream(
                 &streamer_ref,
                 stream_path.as_ref().unwrap(),
-                job_id,
-                job_task_id,
+                task_id,
                 instance_id,
             )
             .map_err(|e| tako::Error::GenericError(e.to_string()))?;
@@ -576,13 +568,12 @@ async fn create_task_future(
         let stderr = child.stderr.take();
         let response = tokio::try_join!(
             child_wait(child, &program.stdin).map_err(tako::Error::from),
-            resend_stdio(job_id, job_task_id, 0, stdout, stream2.clone())
-                .map_err(streamer_error),
-            resend_stdio(job_id, job_task_id, 1, stderr, stream2).map_err(streamer_error),
+            resend_stdio(task_id, 0, stdout, stream2.clone()).map_err(streamer_error),
+            resend_stdio(task_id, 1, stderr, stream2).map_err(streamer_error),
         );
         status_to_result(response?.0)
     };
-    let r = handle_task_with_signals(main_fut, pid, job_id, job_task_id, end_receiver).await;
+    let r = handle_task_with_signals(main_fut, pid, task_id, end_receiver).await;
     stream.flush().await?;
     r
 } else {
@@ -595,7 +586,7 @@ async fn create_task_future(
             .await;
             result
         };
-        handle_task_with_signals(task_fut, pid, job_id, job_task_id, end_receiver).await
+        handle_task_with_signals(task_fut, pid, task_id, end_receiver).await
     }
 }
 
@@ -630,8 +621,7 @@ fn signal_name(signal: i32) -> &'static str {
 async fn create_task_future(
     _streamer_ref: StreamerRef,
     _program: ProgramDefinition,
-    _job_id: JobId,
-    _job_task_id: JobTaskId,
+    _task_id: TaskId,
     _instance_id: InstanceId,
     _end_receiver: Receiver<StopReason>,
     _task_dir: Option<TempDir>,
@@ -661,8 +651,7 @@ async fn cleanup_task_file(result: Option<&TaskResult>, stdio: &StdioDef) {
 async fn handle_task_with_signals<F: Future<Output = tako::Result<TaskResult>>>(
     task_future: F,
     pid: u32,
-    job_id: JobId,
-    job_task_id: JobTaskId,
+    task_id: TaskId,
     end_receiver: Receiver<StopReason>,
 ) -> tako::Result<TaskResult> {
     let send_signal = |signal: Signal| -> tako::Result<()> {
@@ -676,9 +665,7 @@ async fn handle_task_with_signals<F: Future<Output = tako::Result<TaskResult>>>(
     let event_fut = async move {
         let stop_reason = end_receiver.await;
         let stop_reason = stop_reason.expect("Stop reason could not be received");
-        log::debug!(
-            "Received stop command, attempting to end task {job_id}/{job_task_id} with SIGINT"
-        );
+        log::debug!("Received stop command, attempting to end task {task_id} with SIGINT");
         // We have received a stop command for this task.
         // We should attempt to kill it and wait until the child process from `task_future` resolves.
@@ -697,17 +684,15 @@ async fn handle_task_with_signals<F: Future<Output = tako::Result<TaskResult>>>(
             match tokio::time::timeout(Duration::from_secs(1), fut).await {
                 Ok(_) => {
                     // The task has finished gracefully
-                    log::debug!("Task {job_id}/{job_task_id} has ended gracefully after a signal");
+                    log::debug!("Task {task_id} has ended gracefully after a signal");
                     result
                 }
                 Err(_) => {
                     // The task did not exit, kill it
                     if let Err(error) = send_signal(Signal::SIGKILL) {
-                        log::error!(
-                            "Unable to kill process Task {job_id}/{job_task_id}: {error:?}"
-                        );
+                        log::error!("Unable to kill process Task {task_id}: {error:?}");
                     } else {
-                        log::debug!("Task {job_id}/{job_task_id} has been killed");
+                        log::debug!("Task {task_id} has been killed");
                     }
                     result
                 }
@@ -715,7 +700,7 @@ async fn handle_task_with_signals<F: Future<Output = tako::Result<TaskResult>>>(
         }
         // The task has finished
         Either::Right((result, _)) => {
-            log::debug!("Task {job_id}/{job_task_id} has finished normally");
+            log::debug!("Task {task_id} has finished normally");
             result
         }
     }
diff --git a/crates/hyperqueue/src/worker/streamer.rs b/crates/hyperqueue/src/worker/streamer.rs
index bb959e00a..57ca50ba5 100644
--- a/crates/hyperqueue/src/worker/streamer.rs
+++ b/crates/hyperqueue/src/worker/streamer.rs
@@ -3,7 +3,6 @@ use crate::common::error::HqError;
 use crate::common::serialization::SerializationConfig;
 use crate::stream::StreamSerializationConfig;
 use crate::transfer::stream::{ChannelId, StreamChunkHeader};
-use crate::{JobId, JobTaskId, Map};
 use bincode::Options;
 use chrono::Utc;
 use rand::Rng;
@@ -11,7 +10,8 @@ use rand::distr::Alphanumeric;
 use serde::{Deserialize, Serialize};
 use std::borrow::Cow;
 use std::path::{Path, PathBuf};
-use tako::{InstanceId, WorkerId, define_wrapped_type};
+use tako::Map;
+use tako::{InstanceId, TaskId, WorkerId, define_wrapped_type};
 use tokio::fs::File;
 use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::sync::mpsc::Sender;
@@ -55,22 +55,15 @@ impl Streamer {
         &mut self,
         streamer_ref: &StreamerRef,
         stream_path: &Path,
-        job_id: JobId,
-        job_task_id: JobTaskId,
+        task_id: TaskId,
         instance_id: InstanceId,
     ) -> crate::Result<StreamSender> {
-        log::debug!(
-            "New stream for {}/{} ({})",
-            job_id,
-            job_task_id,
-            stream_path.display()
-        );
+        log::debug!("New stream for {task_id} ({})", stream_path.display());
         let sender = if let Some(ref mut info) = self.streams.get_mut(stream_path) {
            info.sender.clone()
         } else {
             log::debug!(
-                "Starting a new stream instance for job_id = {}, stream_path = {}",
-                job_id,
+                "Starting a new stream instance, stream_path = {}",
                 stream_path.display()
             );
             if !stream_path.is_dir() {
@@ -98,8 +91,7 @@ impl Streamer {
             queue_sender
         };
         Ok(StreamSender {
-            job_id,
-            job_task_id,
+            task_id,
             instance_id,
             sender,
         })
@@ -122,8 +114,7 @@ impl StreamerRef {
 #[derive(Clone)]
 pub struct StreamSender {
     sender: Sender<StreamerMessage>,
-    job_id: JobId,
-    job_task_id: JobTaskId,
+    task_id: TaskId,
     instance_id: InstanceId,
 }
 
@@ -150,8 +141,7 @@ impl StreamSender {
             .send(StreamerMessage::Write {
                 header: StreamChunkHeader {
                     time: Utc::now(),
-                    job: self.job_id,
-                    task: self.job_task_id,
+                    task: self.task_id,
                     instance: self.instance_id,
                     channel,
                     size: data.len() as u64,
diff --git a/crates/pyhq/src/client/job.rs b/crates/pyhq/src/client/job.rs
index 8bb4cb66b..47c3962e8 100644
--- a/crates/pyhq/src/client/job.rs
+++ b/crates/pyhq/src/client/job.rs
@@ -9,6 +9,7 @@ use hyperqueue::client::resources::parse_allocation_request;
 use hyperqueue::client::status::{Status, is_terminated};
 use hyperqueue::common::arraydef::IntArray;
 use hyperqueue::common::utils::fs::get_current_dir;
+use hyperqueue::rpc_call;
 use hyperqueue::server::job::JobTaskState;
 use hyperqueue::transfer::messages::{
     ForgetJobRequest, FromClientMessage, IdSelector, JobDescription, JobDetailRequest,
@@ -17,7 +18,6 @@ use hyperqueue::transfer::messages::{
     TaskKind, TaskKindProgram, TaskSelector, TaskStatusSelector, TaskWithDependencies,
     ToClientMessage,
 };
-use hyperqueue::{JobTaskCount, Set, rpc_call, tako};
 use pyo3::exceptions::PyException;
 use pyo3::prelude::PyAnyMethods;
 use pyo3::types::PyTuple;
@@ -30,6 +30,7 @@ use tako::gateway::{
 };
 use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
 use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount};
+use tako::{JobTaskCount, Set};
 
 #[derive(Debug, FromPyObject)]
 enum AllocationValue {
diff --git a/crates/tako/benches/benchmarks/core.rs b/crates/tako/benches/benchmarks/core.rs
index c2a376c80..5fa497a86 100644
--- a/crates/tako/benches/benchmarks/core.rs
+++ b/crates/tako/benches/benchmarks/core.rs
@@ -1,6 +1,5 @@
 use criterion::measurement::WallTime;
 use criterion::{BatchSize, BenchmarkGroup, BenchmarkId, Criterion, black_box};
-use tako::ItemId;
 use tako::Set;
 use tako::TaskId;
 use tako::internal::server::core::Core;
@@ -18,7 +17,7 @@ fn bench_remove_single_task(c: &mut BenchmarkGroup<WallTime>) {
             || {
                 let mut core = Core::default();
                 add_tasks(&mut core, task_count);
-                (core, TaskId::new(0))
+                (core, TaskId::new_test(0))
             },
             |(core, task_id)| {
                 let mut objs_to_remove = ObjsToRemoveFromWorkers::new();
@@ -65,9 +64,7 @@ fn bench_add_task(c: &mut BenchmarkGroup<WallTime>) {
                 let mut core = Core::default();
                 add_tasks(&mut core, task_count);
 
-                let task = create_task(TaskId::new(
-                    (task_count + 1) as <TaskId as ItemId>::IdType,
-                ));
+                let task = create_task(TaskId::new_test(task_count + 1));
                 (core, Some(task))
             },
             |(core, task)| {
@@ -90,7 +87,7 @@ fn bench_add_tasks(c: &mut BenchmarkGroup<WallTime>) {
             || {
                 let core = Core::default();
                 let tasks: Vec<_> = (0..task_count)
-                    .map(|id| create_task(TaskId::new(id as <TaskId as ItemId>::IdType)))
+                    .map(|id| create_task(TaskId::new_test(id as u32)))
                     .collect();
                 (core, tasks)
             },
@@ -121,7 +118,7 @@ fn bench_iterate_tasks(c: &mut BenchmarkGroup<WallTime>) {
             |ref mut core| {
                 let mut sum = 0;
                 for task in core.task_map().tasks() {
-                    sum += task.id().as_num();
+                    sum += task.id().job_task_id().as_num();
                 }
                 black_box(sum);
             },
diff --git a/crates/tako/benches/benchmarks/worker.rs b/crates/tako/benches/benchmarks/worker.rs
index fbdb98c03..b8561efd4 100644
--- a/crates/tako/benches/benchmarks/worker.rs
+++ b/crates/tako/benches/benchmarks/worker.rs
@@ -4,7 +4,6 @@ use std::time::Duration;
 use criterion::measurement::WallTime;
 use criterion::{BatchSize, BenchmarkGroup, BenchmarkId, Criterion};
 use smallvec::smallvec;
-use tako::ItemId;
 use tako::TaskId;
 use tako::gateway::TaskDataFlags;
 use tako::internal::messages::worker::ComputeTaskMsg;
@@ -57,10 +56,10 @@ fn create_worker_state() -> WorkerStateRef {
     )
 }
 
-fn create_worker_task(id: u64) -> Task {
+fn create_worker_task(id: u32) -> Task {
     Task::new(
         ComputeTaskMsg {
-            id: TaskId::new(id as <TaskId as ItemId>::IdType),
+            id: TaskId::new_test(id),
             instance_id: Default::default(),
             user_priority: 0,
             scheduler_priority: 0,
@@ -155,7 +154,7 @@ fn bench_cancel_waiting_task(c: &mut BenchmarkGroup<WallTime>) {
                     state.add_task(create_worker_task(id));
                 }
             }
-            (state, TaskId::new(0))
+            (state, TaskId::new_test(0))
         },
         |(state, task_id)| {
             let mut state = state.get_mut();
diff --git a/crates/tako/benches/utils/mod.rs b/crates/tako/benches/utils/mod.rs
index 00a1bfaa8..6634d5cb3 100644
--- a/crates/tako/benches/utils/mod.rs
+++ b/crates/tako/benches/utils/mod.rs
@@ -1,6 +1,5 @@
 use std::rc::Rc;
 use std::time::Duration;
 
-use tako::ItemId;
 use tako::gateway::TaskDataFlags;
 use tako::internal::server::core::Core;
 use tako::internal::server::task::{Task, TaskConfiguration};
@@ -55,10 +54,10 @@ pub fn create_worker(id: u64) -> Worker {
     )
 }
 
-pub fn add_tasks(core: &mut Core, count: usize) -> Vec<TaskId> {
-    let mut tasks = Vec::with_capacity(count);
+pub fn add_tasks(core: &mut Core, count: u32) -> Vec<TaskId> {
+    let mut tasks = Vec::with_capacity(count as usize);
     for id in 0..count {
-        let task_id = TaskId::new(id as <TaskId as ItemId>::IdType);
+        let task_id = TaskId::new_test(id);
         let task = create_task(task_id);
         core.add_task(task);
         tasks.push(task_id);
diff --git a/crates/tako/src/internal/common/ids.rs b/crates/tako/src/internal/common/ids.rs
new file mode 100644
index 000000000..ebd895f5e
--- /dev/null
+++ b/crates/tako/src/internal/common/ids.rs
@@ -0,0 +1,60 @@
+use crate::define_id_type;
+use serde::{Deserialize, Serialize};
+use std::fmt::{Debug, Display, Formatter};
+
+define_id_type!(JobId, u32);
+define_id_type!(JobTaskId, u32);
+define_id_type!(WorkerId, u32);
+define_id_type!(InstanceId, u32);
+
+#[derive(Default, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TaskId {
+    job_id: JobId,
+    job_task_id: JobTaskId,
+}
+
+impl Display for TaskId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}@{}", self.job_id, self.job_task_id)
+    }
+}
+
+impl Debug for TaskId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        Display::fmt(self, f)
+    }
+}
+
+impl TaskId {
+    #[inline]
+    pub fn new(job_id: JobId, job_task_id: JobTaskId) -> Self {
+        Self {
+            job_id,
+            job_task_id,
+        }
+    }
+
+    #[inline]
+    pub fn job_id(&self) -> JobId {
+        self.job_id
+    }
+
+    #[inline]
+    pub fn job_task_id(&self) -> JobTaskId {
+        self.job_task_id
+    }
+
+    pub fn new_test(job_task_id: u32) -> Self {
+        Self {
+            job_id: 0.into(),
+            job_task_id: job_task_id.into(),
+        }
+    }
+}
+
+#[cfg(test)]
+impl From<u32> for TaskId {
+    fn from(value: u32) -> Self {
+        Self::new_test(value)
+    }
+}
diff --git a/crates/tako/src/internal/common/mod.rs b/crates/tako/src/internal/common/mod.rs
index 0a18ab297..70d13484d 100644
--- a/crates/tako/src/internal/common/mod.rs
+++ b/crates/tako/src/internal/common/mod.rs
@@ -1,10 +1,9 @@
 #[macro_use]
 pub(crate) mod trace;
 
-pub use data_structures::{Map, Set};
-pub use wrapped::WrappedRcRefCell;
 pub(crate) mod data_structures;
 pub(crate) mod error;
+pub(crate) mod ids;
 pub(crate) mod index;
 pub mod resources;
 pub(crate) mod rpc;
@@ -12,3 +11,6 @@ pub(crate) mod stablemap;
 pub(crate) mod taskgroup;
 pub(crate) mod utils;
 pub(crate) mod wrapped;
+
+pub use data_structures::{Map, Set};
+pub use wrapped::WrappedRcRefCell;
diff --git a/crates/tako/src/internal/scheduler/metrics.rs b/crates/tako/src/internal/scheduler/metrics.rs
index 70d12f918..d880a3681 100644
--- a/crates/tako/src/internal/scheduler/metrics.rs
+++ b/crates/tako/src/internal/scheduler/metrics.rs
@@ -45,7 +45,6 @@ fn crawl<F1: Fn(&Task) -> &Set<TaskId>>(tasks: &mut TaskMap, predecessor_fn: F1)
 #[cfg(test)]
 mod tests {
     use crate::TaskId;
-    use crate::internal::common::index::ItemId;
     use crate::internal::scheduler::metrics::compute_b_level_metric;
     use crate::internal::server::core::Core;
     use crate::internal::tests::utils::workflows::submit_example_2;
@@ -65,8 +64,8 @@ mod tests {
         check_task_priority(&core, 1, 4);
     }
 
-    fn check_task_priority(core: &Core, task_id: u64, priority: i32) {
-        let task_id = TaskId::new(task_id as <TaskId as ItemId>::IdType);
+    fn check_task_priority(core: &Core, task_id: u32, priority: i32) {
+        let task_id = TaskId::new_test(task_id);
         assert_eq!(core.get_task(task_id).get_scheduler_priority(), priority);
     }
 }
diff --git a/crates/tako/src/internal/server/task.rs b/crates/tako/src/internal/server/task.rs
index b313241d9..22718ef81 100644
--- a/crates/tako/src/internal/server/task.rs
+++ b/crates/tako/src/internal/server/task.rs
@@ -213,7 +213,7 @@ impl Task {
     }
 
     pub(crate) fn increment_instance_id(&mut self) {
-        self.instance_id = InstanceId(self.instance_id.as_num() + 1);
+        self.instance_id = InstanceId::new(self.instance_id.as_num() + 1);
     }
 
     pub(crate) fn increment_crash_counter(&mut self) -> bool {
diff --git a/crates/tako/src/internal/server/workerload.rs b/crates/tako/src/internal/server/workerload.rs
index 36814a2ca..6e51381b0 100644
--- a/crates/tako/src/internal/server/workerload.rs
+++ b/crates/tako/src/internal/server/workerload.rs
@@ -365,32 +365,32 @@ mod tests {
         let rq2 = ResBuilder::default().add(0, 4).finish();
         let rqv = ResourceRequestVariants::new(smallvec![rq1, rq2]);
-        load.add_request(TaskId::new(1), &rqv, &wr);
+        load.add_request(TaskId::new_test(1), &rqv, &wr);
         assert_eq!(load.n_resources, ra_builder(&[2, 2, 0]));
         assert!(load.non_first_rq.is_empty());
-        load.add_request(TaskId::new(2), &rqv, &wr);
+        load.add_request(TaskId::new_test(2), &rqv, &wr);
         assert_eq!(load.n_resources, ra_builder(&[4, 4, 0]));
         assert!(load.non_first_rq.is_empty());
-        load.add_request(TaskId::new(3), &rqv, &wr);
+        load.add_request(TaskId::new_test(3), &rqv, &wr);
         assert_eq!(load.n_resources, ra_builder(&[8, 4, 0]));
         assert_eq!(load.non_first_rq.len(), 1);
-        assert_eq!(load.non_first_rq.get(&TaskId::new(3)), Some(&1));
+        assert_eq!(load.non_first_rq.get(&TaskId::new_test(3)), Some(&1));
 
-        load.add_request(TaskId::new(4), &rqv, &wr);
+        load.add_request(TaskId::new_test(4), &rqv, &wr);
         assert_eq!(load.n_resources, ra_builder(&[12, 4, 0]));
         assert_eq!(load.non_first_rq.len(), 2);
-        assert_eq!(load.non_first_rq.get(&TaskId::new(4)), Some(&1));
+        assert_eq!(load.non_first_rq.get(&TaskId::new_test(4)), Some(&1));
 
-        load.add_request(TaskId::new(5), &rqv, &wr);
+        load.add_request(TaskId::new_test(5), &rqv, &wr);
         assert!(
             load.n_resources == ra_builder(&[16, 4, 0])
                 || load.n_resources == ra_builder(&[14, 6, 0])
         );
 
         let resources = load.n_resources.clone();
-        load.remove_request(TaskId::new(3), &rqv, &wr);
+        load.remove_request(TaskId::new_test(3), &rqv, &wr);
         assert_eq!(
             load.n_resources,
             vec![
@@ -400,9 +400,9 @@ mod tests {
             ]
             .into()
         );
-        assert!(load.non_first_rq.get(&TaskId::new(3)).is_none());
+        assert!(load.non_first_rq.get(&TaskId::new_test(3)).is_none());
 
-        load.remove_request(TaskId::new(1), &rqv, &wr);
+        load.remove_request(TaskId::new_test(1), &rqv, &wr);
         assert_eq!(
             load.n_resources,
             vec![
@@ -412,6 +412,6 @@ mod tests {
             ]
             .into()
         );
-        assert!(load.non_first_rq.get(&TaskId::new(1)).is_none());
+        assert!(load.non_first_rq.get(&TaskId::new_test(1)).is_none());
     }
 }
diff --git a/crates/tako/src/internal/tests/integration/test_basic.rs b/crates/tako/src/internal/tests/integration/test_basic.rs
index 1cb7007b7..6c498d9b5 100644
--- a/crates/tako/src/internal/tests/integration/test_basic.rs
+++ b/crates/tako/src/internal/tests/integration/test_basic.rs
@@ -2,7 +2,6 @@ use crate::gateway::{
     FromGatewayMessage, NewWorkerAllocationResponse, NewWorkerQuery, ToGatewayMessage,
     WorkerTypeQuery,
 };
-use crate::internal::common::index::AsIdVec;
 use crate::internal::tests::integration::utils::api::cancel;
 use crate::internal::tests::integration::utils::check_file_contents;
 use crate::internal::tests::integration::utils::server::{ServerHandle, run_test};
@@ -12,7 +11,7 @@ use crate::internal::tests::integration::utils::task::{
 };
 use crate::program::StdioDef;
 use crate::resources::ResourceDescriptor;
-use crate::wait_for_msg;
+use crate::{TaskId, wait_for_msg};
 use std::time::Duration;
 use tokio::time::sleep;
@@ -92,7 +91,7 @@ async fn test_cancel_immediately() {
             .submit(GraphBuilder::singleton(simple_task(&["sleep", "1"], 1)))
             .await;
         let response = cancel(&mut handle, &ids).await;
-        assert_eq!(response.cancelled_tasks, vec![1].to_ids());
+        assert_eq!(response.cancelled_tasks, vec![TaskId::new_test(1)]);
     })
     .await;
 }
@@ -129,7 +128,7 @@ async fn test_cancel_error_task() {
         sleep(Duration::from_millis(300)).await;
 
         let response = cancel(&mut handle, &[1]).await;
-        assert_eq!(response.already_finished, vec![1].to_ids());
+        assert_eq!(response.already_finished, vec![TaskId::new_test(1)]);
     })
     .await;
 }
diff --git a/crates/tako/src/internal/tests/integration/utils/task.rs b/crates/tako/src/internal/tests/integration/utils/task.rs
index 50df87c5b..c21a0ce17 100644
--- a/crates/tako/src/internal/tests/integration/utils/task.rs
+++ b/crates/tako/src/internal/tests/integration/utils/task.rs
@@ -2,7 +2,6 @@ use bincode::Options;
 use std::path::PathBuf;
 use std::time::Duration;
 
-use crate::internal::common::index::ItemId;
 use derive_builder::Builder;
 use smallvec::smallvec;
 use thin_vec::ThinVec;
@@ -18,7 +17,7 @@ use crate::program::{ProgramDefinition, StdioDef};
 use crate::resources::{AllocationRequest, ResourceAmount};
 
 pub struct GraphBuilder {
-    id_counter: u64,
+    id_counter: u32,
     tasks: Vec,
     configurations: Vec,
 }
@@ -119,7 +118,7 @@ pub fn build_task_def_from_config(
     };
     (
         TaskConfiguration {
-            id: TaskId::new(id.unwrap_or(1) as <TaskId as ItemId>::IdType),
+            id: TaskId::new_test(id.unwrap_or(1)),
             shared_data_index: 0,
             task_deps: ThinVec::new(),
             dataobj_deps: ThinVec::new(),
@@ -133,7 +132,7 @@ pub fn build_task_def_from_config(
 #[builder(pattern = "owned")]
 pub struct TaskConfig {
     #[builder(default)]
-    pub id: Option<u64>,
+    pub id: Option<u32>,
 
     #[builder(default)]
     time_limit: Option<Duration>,
@@ -195,7 +194,7 @@ pub fn simple_args(args: &[&'static str]) -> Vec<String> {
     args.iter().map(|&v| v.to_string()).collect()
 }
 
-pub fn simple_task(args: &[&'static str], id: u64) -> TaskConfigBuilder {
+pub fn simple_task(args: &[&'static str], id: u32) -> TaskConfigBuilder {
     TaskConfigBuilder::default()
         .args(simple_args(args))
         .id(Some(id))
diff --git a/crates/tako/src/internal/tests/test_reactor.rs b/crates/tako/src/internal/tests/test_reactor.rs
index 07d89167e..e4d2026ef 100644
--- a/crates/tako/src/internal/tests/test_reactor.rs
+++ b/crates/tako/src/internal/tests/test_reactor.rs
@@ -2,7 +2,6 @@ use std::time::Duration;
 
 use crate::datasrv::DataObjectId;
 use crate::gateway::LostWorkerReason;
-use crate::internal::common::index::AsIdVec;
 use crate::internal::common::resources::ResourceDescriptor;
 use crate::internal::messages::common::TaskFailInfo;
 use crate::internal::messages::worker::{
@@ -82,8 +81,8 @@ fn test_worker_add() {
     assert!(
         matches!(comm.take_broadcasts(1)[0], ToWorkerMessage::NewWorker(NewWorkerMsg {
-            worker_id: WorkerId(402), address: ref _a, resources: ref r,
-        }) if r.n_resources == vec![ResourceAmount::new_units(4)])
+            worker_id, address: ref _a, resources: ref r,
+        }) if worker_id.as_num() == 402 && r.n_resources == vec![ResourceAmount::new_units(4)])
     );
     comm.check_need_scheduling();
@@ -137,8 +136,8 @@ fn test_worker_add() {
     assert!(
         matches!(comm.take_broadcasts(1)[0], ToWorkerMessage::NewWorker(NewWorkerMsg {
-            worker_id: WorkerId(502), address: ref a, resources: ref r,
-        }) if a == "test2:123" && r.n_resources == vec![ResourceAmount::new_units(5), ResourceAmount::new_units(0), ResourceAmount::new_units(100_000_000)])
+            worker_id, address: ref a, resources: ref r,
+        }) if worker_id.as_num() == 502 && a == "test2:123" && r.n_resources == vec![ResourceAmount::new_units(5), ResourceAmount::new_units(0), ResourceAmount::new_units(100_000_000)])
     );
     comm.check_need_scheduling();
     comm.emptiness_check();
@@ -190,7 +189,7 @@ fn test_submit_jobs() {
     assert_eq!(t6.get_unfinished_deps(), 4);
 }
 
-fn no_data_task_finished(task_id: u64) -> TaskFinishedMsg {
+fn no_data_task_finished(task_id: u32) -> TaskFinishedMsg {
     TaskFinishedMsg {
         id: task_id.into(),
         outputs: vec![],
@@ -241,23 +240,26 @@ fn test_assignments_and_finish() {
     assert!(matches!(
         msgs[0],
         ToWorkerMessage::ComputeTask(ComputeTaskMsg {
-            id: TaskId(11),
+            id,
             user_priority: 12,
             ..
-        })
+        }) if id.job_task_id().as_num() == 11
     ));
     assert!(matches!(
         msgs[1],
         ToWorkerMessage::ComputeTask(ComputeTaskMsg {
-            id: TaskId(15),
+            id,
             scheduler_priority: 0,
             ..
-        })
+        }) if id.job_task_id().as_num() == 15
     ));
 
     let msgs = comm.take_worker_msgs(101, 1);
     assert!(matches!(
         msgs[0],
-        ToWorkerMessage::ComputeTask(ComputeTaskMsg { id: TaskId(12), .. })
+        ToWorkerMessage::ComputeTask(ComputeTaskMsg {
+            id,
+            ..
+        }) if id.job_task_id().as_num() == 12
     ));
 
     comm.emptiness_check();
@@ -275,7 +277,7 @@
     check_worker_tasks_exact(&core, 102, &[]);
 
     comm.check_need_scheduling();
-    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(15));
+    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new_test(15));
     comm.emptiness_check();
 
     assert!(core.find_task(15.into()).is_none());
@@ -291,7 +293,7 @@
     check_worker_tasks_exact(&core, 102, &[]);
 
     comm.check_need_scheduling();
-    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(12));
+    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new_test(12));
     comm.emptiness_check();
 
     on_task_finished(&mut core, &mut comm, 100.into(), no_data_task_finished(11));
@@ -304,10 +306,13 @@
     let msgs = comm.take_worker_msgs(101, 1);
     assert!(matches!(
         msgs[0],
-        ToWorkerMessage::ComputeTask(ComputeTaskMsg { id: TaskId(13), .. })
+        ToWorkerMessage::ComputeTask(ComputeTaskMsg {
+            id,
+            ..
+        }) if id.job_task_id().as_num() == 13
     ));
 
-    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(11));
+    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new_test(11));
     comm.emptiness_check();
     core.sanity_check();
@@ -315,7 +320,10 @@
 
     comm.check_need_scheduling();
 
-    assert_eq!(comm.take_client_task_finished(1), vec![13].to_ids());
+    assert_eq!(
+        comm.take_client_task_finished(1),
+        vec![TaskId::new_test(13)]
+    );
     comm.emptiness_check();
     core.sanity_check();
@@ -349,8 +357,15 @@ fn test_running_task_on_error() {
     let mut msgs = comm.take_client_task_errors(1);
     let (id, cs, _) = msgs.pop().unwrap();
-    assert_eq!(id.as_num(), 13);
-    assert_eq!(sorted_vec(cs), vec![15, 16, 17].to_ids());
+    assert_eq!(id.job_task_id().as_num(), 13);
+    assert_eq!(
+        sorted_vec(cs),
+        vec![
+            TaskId::new_test(15),
+            TaskId::new_test(16),
+            TaskId::new_test(17)
+        ]
+    );
     comm.emptiness_check();
 
     assert!(core.find_task(16.into()).is_none());
@@ -383,7 +398,7 @@ fn test_steal_tasks_ok() {
     let msgs = comm.take_worker_msgs(101, 1);
     assert!(
-        matches!(&msgs[0], ToWorkerMessage::StealTasks(ids) if ids.ids == vec![task_id].to_ids())
+        matches!(&msgs[0], ToWorkerMessage::StealTasks(ids) if ids.ids == vec![TaskId::new_test(task_id)])
     );
     comm.emptiness_check();
@@ -406,7 +421,8 @@
     let msgs = comm.take_worker_msgs(100, 1);
     assert!(matches!(
         &msgs[0],
-        ToWorkerMessage::ComputeTask(ComputeTaskMsg { id: TaskId(13), .. })
+        ToWorkerMessage::ComputeTask(ComputeTaskMsg { id, .. })
+            if id.job_task_id().as_num() == 13
     ));
     comm.emptiness_check();
     core.sanity_check();
@@ -428,7 +444,9 @@ fn test_steal_tasks_running() {
     scheduler.finish_scheduling(&mut core, &mut comm);
 
     let msgs = comm.take_worker_msgs(101, 1);
-    assert!(matches!(&msgs[0], ToWorkerMessage::StealTasks(ids) if ids.ids == vec![13].to_ids()));
+    assert!(
+        matches!(&msgs[0], ToWorkerMessage::StealTasks(ids) if ids.ids == vec![TaskId::new_test(13)])
+    );
     comm.emptiness_check();
 
     assert!(!worker_has_task(&core, 101, 13));
@@ -471,7 +489,7 @@ fn finish_task_without_outputs() {
     let mut comm = create_test_comm();
     on_task_finished(&mut core, &mut comm, 100.into(), no_data_task_finished(1));
     comm.check_need_scheduling();
-    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(1));
+    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new_test(1));
     comm.emptiness_check();
     core.sanity_check();
 }
@@ -514,16 +532,19 @@ fn test_task_cancel() {
             .map(|id| id.into())
            .collect::>()
     );
-    assert_eq!(sorted_vec(ft), vec![11, 33].to_ids());
+    assert_eq!(
+        sorted_vec(ft),
+        vec![TaskId::new_test(11), TaskId::new_test(33)]
+    );
 
     let msgs = comm.take_worker_msgs(100, 1);
     assert!(
-        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if ids == &vec![41].to_ids())
+        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if ids == &vec![TaskId::new_test(41)])
     );
 
     let msgs = comm.take_worker_msgs(101, 1);
     assert!(
-        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if sorted_vec(ids.clone()) == vec![12, 40].to_ids())
+        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if sorted_vec(ids.clone()) == vec![TaskId::new_test(12), TaskId::new_test(40)])
    );
 
     assert_eq!(core.task_map().len(), 1);
@@ -542,7 +563,7 @@ fn test_worker_lost_with_mn_task_non_root() {
     submit_test_tasks(&mut core, vec![task1]);
     start_mn_task_on_worker(
         &mut core,
-        TaskId::new(1),
+        TaskId::new_test(1),
         vec![WorkerId::new(103), WorkerId::new(101), WorkerId::new(100)],
     );
     let mut comm = create_test_comm();
@@ -556,15 +577,14 @@
     assert_eq!(comm.take_lost_workers().len(), 1);
     let msgs = comm.take_worker_msgs(103, 1);
     assert!(
-        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if ids == &vec![1].to_ids())
+        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if ids == &vec![TaskId::new_test(1)])
     );
     assert!(matches!(
         comm.take_broadcasts(1)[0],
-        ToWorkerMessage::LostWorker(WorkerId(101))
-    ));
+        ToWorkerMessage::LostWorker(w) if w.as_num() == 101));
     comm.check_need_scheduling();
     comm.emptiness_check();
-    assert!(core.get_task(TaskId::new(1)).is_waiting());
+    assert!(core.get_task(TaskId::new_test(1)).is_waiting());
 }
 
 #[test]
@@ -575,7 +595,7 @@ fn test_worker_lost_with_mn_task_root() {
     submit_test_tasks(&mut core, vec![task1]);
     start_mn_task_on_worker(
         &mut core,
-        TaskId::new(1),
+        TaskId::new_test(1),
         vec![WorkerId::new(103), WorkerId::new(101), WorkerId::new(100)],
     );
     let mut comm = create_test_comm();
@@ -589,11 +609,11 @@
     assert_eq!(comm.take_lost_workers().len(), 1);
     assert!(matches!(
         comm.take_broadcasts(1)[0],
-        ToWorkerMessage::LostWorker(WorkerId(103))
+        ToWorkerMessage::LostWorker(w) if w.as_num() == 103
     ));
     comm.check_need_scheduling();
     comm.emptiness_check();
-    assert!(core.get_task(TaskId::new(1)).is_waiting());
+    assert!(core.get_task(TaskId::new_test(1)).is_waiting());
 }
 
 #[test]
@@ -602,7 +622,7 @@ fn test_worker_crashing_task() {
     let t1 = task(1);
     submit_test_tasks(&mut core, vec![t1]);
 
-    assert_eq!(core.get_task(TaskId::new(1)).crash_counter, 0);
+    assert_eq!(core.get_task(TaskId::new_test(1)).crash_counter, 0);
 
     for x in 1..=5 {
         let mut comm = create_test_comm();
@@ -620,20 +640,20 @@
         comm.check_need_scheduling();
         assert!(matches!(
             comm.take_broadcasts(1)[0],
-            ToWorkerMessage::LostWorker(WorkerId(wid))
-                if worker_id == wid
+            ToWorkerMessage::LostWorker(w)
+                if worker_id == w.as_num()
        ));
         if x == 5 {
             let errs = comm.take_client_task_errors(1);
-            assert_eq!(errs[0].0, TaskId::new(1));
+            assert_eq!(errs[0].0, TaskId::new_test(1));
             assert_eq!(
                 errs[0].2.message,
                 "Task was running on a worker that was lost; the task has occurred 5 times in this situation and limit was reached."
             );
-            assert_eq!(std::mem::take(&mut lw[0].1), vec![].to_ids());
+            assert_eq!(std::mem::take(&mut lw[0].1), vec![]);
         } else {
-            assert_eq!(std::mem::take(&mut lw[0].1), vec![1].to_ids());
-            assert_eq!(core.get_task(TaskId::new(1)).crash_counter, x);
+            assert_eq!(std::mem::take(&mut lw[0].1), vec![TaskId::new_test(1)]);
+            assert_eq!(core.get_task(TaskId::new_test(1)).crash_counter, x);
         }
         comm.emptiness_check();
     }
@@ -647,7 +667,7 @@ fn test_task_mn_fail() {
     submit_test_tasks(&mut core, vec![task1]);
     start_mn_task_on_worker(
         &mut core,
-        TaskId::new(1),
+        TaskId::new_test(1),
         vec![WorkerId::new(103), WorkerId::new(101), WorkerId::new(100)],
     );
     let mut comm = create_test_comm();
@@ -662,7 +682,7 @@
     );
     core.sanity_check();
     let msgs = comm.take_client_task_errors(1);
-    assert_eq!(msgs[0].0, TaskId::new(1));
+    assert_eq!(msgs[0].0, TaskId::new_test(1));
     comm.emptiness_check();
     assert!(core.find_task(1.into()).is_none());
     for w in &[100, 101, 102, 103] {
@@ -683,20 +703,20 @@
     submit_test_tasks(&mut core, vec![task1]);
     start_mn_task_on_worker(
         &mut core,
-        TaskId::new(1),
+        TaskId::new_test(1),
         vec![WorkerId::new(103), WorkerId::new(101), WorkerId::new(100)],
     );
     let mut comm = create_test_comm();
-    let (ct, ft) = on_cancel_tasks(&mut core, &mut comm, &[TaskId(1)]);
+    let (ct, ft) = on_cancel_tasks(&mut core, &mut comm, &[TaskId::new_test(1)]);
     core.sanity_check();
     let msgs = comm.take_worker_msgs(103, 1);
     assert!(
-        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if ids == &vec![1].to_ids())
+        matches!(&msgs[0], &ToWorkerMessage::CancelTasks(TaskIdsMsg { ref ids }) if ids == &vec![TaskId::new_test(1)])
     );
     comm.check_need_scheduling();
     comm.emptiness_check();
     assert!(ft.is_empty());
-    assert_eq!(ct, vec![TaskId::new(1)]);
+    assert_eq!(ct, vec![TaskId::new_test(1)]);
 
     let mut scheduler = create_test_scheduler();
     scheduler.run_scheduling(&mut core, &mut comm);
@@ -732,26 +752,26 @@ fn test_running_task() {
     comm.emptiness_check();
 
     on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(1));
-    assert_eq!(comm.take_client_task_running(1), vec![1].to_ids());
+    assert_eq!(comm.take_client_task_running(1), vec![TaskId::new_test(1)]);
     comm.emptiness_check();
 
     on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(2));
-    assert_eq!(comm.take_client_task_running(1)[0], TaskId::new(2));
+    assert_eq!(comm.take_client_task_running(1)[0], TaskId::new_test(2));
     comm.emptiness_check();
 
     assert!(matches!(
         core.task(1).state,
         TaskRuntimeState::Running {
-            worker_id: WorkerId(101),
+            worker_id,
             ..
-        }
+        } if worker_id.as_num() == 101
     ));
     assert!(matches!(
         core.task(2).state,
         TaskRuntimeState::Running {
-            worker_id: WorkerId(101),
+            worker_id,
             ..
-        }
+        } if worker_id.as_num() == 101
     ));
 
     on_remove_worker(
@@ -764,12 +784,12 @@
     assert_eq!(lw[0].0, WorkerId::new(101));
     assert_eq!(
         sorted_vec(std::mem::take(&mut lw[0].1)),
-        vec![1, 2].to_ids()
+        vec![TaskId::new_test(1), TaskId::new_test(2)]
     );
     comm.check_need_scheduling();
     assert!(matches!(
         comm.take_broadcasts(1)[0],
-        ToWorkerMessage::LostWorker(WorkerId(101))
+        ToWorkerMessage::LostWorker(w) if w.as_num() == 101
     ));
     comm.emptiness_check();
 }
@@ -788,7 +808,7 @@ fn test_finished_before_steal_response() {
     on_task_finished(&mut core, &mut comm, 101.into(), no_data_task_finished(1));
 
     comm.check_need_scheduling();
-    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new(1));
+    assert_eq!(comm.take_client_task_finished(1)[0], TaskId::new_test(1));
     comm.emptiness_check();
 
     assert!(!worker_has_task(&core, 101, 1));
@@ -822,7 +842,7 @@ fn test_running_before_steal_response() {
     let mut comm = create_test_comm();
     on_task_running(&mut core, &mut comm, 101.into(), task_running_msg(1));
     comm.check_need_scheduling();
-    assert_eq!(comm.take_client_task_running(1)[0], TaskId::new(1));
+    assert_eq!(comm.take_client_task_running(1)[0], TaskId::new_test(1));
     comm.emptiness_check();
 
     assert!(worker_has_task(&core, 101, 1));
@@ -940,7 +960,7 @@ fn lost_worker_with_running_and_assign_tasks() {
     assert_eq!(
         comm.take_lost_workers(),
-        vec![(WorkerId::new(101), vec![12].to_ids())]
+        vec![(WorkerId::new(101), vec![TaskId::new_test(12)])]
     );
     assert_eq!(core.take_single_node_ready_to_assign().len(), 3);
@@ -951,12 +971,12 @@
     core.assert_fresh(&[11, 12, 40]);
     assert!(matches!(
         core.get_task(41.into()).state,
-        TaskRuntimeState::Stealing(WorkerId(100), None)
+        TaskRuntimeState::Stealing(w, None) if w.as_num() == 100
    ));
 
     assert!(matches!(
         comm.take_broadcasts(1)[0],
-        ToWorkerMessage::LostWorker(WorkerId(101))
+        ToWorkerMessage::LostWorker(w) if w.as_num() == 101
     ));
 
     comm.check_need_scheduling();
@@ -1119,12 +1139,12 @@ fn test_data_deps_no_output() {
             outputs: vec![],
         },
     );
-    assert_eq!(comm.take_client_task_finished(1), vec![TaskId::new(1)]);
+    assert_eq!(comm.take_client_task_finished(1), vec![TaskId::new_test(1)]);
     let errors = comm.take_client_task_errors(1);
-    assert_eq!(errors[0].0, TaskId::new(2));
+    assert_eq!(errors[0].0, TaskId::new_test(2));
     assert_eq!(
         &errors[0].2.message,
-        "Task 1 did not produced expected output(s): 11"
+        "Task 0@1 did not produced expected output(s): 11"
     );
     comm.check_need_scheduling();
     comm.emptiness_check();
@@ -1171,12 +1191,12 @@ fn test_data_deps_missing_outputs() {
             ],
         },
     );
-    assert_eq!(comm.take_client_task_finished(1), vec![TaskId::new(1)]);
+    assert_eq!(comm.take_client_task_finished(1), vec![TaskId::new_test(1)]);
     let errors = comm.take_client_task_errors(1);
-    assert_eq!(errors[0].0, TaskId::new(2));
+    assert_eq!(errors[0].0, TaskId::new_test(2));
     assert_eq!(
         &errors[0].2.message,
-        "Task 1 did not produced expected output(s): 11, 100"
+        "Task 0@1 did not produced expected output(s): 11, 100"
     );
     let messages = comm.take_worker_msgs(100, 2);
     assert!(
@@ -1199,7 +1219,7 @@ fn test_data_deps_basic() {
         .data_dep(&t2, 478)
         .build();
     submit_test_tasks(&mut core, vec![t1, t2, t3]);
-    assert_eq!(core.get_task(2.into()).task_deps, [TaskId::new(1)]);
+    assert_eq!(core.get_task(2.into()).task_deps, [TaskId::new_test(1)]);
     core.assert_waiting(&[2, 3]);
     core.assert_ready(&[1]);
     create_test_workers(&mut core, &[4]);
diff --git a/crates/tako/src/internal/tests/test_scheduler_mn.rs b/crates/tako/src/internal/tests/test_scheduler_mn.rs
index 3fd818120..ac29e0031 100644
--- a/crates/tako/src/internal/tests/test_scheduler_mn.rs
+++ b/crates/tako/src/internal/tests/test_scheduler_mn.rs
@@ -220,7 +220,7 @@ fn test_schedule_mn_fill() {
         assert!(w.mn_task().is_some());
     }
     for t in &[1, 2, 3, 4] {
-        assert!(core.get_task(TaskId::new(*t)).is_mn_running());
+        assert!(core.get_task(TaskId::new_test(*t)).is_mn_running());
     }
 }
 
@@ -242,7 +242,7 @@ fn test_mn_not_enough() {
         assert!(w.mn_task().is_none());
     }
     for t in &[1, 2, 3, 4] {
-        assert!(core.get_task(TaskId::new(*t)).is_waiting());
+        assert!(core.get_task(TaskId::new_test(*t)).is_waiting());
     }
     let (mn_queue, _, _) = core.multi_node_queue_split();
diff --git a/crates/tako/src/internal/tests/test_scheduler_sn.rs b/crates/tako/src/internal/tests/test_scheduler_sn.rs
index b581ba621..0f25347a2 100644
--- a/crates/tako/src/internal/tests/test_scheduler_sn.rs
+++ b/crates/tako/src/internal/tests/test_scheduler_sn.rs
@@ -2,7 +2,6 @@ use crate::TaskId;
 use crate::internal::common::Set;
-use crate::internal::common::index::ItemId;
 use crate::internal::messages::worker::{
     StealResponse, StealResponseMsg, TaskOutput, ToWorkerMessage,
 };
@@ -394,7 +393,7 @@ fn test_resources_no_workers2() {
         let unschedulable_index = task_cpu_counts
             .iter()
             .position(|&count| count > 10)
-            .unwrap() as <TaskId as ItemId>::IdType
+            .unwrap() as u32
             + rt.task_id_counter;
 
         rt.new_workers(&[8, 8, 8]);
@@ -415,7 +414,7 @@
         let sn = rt.core().take_sleeping_tasks();
         assert_eq!(sn.len(), 1);
-        assert_eq!(sn[0], TaskId::new(unschedulable_index));
+        assert_eq!(sn[0], TaskId::new_test(unschedulable_index));
     }
 
     check(&[9, 10, 11]);
@@ -537,7 +536,7 @@ fn test_generic_resource_assign2() {
             .unwrap()
             .sn_tasks()
             .iter()
-            .all(|task_id| task_id.as_num() < 50)
+            .all(|task_id| task_id.job_task_id().as_num() < 50)
     );
     assert!(!rt.worker(100).is_parked());
@@ -836,14 +835,14 @@ fn test_task_data_deps_initial_placing() {
 #[test]
 fn test_task_data_deps_balancing() {
     let _ = env_logger::builder().is_test(true).try_init();
-    for odd in [0, 1] {
+    for odd in [0u32, 1u32] {
         for late_worker in [true, false] {
             let mut core = Core::default();
             let t1 = TaskBuilder::new(1).build();
             let t2 = TaskBuilder::new(2).build();
             let mut ts: Vec<_> = (10u32..110u32)
                 .map(|i| {
-                    TaskBuilder::new(TaskId::new(i as u64))
+                    TaskBuilder::new(TaskId::new_test(i))
                         .data_dep(&t1, i - 10)
                         .data_dep(&t2, i - 10)
                         .build()
@@ -857,7 +856,7 @@
             } else {
                 create_test_workers(&mut core, &[1, 1]);
             }
-            let mut set_data = |task_id: u64, worker_id: u32| {
+            let mut set_data = |task_id: u32, worker_id: u32| {
                 start_and_finish_on_worker_with_data(
                     &mut core,
                     task_id,
                     worker_id,
                     (0u32..100u32)
                         .map(|i| TaskOutput {
                             id: i.into(),
-                            size: if (i % 2) as u64 == odd { 100 } else { 5_000 },
+                            size: if (i % 2) == odd { 100 } else { 5_000 },
                         })
                         .collect(),
                 )
@@ -886,7 +885,13 @@
             let n1_count = worker
                 .sn_tasks()
                 .iter()
-                .map(|task_id| if task_id.as_num() % 2 == odd { 1 } else { 0 })
+                .map(|task_id| {
+                    if task_id.job_task_id().as_num() % 2 == odd {
+                        1
+                    } else {
+                        0
+                    }
+                })
                 .sum::();
             assert!(n1_count > 40);
         }
diff --git a/crates/tako/src/internal/tests/test_worker.rs b/crates/tako/src/internal/tests/test_worker.rs
index a35a2b6d0..504ab66f0 100644
--- a/crates/tako/src/internal/tests/test_worker.rs
+++ b/crates/tako/src/internal/tests/test_worker.rs
@@ -111,7 +111,7 @@ fn test_worker_start_task() {
     comm.check_start_task_notifications(1);
     comm.check_emptiness();
 
-    assert_eq!(state.find_task(7.into()).unwrap().id, TaskId::new(7));
+    assert_eq!(state.find_task(7.into()).unwrap().id, TaskId::new_test(7));
     assert!(state.running_tasks.is_empty());
     let requests = state.ready_task_queue.requests();
     assert_eq!(requests.len(), 1);
@@ -133,7 +133,7 @@ fn test_worker_start_task_resource_variants() {
     comm.check_start_task_notifications(1);
     comm.check_emptiness();
 
-    assert_eq!(state.find_task(7.into()).unwrap().id, TaskId::new(7));
+    assert_eq!(state.find_task(7.into()).unwrap().id, TaskId::new_test(7));
     assert!(state.running_tasks.is_empty());
     let requests = state.ready_task_queue.requests();
     assert_eq!(requests.len(), 1);
diff --git a/crates/tako/src/internal/tests/utils/env.rs b/crates/tako/src/internal/tests/utils/env.rs
index cdc609081..18e9f2018 100644
--- a/crates/tako/src/internal/tests/utils/env.rs
+++ b/crates/tako/src/internal/tests/utils/env.rs
@@ -32,7 +32,7 @@ use std::time::Duration;
 pub struct TestEnv {
     core: Core,
     scheduler: SchedulerState,
-    pub task_id_counter: <TaskId as ItemId>::IdType,
+    pub task_id_counter: u32,
     worker_id_counter: <WorkerId as ItemId>::IdType,
 }
 
diff --git a/crates/tako/src/internal/worker/test_util.rs b/crates/tako/src/internal/worker/test_util.rs
index 6a57f7c0b..91a2c75ef 100644
--- a/crates/tako/src/internal/worker/test_util.rs
+++ b/crates/tako/src/internal/worker/test_util.rs
@@ -110,19 +110,19 @@ impl ResourceQueueBuilder {
         self.queue.new_worker(worker_id, wr);
     }
 
-    pub fn start_tasks(&mut self) -> Map<u64, Rc<Allocation>> {
+    pub fn start_tasks(&mut self) -> Map<u32, Rc<Allocation>> {
         self.queue
             .try_start_tasks(&self.task_map, None)
             .into_iter()
-            .map(|(t, a, _)| (t.as_num(), a))
+            .map(|(t, a, _)| (t.job_task_id().as_num(), a))
             .collect()
     }
 
-    pub fn start_tasks_duration(&mut self, duration: Duration) -> Map<u64, Rc<Allocation>> {
+    pub fn start_tasks_duration(&mut self, duration: Duration) -> Map<u32, Rc<Allocation>> {
         self.queue
             .try_start_tasks(&self.task_map, Some(duration))
             .into_iter()
-            .map(|(t, a, _)| (t.as_num(), a))
+            .map(|(t, a, _)| (t.job_task_id().as_num(), a))
             .collect()
     }
 }
diff --git a/crates/tako/src/lib.rs b/crates/tako/src/lib.rs
index 293924364..9a156d92c 100644
--- a/crates/tako/src/lib.rs
+++ b/crates/tako/src/lib.rs
@@ -16,9 +16,9 @@ pub use crate::internal::common::taskgroup::TaskGroup;
 pub use crate::internal::common::utils::format_comma_delimited;
 pub use crate::internal::common::{Map, Set};
 
-define_id_type!(WorkerId, u32);
-define_id_type!(TaskId, u64);
-define_id_type!(InstanceId, u32);
+pub use crate::internal::common::ids::{InstanceId, JobId, JobTaskId, TaskId, WorkerId};
+
+pub type JobTaskCount = u32;
 
 // Priority: Bigger number -> Higher priority
 pub type Priority = i32;
diff --git a/tests/test_events.py b/tests/test_events.py
index e5f91a25e..88dc1b603 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -71,13 +71,13 @@ def test_worker_journal_replay(hq_env: HqEnv, tmp_path):
     assert events[0]["event"]["type"] == "server-start"
 
 
-def test_worker_replay_without_journal(hq_env: HqEnv, tmp_path):
+def test_worker_replay_without_journal(hq_env: HqEnv):
     hq_env.start_server()
     w = hq_env.start_worker()
     hq_env.start_worker()
     wait_for_worker_state(hq_env, [1, 2], "RUNNING")
     hq_env.command(["worker", "stop", "1"])
-    wait_for_worker_state(hq_env, [1], "STOPPED")
+    wait_for_worker_state(hq_env, [1], "STOPPED", check_running_processes=False)
     hq_env.check_process_exited(w)
     events = get_replayed_events(hq_env)
     assert_contains_event(events, lambda e: e["event"]["type"] == "worker-connected" and e["event"]["id"] == 1)
e["event"]["id"] == 1) diff --git a/tests/test_journal.py b/tests/test_journal.py index fac07c4b5..fd90426fa 100644 --- a/tests/test_journal.py +++ b/tests/test_journal.py @@ -348,7 +348,7 @@ def collect_ids(): def test_restore_dependencies1(hq_env: HqEnv, tmp_path): journal_path = os.path.join(tmp_path, "my.journal") hq_env.start_server(args=["--journal", journal_path]) - hq_env.start_worker() + hq_env.start_worker(cpus=2) tmp_path.joinpath("sleep_time").write_text("100") tmp_path.joinpath("job.toml").write_text( """ diff --git a/tests/test_worker.py b/tests/test_worker.py index 8ad3df11d..f7c0ecaf1 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -317,7 +317,7 @@ def test_deploy_ssh_error(hq_env: HqEnv): with hq_env.mock.mock_program_with_code( "ssh", """ - exit(42) + exit(42) """, ): nodefile = prepare_localhost_nodefile() @@ -351,8 +351,8 @@ def test_deploy_ssh_wait_for_worker(hq_env: HqEnv): with hq_env.mock.mock_program_with_code( "ssh", """ - import time - time.sleep(1) + import time + time.sleep(1) """, ): nodefile = prepare_localhost_nodefile() @@ -364,12 +364,12 @@ def test_deploy_ssh_multiple_workers(hq_env: HqEnv): with hq_env.mock.mock_program_with_code( "ssh", """ - import os + import os - os.makedirs("workers", exist_ok=True) - with open(f"workers/{os.getpid()}", "w") as f: - f.write(str(os.getpid())) - """, + os.makedirs("workers", exist_ok=True) + with open(f"workers/{os.getpid()}", "w") as f: + f.write(str(os.getpid())) + """, ): nodefile = prepare_localhost_nodefile(count=3) hq_env.command(["worker", "deploy-ssh", nodefile]) @@ -381,8 +381,8 @@ def test_deploy_ssh_show_output(hq_env: HqEnv): with hq_env.mock.mock_program_with_code( "ssh", """ - print("FOOBAR") - """, + print("FOOBAR") + """, ): nodefile = prepare_localhost_nodefile(count=3) output = hq_env.command(["worker", "deploy-ssh", "--show-output", nodefile]) @@ -424,15 +424,15 @@ def test_worker_state_info(hq_env: HqEnv): wait_for_job_state(hq_env, 1, "RUNNING") table = hq_env.command(["worker", "info", "1"], as_table=True) table.check_row_value("State", "RUNNING") - assert "Job: 1;" in table.get_row_value("Last task started") + assert "1@" in table.get_row_value("Last task started") table.check_row_value("Runtime Info", "assigned tasks: 2; running tasks: 1") hq_env.start_worker() hq_env.command(["submit", "--nodes=2", "--", "sleep", "1"]) wait_for_job_state(hq_env, 2, "RUNNING") table = hq_env.command(["worker", "info", "1"], as_table=True) a = table.get_row_value("Runtime Info") - assert "Job: 2;" in table.get_row_value("Last task started") + assert "2@0" in table.get_row_value("Last task started") table = hq_env.command(["worker", "info", "2"], as_table=True) b = table.get_row_value("Runtime Info") - assert "Job: 2;" in table.get_row_value("Last task started") + assert "2@0" in table.get_row_value("Last task started") assert {a, b} == {"running multinode task; main node", "running multinode task; secondary node"} diff --git a/tests/utils/wait.py b/tests/utils/wait.py index 168c58413..2bd2501eb 100644 --- a/tests/utils/wait.py +++ b/tests/utils/wait.py @@ -90,12 +90,17 @@ def wait_for_task_state(env, job_id: int, task_ids: Union[int, List[int]], state ids = ",".join(str(t) for t in task_ids) states = [s.lower() for s in states] + result = None def check(): + nonlocal result result = env.command(["--output-mode=json", "task", "info", str(job_id), ids], as_json=True) return [r["state"] for r in result] == states - wait_until(check, **kwargs) + def on_timeout(): + return f"most recent 
output:\n{result}" + + wait_until(check, on_timeout=on_timeout, **kwargs) def wait_for_pid_exit(pid: int):