Skip to content

TaskId unification #898

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
May 6, 2025
30 changes: 12 additions & 18 deletions crates/hyperqueue/src/client/commands/journal/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,26 @@ fn format_payload(event: EventPayload) -> serde_json::Value {
"allocation-id": allocation_id,
})
}
EventPayload::TaskStarted {
job_id, task_id, ..
} => json!({
EventPayload::TaskStarted { task_id, .. } => json!({
"type": "task-started",
"job": job_id,
"task": task_id,
"job": task_id.job_id(),
"task": task_id.job_task_id(),
"worker": -1
}),
EventPayload::TaskFinished { job_id, task_id } => json!({
EventPayload::TaskFinished { task_id } => json!({
"type": "task-finished",
"job": job_id,
"task": task_id
"job": task_id.job_id(),
"task": task_id.job_task_id(),
}),
EventPayload::TaskCanceled { job_id, task_id } => json!({
EventPayload::TaskCanceled { task_id } => json!({
"type": "task-canceled",
"job": job_id,
"task": task_id
"job": task_id.job_id(),
"task": task_id.job_task_id(),
}),
EventPayload::TaskFailed {
job_id,
task_id,
error,
} => json!({
EventPayload::TaskFailed { task_id, error } => json!({
"type": "task-failed",
"job": job_id,
"task": task_id,
"job": task_id.job_id(),
"task": task_id.job_task_id(),
"error": error
}),
EventPayload::Submit {
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/outputlog.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::JobId;
use crate::client::globalsettings::GlobalSettings;
use crate::common::arraydef::IntArray;
use crate::stream::reader::outputlog::OutputLog;
use clap::Parser;
use std::path::PathBuf;
use tako::JobId;

#[derive(Parser)]
pub struct OutputLogOpts {
Expand Down
3 changes: 2 additions & 1 deletion crates/hyperqueue/src/client/commands/submit/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ use crate::common::placeholders::{
use crate::common::utils::fs::get_current_dir;
use crate::common::utils::str::pluralize;
use crate::common::utils::time::parse_hms_or_human_time;
use crate::rpc_call;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDescription, JobSubmitDescription, JobTaskDescription,
PinMode, SubmitRequest, SubmitResponse, TaskDescription, TaskKind, TaskKindProgram,
ToClientMessage,
};
use crate::{JobId, JobTaskCount, Map, rpc_call};
use anyhow::{anyhow, bail};
use bstr::BString;
use chumsky::Parser as ChumskyParser;
Expand All @@ -40,6 +40,7 @@ use tako::gateway::{
};
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
use tako::resources::{AllocationRequest, CPU_RESOURCE_NAME, NumOfNodes, ResourceAmount};
use tako::{JobId, JobTaskCount, Map};

const SUBMIT_ARRAY_LIMIT: JobTaskCount = 999;
pub const DEFAULT_CRASH_LIMIT: u32 = 5;
Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/commands/submit/defs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::JobDataObjectId;
use crate::client::resources::parse_allocation_request;
use crate::common::arraydef::IntArray;
use crate::common::arrayparser::parse_array;
use crate::common::error::HqError;
use crate::common::utils::time::parse_human_time;
use crate::{JobDataObjectId, JobTaskCount, JobTaskId};
use bstr::BString;
use serde::de::MapAccess;
use serde::{Deserialize, Deserializer};
Expand All @@ -13,7 +13,7 @@ use std::time::Duration;
use tako::gateway::{ResourceRequest, ResourceRequestEntries, ResourceRequestEntry};
use tako::program::FileOnCloseBehavior;
use tako::resources::{AllocationRequest, NumOfNodes, ResourceAmount};
use tako::{Map, Priority};
use tako::{JobTaskCount, JobTaskId, Map, Priority};

#[derive(Deserialize)]
#[serde(untagged)]
Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/commands/submit/jobfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use crate::transfer::messages::{
JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, SubmitRequest,
TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
};
use crate::{JobId, JobTaskCount, JobTaskId};
use clap::Parser;
use smallvec::smallvec;
use std::path::PathBuf;
use tako::Map;
use tako::gateway::{ResourceRequest, ResourceRequestVariants, TaskDataFlags};
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};
use tako::{JobId, JobTaskCount, JobTaskId};

#[derive(Parser)]
pub struct JobSubmitFileOpts {
Expand Down Expand Up @@ -128,7 +128,7 @@ fn build_job_desc_individual_tasks(
.map(|t| t.id)
.max()
.flatten()
.unwrap_or(JobTaskId(0));
.unwrap_or(JobTaskId::new(0));

/* Topological sort */
let original_len = tasks.len();
Expand Down
3 changes: 2 additions & 1 deletion crates/hyperqueue/src/client/commands/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ use crate::client::output::cli::{
use crate::client::status::{Status, is_terminated};
use crate::common::arraydef::IntArray;
use crate::common::utils::str::pluralize;
use crate::rpc_call;
use crate::server::job::JobTaskCounters;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDetailRequest, JobInfo, JobInfoRequest, TaskIdSelector,
TaskSelector, TaskStatusSelector, ToClientMessage, WaitForJobsRequest,
};
use crate::{JobId, JobTaskCount, rpc_call};
use colored::Colorize;
use tako::{JobId, JobTaskCount};

pub async fn wait_for_jobs(
gsettings: &GlobalSettings,
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use tokio::signal::ctrl_c;
use tokio::task::JoinSet;
use tokio::time::sleep;

use crate::WorkerId;
use crate::client::globalsettings::GlobalSettings;
use crate::client::utils::{PassThroughArgument, passthrough_parser};
use crate::common::cli::DeploySshOpts;
Expand All @@ -42,6 +41,7 @@ use crate::worker::hwdetect::{
};
use crate::worker::parser::{parse_cpu_definition, parse_resource_definition};
use crate::{DEFAULT_WORKER_GROUP_NAME, rpc_call};
use tako::WorkerId;

#[derive(clap::ValueEnum, Clone)]
pub enum WorkerFilter {
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::rpc_call;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
use crate::{Map, WorkerId};
use tako::{Map, WorkerId};

/// Maps worker IDs to hostnames.
pub type WorkerMap = Map<WorkerId, String>;
Expand Down
12 changes: 2 additions & 10 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::transfer::messages::{
ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerExitInfo,
WorkerInfo,
};
use crate::{JobId, JobTaskCount, JobTaskId, WorkerId};
use tako::{JobId, JobTaskCount, JobTaskId, WorkerId};

use chrono::{DateTime, Local, SubsecRound, Utc};
use core::time::Duration;
Expand Down Expand Up @@ -366,15 +366,7 @@ impl Output for CliOutput {
vec![
"Last task started".cell().bold(true),
last_task_started
.map(|t| {
format!(
"Job: {}; Task: {}; Time: {}",
t.job_id,
t.task_id,
format_datetime(t.time)
)
.cell()
})
.map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
.unwrap_or_else(|| "".cell()),
],
];
Expand Down
13 changes: 6 additions & 7 deletions crates/hyperqueue/src/client/output/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::JobTaskId;
use crate::client::status::{Status, job_status};
use crate::common::placeholders::{
CompletePlaceholderCtx, ResolvablePaths, fill_placeholders_in_paths,
Expand All @@ -10,6 +9,7 @@ use crate::transfer::messages::{
use std::path::PathBuf;
use tako::Map;
use tako::program::StdioDef;
use tako::{JobTaskId, TaskId};

pub struct ResolvedTaskPaths {
pub cwd: PathBuf,
Expand All @@ -29,7 +29,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
JobTaskDescription::Array { task_desc, ids, .. } => {
for id in ids.iter() {
task_to_desc_map.insert(
JobTaskId(id),
JobTaskId::new(id),
(submit_desc.description().submit_dir.as_path(), task_desc),
);
}
Expand All @@ -47,8 +47,8 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {

job.tasks
.iter()
.map(|(task_id, task)| {
let (submit_dir, task_desc) = task_to_desc_map.get(task_id).unwrap();
.map(|(job_task_id, task)| {
let (submit_dir, task_desc) = task_to_desc_map.get(job_task_id).unwrap();
let paths = match &task_desc.kind {
TaskKind::ExternalProgram(TaskKindProgram { program, .. }) => match &task.state {
JobTaskState::Canceled {
Expand All @@ -62,8 +62,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
..
} => {
let ctx = CompletePlaceholderCtx {
job_id: job.info.id,
task_id: *task_id,
task_id: TaskId::new(job.info.id, *job_task_id),
instance_id: started_data.context.instance_id,
submit_dir,
server_uid,
Expand All @@ -85,7 +84,7 @@ pub fn resolve_task_paths(job: &JobDetail, server_uid: &str) -> TaskToPathsMap {
_ => None,
},
};
(*task_id, paths)
(*job_task_id, paths)
})
.collect()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData, ServerInfo,
TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerInfo,
};
use crate::{JobId, JobTaskId};
use tako::{JobId, JobTaskId};

#[derive(Default)]
pub struct JsonOutput;
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use crate::client::output::Verbosity;
use crate::client::output::common::TaskToPathsMap;
use crate::common::arraydef::IntArray;
use crate::server::job::JobTaskInfo;
use crate::{JobId, JobTaskId};
use core::time::Duration;
use tako::resources::ResourceDescriptor;
use tako::{JobId, JobTaskId};

pub const MAX_DISPLAYED_WORKERS: usize = 2;

Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::transfer::messages::{
AutoAllocListResponse, JobDetail, JobInfo, ServerInfo, WaitForJobsResponse, WorkerExitInfo,
WorkerInfo,
};
use crate::{JobId, JobTaskId};
use tako::{JobId, JobTaskId};

#[derive(Default)]
pub struct Quiet;
Expand Down
3 changes: 2 additions & 1 deletion crates/hyperqueue/src/client/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use crate::client::output::{Verbosity, VerbosityFlag};
use crate::common::arraydef::IntArray;
use crate::common::cli::{TaskSelectorArg, parse_last_range, parse_last_single_id};
use crate::common::error::HqError;
use crate::rpc_call;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
FromClientMessage, IdSelector, JobDetailRequest, SingleIdSelector, TaskIdSelector,
TaskSelector, TaskStatusSelector, ToClientMessage,
};
use crate::{JobId, rpc_call};
use tako::JobId;

#[derive(clap::Parser)]
pub struct TaskOpts {
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/common/arrayparser.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Set;
use crate::common::arraydef::{IntArray, IntRange};
use crate::common::parser2::{CharParser, ParseError, all_consuming, parse_u32};
use chumsky::Parser;
use chumsky::primitive::just;
use chumsky::text::TextParser;
use tako::Set;

/// Parse integer range in the format n[-end][:step].
fn parse_range() -> impl CharParser<IntRange> {
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/common/manager/slurm.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::Map;
use crate::common::manager::common::format_duration;
use crate::common::utils::time::parse_hms_time;
use std::process::Command;
use std::time::Duration;
use tako::Map;

/// Format a duration as a SLURM time string, e.g. 01:05:02
pub fn format_slurm_duration(duration: &Duration) -> String {
Expand Down
21 changes: 11 additions & 10 deletions crates/hyperqueue/src/common/placeholders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::path::{Path, PathBuf};
use nom::bytes::complete::take_until;
use nom::sequence::delimited;
use nom_supreme::tag::complete::tag;
use tako::InstanceId;
use tako::{InstanceId, TaskId};

use tako::program::{ProgramDefinition, StdioDef};

use crate::common::parser::NomResult;
use crate::{JobId, JobTaskId, Map};
use tako::{JobId, Map};

pub const SERVER_UID_PLACEHOLDER: &str = "SERVER_UID";
pub const TASK_ID_PLACEHOLDER: &str = "TASK_ID";
Expand All @@ -32,8 +32,7 @@ const KNOWN_PLACEHOLDERS: [&str; 6] = [
type PlaceholderMap<'a> = Map<&'static str, Cow<'a, str>>;

pub struct CompletePlaceholderCtx<'a> {
pub job_id: JobId,
pub task_id: JobTaskId,
pub task_id: TaskId,
pub instance_id: InstanceId,
pub submit_dir: &'a Path,
pub server_uid: &'a str,
Expand All @@ -58,8 +57,11 @@ impl<'a> ResolvablePaths<'a> {
/// Fills all known placeholders in the given paths.
pub fn fill_placeholders_in_paths(paths: ResolvablePaths, ctx: CompletePlaceholderCtx) {
let mut placeholders = PlaceholderMap::new();
placeholders.insert(JOB_ID_PLACEHOLDER, ctx.job_id.to_string().into());
placeholders.insert(TASK_ID_PLACEHOLDER, ctx.task_id.to_string().into());
placeholders.insert(JOB_ID_PLACEHOLDER, ctx.task_id.job_id().to_string().into());
placeholders.insert(
TASK_ID_PLACEHOLDER,
ctx.task_id.job_task_id().to_string().into(),
);
placeholders.insert(INSTANCE_ID_PLACEHOLDER, ctx.instance_id.to_string().into());
placeholders.insert(
SUBMIT_DIR_PLACEHOLDER,
Expand Down Expand Up @@ -249,12 +251,12 @@ mod tests {
use std::path::{Path, PathBuf};
use tako::program::{FileOnCloseBehavior, ProgramDefinition, StdioDef};

use crate::Map;
use crate::common::env::{HQ_INSTANCE_ID, HQ_JOB_ID, HQ_SUBMIT_DIR, HQ_TASK_ID};
use crate::common::placeholders::{
CompletePlaceholderCtx, ResolvablePaths, StringPart, fill_placeholders_after_submit,
fill_placeholders_in_paths, parse_resolvable_string,
};
use tako::{Map, TaskId};

#[test]
fn test_parse_empty_string() {
Expand Down Expand Up @@ -443,14 +445,13 @@ mod tests {

fn ctx<'a>(
job_id: u32,
task_id: u32,
job_task_id: u32,
instance_id: u32,
submit_dir: &'a Path,
server_uid: &'a str,
) -> CompletePlaceholderCtx<'a> {
CompletePlaceholderCtx {
job_id: job_id.into(),
task_id: task_id.into(),
task_id: TaskId::new(job_id.into(), job_task_id.into()),
instance_id: instance_id.into(),
submit_dir,
server_uid,
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/dashboard/data/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use crate::dashboard::data::{Time, TimeRange};
use crate::server::autoalloc::{AllocationId, QueueId};
use crate::server::event::Event;
use crate::transfer::messages::AllocationQueueParams;
use crate::{JobId, JobTaskId};
use std::time::{Duration, SystemTime};
use tako::WorkerId;
use tako::{JobId, JobTaskId};

pub struct DashboardData {
/// Tracks worker connection and loss events
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Map;
use crate::server::autoalloc::{AllocationId, QueueId};
use crate::server::event::Event;
use crate::server::event::payload::EventPayload;
use crate::transfer::messages::AllocationQueueParams;
use std::time::SystemTime;
use tako::Map;

pub struct AllocationQueueInfo {
pub queue_params: AllocationQueueParams,
Expand Down
Loading
Loading