Skip to content

Commit

Permalink
Implement planner for atomic tasks
Browse files Browse the repository at this point in the history
First version of the planner

Change-type: minor
  • Loading branch information
pipex committed Jan 28, 2025
1 parent 3a72642 commit 95412f3
Show file tree
Hide file tree
Showing 10 changed files with 348 additions and 62 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ description = "An automated job orchestration library to build and execute dynam
[dependencies]
json-patch = "3"
jsonptr = "0.6.0"
log = "0.4.25"
matchit = "0.8.4"
serde = { version = "1.0.197" }
serde_json = "1.0.120"
thiserror = "2"

[dev-dependencies]
dedent = "0.1.1"
env_logger = "0.11.6"
tokio = { version = "1.36.0", features = ["rt", "macros", "time"] }
49 changes: 49 additions & 0 deletions src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ impl<T> Dag<T> {
Dag::default()
}

/// Check if the DAG is empty
///
/// # Example
/// ```rust
/// use gustav::Dag;
///
/// let dag: Dag<i32> = Dag::default();
/// assert!(dag.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.tail.is_none()
}

pub fn with_head(self, value: T) -> Dag<T> {
let head = Node::item(value, self.head).into_link();
let mut tail = self.tail;
Expand Down Expand Up @@ -214,6 +227,42 @@ impl<T> Dag<T> {
true
}

/// Traverse the DAG sequentially, applying a fold function at each node.
///
/// # Arguments
/// - `initial`: Initial accumulator value.
/// - `fold_fn`: Function to process each node and update the accumulator.
///
/// # Example
/// ```rust
/// use gustav::Dag;
/// use gustav::{dag,seq};
///
/// let dag: Dag<char> = dag!(seq!('h', 'e', 'l', 'l', 'o'), seq!(' '), seq!('m', 'y'))
/// + seq!(' ')
/// + dag!(
/// seq!('o', 'l', 'd'),
/// seq!(' '),
/// seq!('f', 'r', 'i', 'e', 'n', 'd')
/// );
///
/// let msg = dag.fold(
/// String::new(),
/// |acc, c| acc + c.to_string().as_ref(),
/// );
///
/// assert_eq!(msg, "hello my old friend")
/// ```
pub fn fold<U>(&self, initial: U, fold_fn: impl Fn(U, &T) -> U) -> U {
let mut acc = initial;
for node in self.iter() {
if let Node::Item { value, .. } = &*node.borrow() {
acc = fold_fn(acc, value);
}
}
acc
}

/// Create a linear DAG from a list of elements.
///
/// # Arguments
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum Error {
#[error("condition failed: ${0}")]
TaskConditionFailed(#[from] super::task::ConditionFailed),

#[error("plan could not be found: ${0}")]
#[error("plan search failed: ${0}")]
PlanSearchFailed(#[from] super::worker::PlanSearchError),

#[error(transparent)]
Expand Down
38 changes: 19 additions & 19 deletions src/task/job.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use json_patch::Patch;
use std::cmp::Ordering;
use std::fmt;

use super::boxed::*;
use super::context::Context;
Expand All @@ -11,8 +12,8 @@ use super::{Handler, Task};
/// executed and cannot be expanded
/// - List jobs define work in terms of other tasks, they are expanded recursively
/// in order to get to a list of atoms
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord)]
enum Degree {
#[derive(Clone, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub(crate) enum Degree {
List,
Atom,
}
Expand All @@ -22,34 +23,33 @@ enum Degree {
///
/// Jobs are re-usable
pub struct Job {
id: &'static str,
degree: Degree,
pub(crate) id: &'static str,
pub(crate) degree: Degree,
builder: BoxedIntoTask,
}

impl PartialEq for Job {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
impl fmt::Debug for Job {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
#[derive(Debug)]
#[allow(dead_code)]
struct Job<'a> {
id: &'a str,
degree: &'a Degree,
}

let Self { id, degree, .. } = self;
fmt::Debug::fmt(&Job { id, degree }, f)
}
}

impl PartialOrd for Job {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
impl PartialEq for Job {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}

impl Eq for Job {}

impl Ord for Job {
fn cmp(&self, other: &Self) -> Ordering {
// We order jobs by degree. When searching for applicable
// jobs, we want to give List jobs priority over atomic jobs
// as these can be used to direct the search
self.degree.cmp(&other.degree)
}
}

impl Job {
pub(crate) fn from_action<A, T, I>(action: A) -> Self
where
Expand Down
24 changes: 16 additions & 8 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use json_patch::{Patch, PatchOperation};
use std::fmt::{self, Display};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;

use crate::error::{Error, IntoError};
use crate::system::System;
Expand Down Expand Up @@ -62,12 +63,13 @@ where
}

type ActionOutput = Pin<Box<dyn Future<Output = Result<Patch>>>>;
type DryRun = Box<dyn Fn(&System, &Context) -> Result<Patch>>;
type Run = Box<dyn Fn(&System, &Context) -> ActionOutput>;
type Expand = Box<dyn Fn(&System, &Context) -> core::result::Result<Vec<Task>, Error>>;
type DryRun = Rc<dyn Fn(&System, &Context) -> Result<Patch>>;
type Run = Rc<dyn Fn(&System, &Context) -> ActionOutput>;
type Expand = Rc<dyn Fn(&System, &Context) -> core::result::Result<Vec<Task>, Error>>;

/// A task is either a concrete unit (atom) of work or a list of tasks
/// that can be run in sequence or in parallel
#[derive(Clone)]
pub enum Task {
Atom {
id: &'static str,
Expand All @@ -88,15 +90,15 @@ impl Task {
H: Handler<T, Patch, I>,
I: 'static,
{
let hc = handler.clone();
let handler_clone = handler.clone();
Self::Atom {
id,
context,
dry_run: Box::new(move |system: &System, context: &Context| {
let effect = hc.call(system, context);
dry_run: Rc::new(move |system: &System, context: &Context| {
let effect = handler_clone.call(system, context);
effect.pure()
}),
run: Box::new(move |system: &System, context: &Context| {
run: Rc::new(move |system: &System, context: &Context| {
let effect = handler.call(system, context);

Box::pin(async {
Expand All @@ -116,7 +118,7 @@ impl Task {
Self::List {
id,
context,
expand: Box::new(move |system: &System, context: &Context| {
expand: Rc::new(move |system: &System, context: &Context| {
// List tasks cannot perform changes to the system
// so the Effect returned by this handler is assumed to
// be pure
Expand Down Expand Up @@ -200,6 +202,12 @@ impl Task {
}
}

impl Display for Task {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}({})", self.id(), self.context().path)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/worker/distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use super::intent;

#[derive(Debug)]
pub struct Distance(BTreeSet<Operation>);

impl Distance {
Expand Down
18 changes: 11 additions & 7 deletions src/worker/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::{BTreeSet, HashMap};
use super::intent::{Intent, Operation};
use crate::path::PathArgs;

#[derive(Default)]
#[derive(Default, Debug)]
pub struct Domain {
// The router stores a list of intents matching a route
router: Router<BTreeSet<Intent>>,
Expand Down Expand Up @@ -50,18 +50,22 @@ impl Domain {
}

// Insert the route to the queue
queue.insert(intent);
let updated = queue.insert(intent);

// (re)insert the queue to the router, we should not have
// conflicts here
router.insert(route, queue).expect("route should be valid");

// Only allow one assignment of a job to a route
if let Some(oldroute) = index.insert(job_id.clone().into_boxed_str(), String::from(route)) {
panic!(
"cannot assign job '{}' to route '{}', a previous assignment exists to '{}'",
job_id, route, oldroute
)
if updated {
if let Some(oldroute) =
index.insert(job_id.clone().into_boxed_str(), String::from(route))
{
panic!(
"cannot assign job '{}' to route '{}', a previous assignment exists to '{}'",
job_id, route, oldroute
)
}
}

Self { router, index }
Expand Down
9 changes: 7 additions & 2 deletions src/worker/intent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum Operation {
None,
}

#[derive(Debug)]
pub struct Intent {
pub(crate) operation: Operation,
pub(crate) job: Job,
Expand Down Expand Up @@ -68,7 +69,9 @@ define_intent!(none, Operation::None);

impl PartialEq for Intent {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
self.job == other.job
&& self.operation == other.operation
&& self.priority == other.priority
}
}
impl Eq for Intent {}
Expand All @@ -82,8 +85,10 @@ impl PartialOrd for Intent {
impl Ord for Intent {
fn cmp(&self, other: &Self) -> Ordering {
self.job
.cmp(&other.job)
.degree
.cmp(&other.job.degree)
.then(self.operation.cmp(&other.operation))
.then(self.priority.cmp(&other.priority))
.then(self.job.id.cmp(other.job.id))
}
}
Loading

0 comments on commit 95412f3

Please sign in to comment.