Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement mount points for task execution. #323

Merged
merged 6 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions wdl-engine/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

* Added calculation of mount points for input files, for use by future
backends that run tasks in containers ([#323](https://github.com/stjude-rust-labs/wdl/pull/323)).
* Added retry logic for task execution ([#320](https://github.com/stjude-rust-labs/wdl/pull/320)).
* Added a `Config` type for specifying evaluation configuration ([#320](https://github.com/stjude-rust-labs/wdl/pull/320)).
* Added progress callback to `WorkflowEvaluator` ([#310](https://github.com/stjude-rust-labs/wdl/pull/310)).
Expand Down
87 changes: 43 additions & 44 deletions wdl-engine/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use indexmap::IndexMap;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;

use crate::MountPoints;
use crate::Value;

pub mod local;
Expand Down Expand Up @@ -72,6 +73,8 @@ pub struct TaskExecutionConstraints {
/// ```
#[derive(Debug)]
pub struct TaskExecutionRoot {
/// The root directory for task execution.
root_dir: PathBuf,
/// The path to the directory for files created by the stdlib before and
/// after command evaluation.
temp_dir: PathBuf,
Expand All @@ -80,7 +83,7 @@ pub struct TaskExecutionRoot {
///
/// This needs to be a different location from `temp_dir` because commands
/// are re-evaluated on retry.
command_temp_dir: PathBuf,
attempt_temp_dir: PathBuf,
/// The path to the working directory for the execution.
work_dir: PathBuf,
/// The path to the command file.
Expand All @@ -94,43 +97,49 @@ pub struct TaskExecutionRoot {
impl TaskExecutionRoot {
/// Creates a task execution root for the given path and execution attempt.
pub fn new(path: &Path, attempt: u64) -> Result<Self> {
let path = absolute(path).with_context(|| {
let root_dir = absolute(path).with_context(|| {
format!(
"failed to determine absolute path of `{path}`",
path = path.display()
)
})?;

let mut attempts = path.join("attempts");
let mut attempts = root_dir.join("attempts");
attempts.push(attempt.to_string());

// Create both temp directories now as they may be needed for task evaluation
let temp_dir = path.join("tmp");
let temp_dir = root_dir.join("tmp");
fs::create_dir_all(&temp_dir).with_context(|| {
format!(
"failed to create directory `{path}`",
path = temp_dir.display()
)
})?;

let command_temp_dir = attempts.join("tmp");
fs::create_dir_all(&command_temp_dir).with_context(|| {
let attempt_temp_dir = attempts.join("tmp");
fs::create_dir_all(&attempt_temp_dir).with_context(|| {
format!(
"failed to create directory `{path}`",
path = command_temp_dir.display()
path = attempt_temp_dir.display()
)
})?;

Ok(Self {
root_dir,
temp_dir,
command_temp_dir,
attempt_temp_dir,
work_dir: attempts.join("work"),
command: attempts.join("command"),
stdout: attempts.join("stdout"),
stderr: attempts.join("stderr"),
})
}

/// Gets the path to the root itself.
pub fn path(&self) -> &Path {
&self.root_dir
}

/// Gets the temporary directory path for task evaluation before and after
/// command evaluation.
///
Expand All @@ -148,7 +157,7 @@ impl TaskExecutionRoot {
/// The temporary directory is created before spawning the task so that it
/// is available for task evaluation.
pub fn attempt_temp_dir(&self) -> &Path {
&self.command_temp_dir
&self.attempt_temp_dir
}

/// Gets the working directory for task execution.
Expand Down Expand Up @@ -187,15 +196,13 @@ pub struct TaskSpawnRequest {
/// The command of the task.
command: String,
/// The requirements of the task.
requirements: HashMap<String, Value>,
requirements: Arc<HashMap<String, Value>>,
/// The hints of the task.
hints: HashMap<String, Value>,
hints: Arc<HashMap<String, Value>>,
/// The environment variables of the task.
env: HashMap<String, String>,
/// The mapping between host paths and guest paths.
///
/// This is only populated for backends that have a container root.
mapping: HashMap<String, String>,
env: Arc<HashMap<String, String>>,
/// The mount points to use for the spawn request.
mounts: Arc<MountPoints>,
/// The channel to send a message on when the task is spawned.
///
/// This value will be `None` once the task is spawned.
Expand All @@ -210,10 +217,10 @@ impl TaskSpawnRequest {
pub fn new(
root: Arc<TaskExecutionRoot>,
command: String,
requirements: HashMap<String, Value>,
hints: HashMap<String, Value>,
env: HashMap<String, String>,
mapping: HashMap<String, String>,
requirements: Arc<HashMap<String, Value>>,
hints: Arc<HashMap<String, Value>>,
env: Arc<HashMap<String, String>>,
mounts: Arc<MountPoints>,
) -> (Self, oneshot::Receiver<()>) {
let (tx, rx) = oneshot::channel();

Expand All @@ -224,7 +231,7 @@ impl TaskSpawnRequest {
requirements,
hints,
env,
mapping,
mounts,
spawned: Some(tx),
},
rx,
Expand Down Expand Up @@ -256,29 +263,12 @@ impl TaskSpawnRequest {
&self.env
}

/// Gets the mapping between host paths and guest paths.
///
/// This is only populated for backends that have a container root.
pub fn mapping(&self) -> &HashMap<String, String> {
&self.mapping
/// Gets the mount points for the task.
pub fn mounts(&self) -> &Arc<MountPoints> {
&self.mounts
}
}

/// Represents the response from spawning a task.
#[derive(Debug)]
pub struct TaskSpawnResponse {
/// The requirements the task was spawned with.
pub requirements: HashMap<String, Value>,
/// The hints the task was spawned with.
pub hints: HashMap<String, Value>,
/// The environment the task was spawned with.
pub env: HashMap<String, String>,
/// The status code of the task's execution.
///
/// This may be `Err` if the task failed to spawn.
pub status_code: Result<i32>,
}

/// Represents a task execution backend.
pub trait TaskExecutionBackend: Send + Sync {
/// Gets the maximum concurrent tasks supported by the backend.
Expand All @@ -294,13 +284,22 @@ pub trait TaskExecutionBackend: Send + Sync {
hints: &HashMap<String, Value>,
) -> Result<TaskExecutionConstraints>;

/// Gets the container root directory for the backend (e.g. `/mnt/task`)
/// Gets the container (guest) root directory for the backend (e.g.
/// `/mnt/task`).
///
/// Containerized execution uses the following guest paths:
///
/// * `<root>/inputs/` - where inputs are mounted; this is mounted
/// read-only.
/// * `<root>/work/` - the task working directory; this is mounted
/// read-write.
/// * `<root>/command` - the command to execute; this is mounted read-only.
///
/// Returns `None` if the task execution does not use a container.
fn container_root(&self) -> Option<&Path>;
fn container_root_dir(&self) -> Option<&Path>;

/// Spawns a task with the execution backend.
///
/// Upon success, returns a receiver for receiving the result of the task's
/// execution: the task's status code, or an error if the task could not be
/// run.
fn spawn(&self, request: TaskSpawnRequest) -> Result<Receiver<TaskSpawnResponse>>;
fn spawn(&self, request: TaskSpawnRequest) -> Result<Receiver<Result<i32>>>;
}
59 changes: 23 additions & 36 deletions wdl-engine/src/backend/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use wdl_ast::v1::TASK_REQUIREMENT_MEMORY;
use super::TaskExecutionBackend;
use super::TaskExecutionConstraints;
use super::TaskSpawnRequest;
use super::TaskSpawnResponse;
use crate::Coercible;
use crate::SYSTEM;
use crate::Value;
Expand Down Expand Up @@ -86,14 +85,14 @@ struct LocalTaskSpawnRequest {
/// Note that memory isn't actually reserved for the task process.
memory: u64,
/// The sender to send the response back on.
tx: oneshot::Sender<TaskSpawnResponse>,
tx: oneshot::Sender<Result<i32>>,
}

/// Represents a local task spawn response.
#[derive(Debug)]
struct LocalTaskSpawnResponse {
/// The inner task spawn response.
inner: TaskSpawnResponse,
/// The result of execution.
result: Result<i32>,
/// The requested CPU reservation for the task.
///
/// Note that CPU isn't actually reserved for the task process.
Expand All @@ -103,7 +102,7 @@ struct LocalTaskSpawnResponse {
/// Note that memory isn't actually reserved for the task process.
memory: u64,
/// The sender to send the response back on.
tx: oneshot::Sender<TaskSpawnResponse>,
tx: oneshot::Sender<Result<i32>>,
}

/// Represents state for the local task execution backend.
Expand Down Expand Up @@ -220,7 +219,7 @@ impl LocalTaskExecutionBackend {
Some(Ok(response)) = state.spawned.join_next() => {
state.cpu += response.cpu;
state.memory += response.memory;
response.tx.send(response.inner).ok();
response.tx.send(response.result).ok();

// Look for tasks to unpark
while let Some(pos) = state.parked.iter().position(|r| r.cpu <= state.cpu && r.memory <= state.memory) {
Expand Down Expand Up @@ -251,16 +250,11 @@ impl LocalTaskExecutionBackend {
if request.cpu > total_cpu {
request
.tx
.send(TaskSpawnResponse {
requirements: request.inner.requirements,
hints: request.inner.hints,
env: request.inner.env,
status_code: Err(anyhow!(
"requested task CPU count of {cpu} exceeds the total host CPU count of \
{total_cpu}",
cpu = request.cpu
)),
})
.send(Err(anyhow!(
"requested task CPU count of {cpu} exceeds the total host CPU count of \
{total_cpu}",
cpu = request.cpu
)))
.ok();
return;
}
Expand All @@ -269,17 +263,12 @@ impl LocalTaskExecutionBackend {
if request.memory > total_memory {
request
.tx
.send(TaskSpawnResponse {
requirements: request.inner.requirements,
hints: request.inner.hints,
env: request.inner.env,
status_code: Err(anyhow!(
"requested task memory of {memory} byte{s} exceeds the total host memory \
of {total_memory}",
memory = request.memory,
s = if request.memory == 1 { "" } else { "s" }
)),
})
.send(Err(anyhow!(
"requested task memory of {memory} byte{s} exceeds the total host memory of \
{total_memory}",
memory = request.memory,
s = if request.memory == 1 { "" } else { "s" }
)))
.ok();
return;
}
Expand Down Expand Up @@ -314,14 +303,8 @@ impl LocalTaskExecutionBackend {
let spawned = request.inner.spawned.take().unwrap();
spawned.send(()).ok();

let status_code = Self::spawn_task(&request.inner).await;
LocalTaskSpawnResponse {
inner: TaskSpawnResponse {
requirements: request.inner.requirements,
hints: request.inner.hints,
env: request.inner.env,
status_code,
},
result: Self::spawn_task(&request.inner).await,
cpu: request.cpu,
memory: request.memory,
tx: request.tx,
Expand Down Expand Up @@ -460,12 +443,16 @@ impl TaskExecutionBackend for LocalTaskExecutionBackend {
})
}

fn container_root(&self) -> Option<&Path> {
fn container_root_dir(&self) -> Option<&Path> {
// Local execution does not use a container
None
}

fn spawn(&self, request: TaskSpawnRequest) -> Result<oneshot::Receiver<TaskSpawnResponse>> {
fn spawn(&self, request: TaskSpawnRequest) -> Result<oneshot::Receiver<Result<i32>>> {
if !request.mounts.is_empty() {
bail!("cannot spawn a local task with mount points");
}

let (tx, rx) = oneshot::channel();
let cpu = cpu(&request.requirements);
let memory = memory(&request.requirements)? as u64;
Expand Down
Loading