Skip to content

Commit

Permalink
Implement planner for atomic tasks
Browse files Browse the repository at this point in the history
First version of the planner

Change-type: minor
  • Loading branch information
pipex committed Jan 27, 2025
1 parent 3a72642 commit 3dab999
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 34 deletions.
49 changes: 49 additions & 0 deletions src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ impl<T> Dag<T> {
Dag::default()
}

/// Check if the DAG is empty
///
/// # Example
/// ```rust
/// use gustav::Dag;
///
/// let dag: Dag<i32> = Dag::default();
/// assert!(dag.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.tail.is_none()
}

pub fn with_head(self, value: T) -> Dag<T> {
let head = Node::item(value, self.head).into_link();
let mut tail = self.tail;
Expand Down Expand Up @@ -214,6 +227,42 @@ impl<T> Dag<T> {
true
}

/// Traverse the DAG sequentially, applying a fold function at each node.
///
/// # Arguments
/// - `initial`: Initial accumulator value.
/// - `fold_fn`: Function to process each node and update the accumulator.
///
/// # Example
/// ```rust
/// use gustav::Dag;
/// use gustav::{dag,seq};
///
/// let dag: Dag<char> = dag!(seq!('h', 'e', 'l', 'l', 'o'), seq!(' '), seq!('m', 'y'))
/// + seq!(' ')
/// + dag!(
/// seq!('o', 'l', 'd'),
/// seq!(' '),
/// seq!('f', 'r', 'i', 'e', 'n', 'd')
/// );
///
/// let msg = dag.fold(
/// String::new(),
/// |acc, c| acc + c.to_string().as_ref(),
/// );
///
/// assert_eq!(msg, "hello my old friend")
/// ```
pub fn fold<U>(&self, initial: U, fold_fn: impl Fn(U, &T) -> U) -> U {
let mut acc = initial;
for node in self.iter() {
if let Node::Item { value, .. } = &*node.borrow() {
acc = fold_fn(acc, value);
}
}
acc
}

/// Create a linear DAG from a list of elements.
///
/// # Arguments
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum Error {
#[error("condition failed: ${0}")]
TaskConditionFailed(#[from] super::task::ConditionFailed),

#[error("plan could not be found: ${0}")]
#[error("plan search failed: ${0}")]
PlanSearchFailed(#[from] super::worker::PlanSearchError),

#[error(transparent)]
Expand Down
24 changes: 16 additions & 8 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use json_patch::{Patch, PatchOperation};
use std::fmt::{self, Display};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;

use crate::error::{Error, IntoError};
use crate::system::System;
Expand Down Expand Up @@ -62,12 +63,13 @@ where
}

type ActionOutput = Pin<Box<dyn Future<Output = Result<Patch>>>>;
type DryRun = Box<dyn Fn(&System, &Context) -> Result<Patch>>;
type Run = Box<dyn Fn(&System, &Context) -> ActionOutput>;
type Expand = Box<dyn Fn(&System, &Context) -> core::result::Result<Vec<Task>, Error>>;
type DryRun = Rc<dyn Fn(&System, &Context) -> Result<Patch>>;
type Run = Rc<dyn Fn(&System, &Context) -> ActionOutput>;
type Expand = Rc<dyn Fn(&System, &Context) -> core::result::Result<Vec<Task>, Error>>;

/// A task is either a concrete unit (atom) of work or a list of tasks
/// that can be run in sequence or in parallel
#[derive(Clone)]
pub enum Task {
Atom {
id: &'static str,
Expand All @@ -88,15 +90,15 @@ impl Task {
H: Handler<T, Patch, I>,
I: 'static,
{
let hc = handler.clone();
let handler_clone = handler.clone();
Self::Atom {
id,
context,
dry_run: Box::new(move |system: &System, context: &Context| {
let effect = hc.call(system, context);
dry_run: Rc::new(move |system: &System, context: &Context| {
let effect = handler_clone.call(system, context);
effect.pure()
}),
run: Box::new(move |system: &System, context: &Context| {
run: Rc::new(move |system: &System, context: &Context| {
let effect = handler.call(system, context);

Box::pin(async {
Expand All @@ -116,7 +118,7 @@ impl Task {
Self::List {
id,
context,
expand: Box::new(move |system: &System, context: &Context| {
expand: Rc::new(move |system: &System, context: &Context| {
// List tasks cannot perform changes to the system
// so the Effect returned by this handler is assumed to
// be pure
Expand Down Expand Up @@ -200,6 +202,12 @@ impl Task {
}
}

impl Display for Task {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}({})", self.id(), self.context().path)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions src/worker/distance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use super::intent;

#[derive(Debug)]
pub struct Distance(BTreeSet<Operation>);

impl Distance {
Expand Down
159 changes: 144 additions & 15 deletions src/worker/planner.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use json_patch::Patch;
use serde::Serialize;
use serde_json::Value;
use std::fmt::{self, Display};

use crate::error::{Error, IntoError};
use crate::path::Path;
use crate::task::{Context, Task};
use crate::system::System;
use crate::task::{ConditionFailed, Context, Task};

use super::distance::Distance;
use super::domain::Domain;
use super::workflow::Workflow;
use super::workflow::{Action, Workflow};
use super::Operation;

pub struct Planner {
Expand All @@ -22,7 +23,9 @@ pub enum PlanSearchError {
path: String,
reason: jsonptr::resolve::ResolveError,
},
PathNotFound,
LoopDetected,
ConditionFailed(crate::task::ConditionFailed),
WorkflowNotFound,
}

impl std::error::Error for PlanSearchError {}
Expand All @@ -31,10 +34,16 @@ impl Display for PlanSearchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PlanSearchError::SerializationError(err) => err.fmt(f),
PlanSearchError::PathNotFound => write!(f, "not found"),
PlanSearchError::LoopDetected => {
write!(f, "loop detected")
}
PlanSearchError::ConditionFailed(err) => write!(f, "condition failed: {}", err),
PlanSearchError::CannotResolveTarget { path, reason } => {
write!(f, "cannot resolve target path `{}`: {}", path, reason)
}
PlanSearchError::WorkflowNotFound => {
write!(f, "no plan found")
}
}
}
}
Expand All @@ -50,11 +59,45 @@ impl Planner {
Self { domain }
}

fn try_task(&self, task: Task, state: &Value, initial_plan: Workflow) {
fn try_task(
&self,
task: Task,
system: &System,
plan: Workflow,
) -> Result<Workflow, PlanSearchError> {
match task {
Task::Atom { .. } => {
let action_id = Action::new_id(&task, system.root()).map_err(|e| {
PlanSearchError::CannotResolveTarget {
path: task.context().path.to_string(),
reason: e,
}
})?;

// Detect loops in the plan
// if (initial_plan.0.some())
if plan.as_dag().some(|a| a.id == action_id) {
return Err(PlanSearchError::LoopDetected);
}

// Dry run the task
let Patch(changes) = task.dry_run(system).map_err(|err| match err {
Error::TaskConditionFailed(e) => PlanSearchError::ConditionFailed(e),
// TODO: we will probably want to log other
// errors in this case
_ => PlanSearchError::WorkflowNotFound,
})?;

// If no changes are returned, then assume the condition has failed
// TODO: make an exception if we are inside a compound task
if changes.is_empty() {
return Err(PlanSearchError::ConditionFailed(ConditionFailed::default()));
}

let Workflow { dag, pending } = plan;
let dag = dag.with_head(Action::new(action_id, task));
let pending = [pending, changes].concat();

Ok(Workflow { dag, pending })
}
Task::List { .. } => unimplemented!(),
}
Expand All @@ -67,11 +110,13 @@ impl Planner {
let cur = serde_json::to_value(cur).map_err(PlanSearchError::SerializationError)?;
let tgt = serde_json::to_value(tgt).map_err(PlanSearchError::SerializationError)?;

let system = System::new(cur);

// Store the initial state and an empty plan on the stack
let mut stack = vec![(cur, Workflow::default())];
let mut stack = vec![(system, Workflow::default())];

while let Some((cur, plan)) = stack.pop() {
let distance = Distance::new(&cur, &tgt);
while let Some((system, plan)) = stack.pop() {
let distance = Distance::new(&system, &tgt);

// we reached the target
if distance.is_empty() {
Expand Down Expand Up @@ -106,16 +151,100 @@ impl Planner {

// apply the task to the state, if it progresses the plan, then select
// it and put the new state with the new plan on the stack

// try to apply the job
// if successful, put the original state on the stack, followed by the
// new state after applying
if let Ok(Workflow { dag, pending }) =
self.try_task(task, &system, plan.clone())
{
// If there are no changes introduced by the task, then it doesn't
// contribute to the plan
if pending.is_empty() {
continue;
}

// If we got here, the task is part of a new potential workflow
// so we to make a copy of the system
let mut new_sys = system.clone();

// Update the state and the workflow
new_sys.patch(Patch(pending))?;
let new_plan = Workflow {
dag,
pending: vec![],
};

// add the new initial state and plan to the stack
stack.push((new_sys, new_plan));
}
}
}
}
}
}

Err(PlanSearchError::PathNotFound)?
Err(PlanSearchError::WorkflowNotFound)?
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::*;
use crate::extract::{Target, Update};
use crate::worker::{update, Domain};
use crate::{seq, Dag};

fn plus_one(mut counter: Update<i32>, Target(tgt): Target<i32>) -> Update<i32> {
if *counter < tgt {
*counter += 1;
}

counter
}

#[test]
fn it_calculates_a_simple_linear_workflow() {
let domain = Domain::new().job("", update(plus_one));

let planner = Planner::new(domain);
let workflow = planner.find_plan(0, 2).unwrap();

// We expect a linear DAG with two tasks
let expected: Dag<&str> = seq!(
"gustav::worker::planner::tests::plus_one()",
"gustav::worker::planner::tests::plus_one()"
);

assert_eq!(workflow.to_string(), expected.to_string(),);
}

#[test]
fn it_calculates_a_linear_simple_workflow_on_a_complex_state() {
#[derive(Serialize)]
struct MyState {
counters: HashMap<String, i32>,
}

let initial = MyState {
counters: HashMap::from([("one".to_string(), 0), ("two".to_string(), 0)]),
};

let target = MyState {
counters: HashMap::from([("one".to_string(), 2), ("two".to_string(), 2)]),
};

let domain = Domain::new().job("/counters/{counter}", update(plus_one));

let planner = Planner::new(domain);
let workflow = planner.find_plan(initial, target).unwrap();

// We expect a linear DAG with two tasks
let expected: Dag<&str> = seq!(
"gustav::worker::planner::tests::plus_one(/counters/one)",
"gustav::worker::planner::tests::plus_one(/counters/one)",
"gustav::worker::planner::tests::plus_one(/counters/two)",
"gustav::worker::planner::tests::plus_one(/counters/two)"
);

assert_eq!(workflow.to_string(), expected.to_string(),);
}
}
Loading

0 comments on commit 3dab999

Please sign in to comment.