Skip to content

Commit

Permalink
Merge pull request #25 from gevulotnetwork/add-workflow
Browse files Browse the repository at this point in the history
Add workflow support
  • Loading branch information
trusch authored Dec 12, 2024
2 parents 8d94dcc + 0b3c05e commit c883841
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 8 deletions.
3 changes: 3 additions & 0 deletions src/gevulot_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::pin_client::PinClient;
use crate::sudo_client::SudoClient;
use crate::task_client::TaskClient;
use crate::worker_client::WorkerClient;
use crate::workflow_client::WorkflowClient;
use std::sync::Arc;
use tokio::sync::RwLock;

Expand All @@ -17,6 +18,7 @@ use tokio::sync::RwLock;
pub struct GevulotClient {
pub pins: PinClient,
pub tasks: TaskClient,
pub workflows: WorkflowClient,
pub workers: WorkerClient,
pub gov: GovClient,
pub sudo: SudoClient,
Expand Down Expand Up @@ -101,6 +103,7 @@ impl GevulotClientBuilder {
Ok(GevulotClient {
pins: PinClient::new(base_client.clone()),
tasks: TaskClient::new(base_client.clone()),
workflows: WorkflowClient::new(base_client.clone()),
workers: WorkerClient::new(base_client.clone()),
gov: GovClient::new(base_client.clone()),
sudo: SudoClient::new(base_client.clone()),
Expand Down
127 changes: 119 additions & 8 deletions src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,10 @@ impl From<gevulot::TaskSpec> for TaskSpec {
})
.collect(),
resources: TaskResources {
cpus: proto.cpus as i64,
gpus: proto.gpus as i64,
memory: proto.memory as i64,
time: proto.time as i64,
cpus: proto.cpus,
gpus: proto.gpus,
memory: proto.memory,
time: proto.time,
},
store_stdout: Some(proto.store_stdout),
store_stderr: Some(proto.store_stderr),
Expand Down Expand Up @@ -337,10 +337,10 @@ pub struct OutputContext {

#[derive(Serialize, Deserialize, Debug)]
pub struct TaskResources {
pub cpus: i64,
pub gpus: i64,
pub memory: i64,
pub time: i64,
pub cpus: u64,
pub gpus: u64,
pub memory: u64,
pub time: u64,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -398,6 +398,117 @@ impl From<gevulot::TaskStatus> for TaskStatus {
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Workflow {
pub kind: String,
pub version: String,
pub metadata: Metadata,
pub spec: WorkflowSpec,
pub status: Option<WorkflowStatus>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct WorkflowStage {
pub tasks: Vec<TaskSpec>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct WorkflowSpec {
pub stages: Vec<WorkflowStage>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct WorkflowStageStatus {
pub task_ids: Vec<String>,
pub finished_tasks: u64,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct WorkflowStatus {
pub state: String,
pub current_stage: u64,
pub stages: Vec<WorkflowStageStatus>,
}

impl From<gevulot::Workflow> for Workflow {
fn from(proto: gevulot::Workflow) -> Self {
Workflow {
kind: "Workflow".to_string(),
version: "v0".to_string(),
metadata: Metadata {
id: proto.metadata.as_ref().map(|m| m.id.clone()),
name: proto
.metadata
.as_ref()
.map(|m| m.name.clone())
.unwrap_or_default(),
creator: proto.metadata.as_ref().map(|m| m.creator.clone()),
description: proto
.metadata
.as_ref()
.map(|m| m.desc.clone())
.unwrap_or_default(),
tags: proto
.metadata
.as_ref()
.map(|m| m.tags.clone())
.unwrap_or_default(),
labels: proto
.metadata
.as_ref()
.map(|m| m.labels.clone())
.unwrap_or_default()
.into_iter()
.map(|l| Label {
key: l.key,
value: l.value,
})
.collect(),
workflow_ref: None,
},
spec: proto.spec.map(|s| s.into()).unwrap(),
status: proto.status.map(|s| s.into()),
}
}
}

impl From<gevulot::WorkflowSpec> for WorkflowSpec {
fn from(proto: gevulot::WorkflowSpec) -> Self {
WorkflowSpec {
stages: proto
.stages
.into_iter()
.map(|stage| WorkflowStage {
tasks: stage.tasks.into_iter().map(|t| t.into()).collect(),
})
.collect(),
}
}
}

impl From<gevulot::WorkflowStatus> for WorkflowStatus {
fn from(proto: gevulot::WorkflowStatus) -> Self {
WorkflowStatus {
state: match proto.state {
0 => "Pending".to_string(),
1 => "Running".to_string(),
2 => "Done".to_string(),
3 => "Failed".to_string(),
_ => "Unknown".to_string(),
},
current_stage: proto.current_stage,
stages: proto
.stages
.into_iter()
.map(|s| WorkflowStageStatus {
task_ids: s.task_ids,
finished_tasks: s.finished_tasks,
})
.collect(),
}
}
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Metadata {
pub id: Option<String>,
Expand Down
1 change: 1 addition & 0 deletions src/workflow_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
};

/// Client for managing workflows in the Gevulot system.
#[derive(Debug, Clone)]
pub struct WorkflowClient {
base_client: Arc<RwLock<BaseClient>>,
}
Expand Down

0 comments on commit c883841

Please sign in to comment.