diff --git a/Cargo.toml b/Cargo.toml index f1f8005..35532da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,15 +43,14 @@ version = "0.1.0" edition = "2021" [dependencies] - -bimap = "0.6.3" kube = { version = "0.93.1", features = ["runtime", "derive"] } k8s-openapi = { version = "0.22.0", features = ["latest"] } handlebars = "6.0.0" prost = "0.13.1" serde_variant = "0.1.3" uri="0.4.0" -comfy-table = "7.1.1" +prettytable-rs = "^0.10" +k8s-metrics = "0.16.0" tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] } tokio-retry = {workspace = true} diff --git a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs index 768079c..1ea6e1a 100644 --- a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs +++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs @@ -111,7 +111,7 @@ async fn main() -> Result<()> { info!("ipc server stopped"); } }; - Ok::<(), anyhow::Error>(()) + Ok::<(), anyhow::Error>(()) }); } { diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs index 371180b..553e46e 100644 --- a/crates/compute_unit_runner/src/media_data_tracker.rs +++ b/crates/compute_unit_runner/src/media_data_tracker.rs @@ -168,8 +168,8 @@ where &DataState::PartialSent ]; - if db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await? == 0 && - db_repo.count(&node_name, &[&DataState::Received,&DataState::Assigned], &Direction::In).await? == 0 { + if db_repo.count(&node_name, running_state.as_slice(), Some(&Direction::Out)).await? == 0 && + db_repo.count(&node_name, &[&DataState::Received,&DataState::Assigned], Some(&Direction::In)).await? == 0 { db_repo.update_node_by_name(&node_name, TrackerState::Finish).await.map_err(|err|anyhow!("update node data {err}"))?; info!("node was finished, not need to run backend"); return anyhow::Ok(()); @@ -333,7 +333,7 @@ where } loop { - if let Err(err) = db_repo.count(&node_name, &[&DataState::Received, &DataState::PartialSent], &Direction::Out).await.and_then(|count|{ + if let Err(err) = db_repo.count(&node_name, &[&DataState::Received, &DataState::PartialSent], Some(&Direction::Out)).await.and_then(|count|{ if count > buf_size { Err(anyhow!("has reach limit current:{count} limit:{buf_size}")) } else { @@ -542,7 +542,7 @@ where &DataState::SelectForSend, &DataState::PartialSent, ]; - match db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await { + match db_repo.count(&node_name, running_state.as_slice(), Some(&Direction::Out)).await { Ok(count) => { if count ==0 { break; diff --git a/crates/dp_runner/src/channel_tracker.rs b/crates/dp_runner/src/channel_tracker.rs index 329c3d1..5736b9c 100644 --- a/crates/dp_runner/src/channel_tracker.rs +++ b/crates/dp_runner/src/channel_tracker.rs @@ -25,8 +25,8 @@ use std::{ use tokio::{ select, sync::mpsc::{ - self, - }, + self, + }, task::JoinSet, time::{ self, @@ -148,7 +148,7 @@ where &DataState::SelectForSend, &DataState::EndRecieved ]; - if db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await? == 0 { + if db_repo.count(&node_name, running_state.as_slice(), Some(&Direction::Out)).await?
== 0 { db_repo.update_node_by_name(&node_name, TrackerState::Finish).await.map_err(|err|anyhow!("update node data {err}"))?; info!("node was finished, not need to run backend"); return anyhow::Ok(()); @@ -255,7 +255,7 @@ where let id = data_batch.id.clone(); let size = data_batch.size; //check limit - if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], &Direction::In).await.and_then(|count|{ + if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{ if count > buf_size { Err(anyhow!("has reach limit current:{count} limit:{buf_size}")) } else { diff --git a/crates/dp_runner/src/state_controller.rs b/crates/dp_runner/src/state_controller.rs index dad72fb..727db29 100644 --- a/crates/dp_runner/src/state_controller.rs +++ b/crates/dp_runner/src/state_controller.rs @@ -58,11 +58,11 @@ where .get_node_by_name(name) .await{ Ok(record)=> { - debug!("{} fetch state from db", record.node_name); + debug!("{} fetch state {:?} from db", record.node_name, record.state); let mut program_guard = program.write().await; if program_guard.local_state == record.state { continue } let old_local_state = program_guard.local_state.clone(); program_guard.local_state = record.state.clone(); info!("update state {:?} -> {:?}", &old_local_state, &record.state); diff --git a/jz-flow b/jz-flow new file mode 100755 index 0000000..70ac4e3 Binary files /dev/null and b/jz-flow differ diff --git a/nodes/dummy_in/src/main.rs b/nodes/dummy_in/src/main.rs index 8f8539c..bf79ae0 100644 --- a/nodes/dummy_in/src/main.rs +++ b/nodes/dummy_in/src/main.rs @@ -96,7 +96,7 @@ async fn dummy_in(token: CancellationToken, args: Args) -> Result<()> { }; loop { - if args.total_count > 0 && count > args.total_count { + if args.total_count > 0 && count >= args.total_count { info!("exit pod because work has done"); if client.status().await.unwrap().state == TrackerState::Finish { return Ok(()); diff --git a/script/example_dag.json b/script/example_dag.json index 040de95..b147f8a 100644 --- a/script/example_dag.json +++ b/script/example_dag.json @@ -8,7 +8,8 @@ "image": "gitdatateam/dummy_in:latest", "command": "/dummy_in", "args": [ - "--log-level=debug" + "--log-level=debug", + "--total-count=100" ] } }, diff --git a/src/api/client/job.rs b/src/api/client/job.rs index aa67d59..fd07927 100644 --- a/src/api/client/job.rs +++ b/src/api/client/job.rs @@ -99,7 +99,7 @@ impl JobClient { .await .anyhow() .and_then(|body| String::from_utf8(body.into()).anyhow())?; - return Err(anyhow!("get job {code} reason {err_msg}")); + return Err(anyhow!("list job {code} reason {err_msg}")); } resp.bytes() @@ -156,7 +156,7 @@ impl JobClient { .await .anyhow() .and_then(|body| String::from_utf8(body.into()).anyhow())?; - return Err(anyhow!("request job {code} reason {err_msg}")); + return Err(anyhow!("request update job {code} reason {err_msg}")); } Ok(()) @@ -183,7 +183,7 @@ impl JobClient { .await .anyhow() .and_then(|body| String::from_utf8(body.into()).anyhow())?; - return Err(anyhow!("request job {code} reason {err_msg}")); + return Err(anyhow!("request job detail {code} reason {err_msg}")); } resp.bytes() @@ -192,4 +192,57 @@ impl JobClient { .and_then(|body| serde_json::from_slice(&body).anyhow()) .anyhow() } + + pub async fn run_job(&self, job_id: &ObjectId) -> Result<()> { + let resp = self + .client + .post( + self.base_uri + .clone() + .join("job/")? + .join("run/")?
+ .join(job_id.to_hex().as_str())?, + ) + .send() + .await + .anyhow()?; + + if !resp.status().is_success() { + let code = resp.status(); + let err_msg = resp + .bytes() + .await + .anyhow() + .and_then(|body| String::from_utf8(body.into()).anyhow())?; + return Err(anyhow!("request start job {code} reason {err_msg}")); + } + + Ok(()) + } + + pub async fn clean_job(&self, job_id: &ObjectId) -> Result<()> { + let resp = self + .client + .delete( + self.base_uri + .clone() + .join("job/")? + .join(job_id.to_hex().as_str())?, + ) + .send() + .await + .anyhow()?; + + if !resp.status().is_success() { + let code = resp.status(); + let err_msg = resp + .bytes() + .await + .anyhow() + .and_then(|body| String::from_utf8(body.into()).anyhow())?; + return Err(anyhow!("request clean job {code} reason {err_msg}")); + } + + Ok(()) + } } diff --git a/src/api/job_api.rs b/src/api/job_api.rs index ff84787..72ec88c 100644 --- a/src/api/job_api.rs +++ b/src/api/job_api.rs @@ -5,6 +5,7 @@ use crate::{ core::db::{ Job, JobDbRepo, JobUpdateInfo, + ListJobParams, MainDbRepo, }, driver::Driver, @@ -42,17 +43,27 @@ async fn list<MAINR>(db_repo: web::Data<MAINR>) -> HttpResponse where MAINR: MainDbRepo, { - match db_repo.list_jobs().await { + let list_job_params = &ListJobParams { state: None }; + match db_repo.list_jobs(list_job_params).await { Ok(jobs) => HttpResponse::Ok().json(&jobs), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } } -async fn delete<MAINR>(db_repo: web::Data<MAINR>, path: web::Path<String>) -> HttpResponse +async fn clean_job<D, MAINR, JOBR>( + job_manager: web::Data<JobManager<D, MAINR, JOBR>>, + path: web::Path<String>, +) -> HttpResponse where + D: Driver, MAINR: MainDbRepo, + JOBR: JobDbRepo, { - match db_repo.delete(&path.into_inner()).await { + let id = match ObjectId::from_str(&path.into_inner()) { + Ok(id) => id, + Err(err) => return HttpResponse::BadRequest().body(err.to_string()), + }; + match job_manager.clean_job(&id).await { Ok(_) => HttpResponse::Ok().finish(), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } @@ -91,22 +102,41 @@ where } } +async fn run_job<D, MAINR, JOBR>( + job_manager: web::Data<JobManager<D, MAINR, JOBR>>, + path: web::Path<String>, +) -> HttpResponse +where + D: Driver, + MAINR: MainDbRepo, + JOBR: JobDbRepo, +{ + let id = match ObjectId::from_str(&path.into_inner()) { + Ok(id) => id, + Err(err) => return HttpResponse::BadRequest().body(err.to_string()), + }; + match job_manager.start_job(&id).await { + Ok(detail) => HttpResponse::Ok().json(detail), + Err(err) => HttpResponse::InternalServerError().body(err.to_string()), + } +} + pub(super) fn job_route_config<D, MAINR, JOBR>(cfg: &mut web::ServiceConfig) where D: Driver, MAINR: MainDbRepo, JOBR: JobDbRepo, { - cfg.service( - web::resource("/job") - .route(web::post().to(create::<MAINR>)) - .route(web::delete().to(delete::<MAINR>)), - ) - .service( - web::resource("/job/{id}") - .route(web::get().to(get::<MAINR>)) - .route(web::post().to(update::<MAINR>)), - ) - .service(web::resource("/jobs").route(web::get().to(list::<MAINR>))) - .service(web::resource("/job/detail/{id}").route(web::get().to(job_details::<D, MAINR, JOBR>))); + cfg.service(web::resource("/job").route(web::post().to(create::<MAINR>))) + .service( + web::resource("/job/{id}") + .route(web::get().to(get::<MAINR>)) + .route(web::post().to(update::<MAINR>)) + .route(web::delete().to(clean_job::<D, MAINR, JOBR>)), + ) + .service(web::resource("/jobs").route(web::get().to(list::<MAINR>))) + .service( + web::resource("/job/detail/{id}").route(web::get().to(job_details::<D, MAINR, JOBR>)), + ) + .service(web::resource("/job/run/{id}").route(web::post().to(run_job::<D, MAINR, JOBR>))); } diff --git a/src/bin/jz-flow/job.rs b/src/bin/jz-flow/job.rs index f383237..77fdd62 100644 --- a/src/bin/jz-flow/job.rs +++ b/src/bin/jz-flow/job.rs
@@ -10,13 +10,17 @@ use clap::{ Args, Parser, }; -use comfy_table::Table; use jz_action::{ api::client::JzFlowClient, core::db::Job, dag::Dag, + utils::sizefmt::SmartSize, }; use mongodb::bson::oid::ObjectId; +use prettytable::{ + Row, + Table, +}; use serde_variant::to_variant_name; use tokio::fs; @@ -24,8 +28,10 @@ pub(super) enum JobCommands { /// Adds files to myapp Create(JobCreateArgs), - List, + Run(RunJobArgs), + List(ListJobArgs), Detail(JobDetailArgs), + Clean(CleanJobArgs), } pub(super) async fn run_job_subcommand( ) -> Result<()> { match command { JobCommands::Create(args) => create_job(global_opts, args).await, - JobCommands::List => list_job(global_opts).await, + JobCommands::Run(args) => run_job(global_opts, args).await, + JobCommands::List(args) => list_job(global_opts, args).await, JobCommands::Detail(args) => get_job_details(global_opts, args).await, + JobCommands::Clean(args) => clean_job(global_opts, args).await, } } @@ -67,42 +75,55 @@ pub(super) async fn create_job(global_opts: GlobalOptions, args: JobCreateArgs) Ok(()) } -pub(super) async fn list_job(global_opts: GlobalOptions) -> Result<()> { +#[derive(Debug, Args)] +pub(super) struct ListJobArgs { + #[arg(long, default_value = "table", help = "output format: json or table")] + pub(super) format: String, +} + +pub(super) async fn list_job(global_opts: GlobalOptions, args: ListJobArgs) -> Result<()> { let client = JzFlowClient::new(&global_opts.listen)?.job(); let jobs = client.list().await?; + if args.format == "json" { + println!("{}", serde_json::to_string_pretty(&jobs)?); + return Ok(()); + } + let mut table = Table::new(); - table.set_header(vec![ + + // Header row. + table.add_row(Row::from(vec![ "ID", "Name", "State", "TryNumber", "CreatedAt", "UpdatedAt", - ]); + ])); jobs.iter().for_each(|job| { - table.add_row(vec![ - job.id.to_string(), - job.name.to_string(), - to_variant_name(&job.state).unwrap().to_string(), - job.retry_number.to_string(), - DateTime::from_timestamp(job.created_at, 0) - .unwrap() - .to_string(), - DateTime::from_timestamp(job.updated_at, 0) - .unwrap() - .to_string(), - ]); + table.add_row(Row::from(vec![ + cell!(job.id), + cell!(job.name), + cell!(to_variant_name(&job.state).unwrap()), + cell!(job.retry_number), + cell!(DateTime::from_timestamp(job.created_at, 0).unwrap()), + cell!(DateTime::from_timestamp(job.updated_at, 0).unwrap()), + ])); }); - println!("{table}"); + + table.printstd(); Ok(()) } #[derive(Debug, Args)] pub(super) struct JobDetailArgs { - #[arg(long, help = "job name, must be unique")] + #[arg(index = 1, help = "job id")] pub(super) id: String, + + #[arg(long, default_value = "table", help = "output format: json or table")] + pub(super) format: String, } pub(super) async fn get_job_details(global_opts: GlobalOptions, args: JobDetailArgs) -> Result<()> { @@ -110,6 +131,97 @@ let id: ObjectId = ObjectId::from_str(&args.id)?; let job_detail = client.get_job_detail(&id).await?; - println!("{}", serde_json::to_string_pretty(&job_detail)?); + if args.format == "json" { + println!("{}", serde_json::to_string_pretty(&job_detail)?); + return Ok(()); + } + + let mut table = Table::new(); + table.add_row(Row::from(vec![ + "ID", + "Name", + "State", + "TryNumber", + "CreatedAt", + "UpdatedAt", + ])); + + table.add_row(Row::from(vec![ + cell!(job_detail.job.id), +
cell!(job_detail.job.name), + cell!(to_variant_name(&job_detail.job.state).unwrap()), + cell!(job_detail.job.retry_number), + cell!(DateTime::from_timestamp(job_detail.job.created_at, 0).unwrap()), + cell!(DateTime::from_timestamp(job_detail.job.updated_at, 0).unwrap()), + ])); + table.printstd(); + + if job_detail.node_status.is_none() { + return Ok(()); + } + + println!("Nodes:"); + + let mut table = Table::new(); + table.add_row(Row::from(vec![ + "NodeName", + "State", + "DataCount", + "Replicas", + "TmpStorage", + "Pods", + ])); + for status in job_detail.node_status.unwrap() { + let mut pod_table = Table::new(); + pod_table.add_row(Row::from(vec!["Name", "State", "CPU", "Memory"])); + for pod in status.pods { + pod_table.add_row(Row::from(vec![ + cell!(pod.0), + cell!(pod.1.state), + cell!(pod.1.cpu_usage), + cell!(pod.1.memory_usage.to_smart_string()), + ])); + } + + table.add_row(Row::from(vec![ + cell!(status.name), + cell!(to_variant_name(&status.state)?), + cell!(status.data_count), + cell!(status.replicas), + cell!(status.storage), + cell!(pod_table), + ])); + } + table.printstd(); + Ok(()) +} + +#[derive(Debug, Args)] +pub(super) struct RunJobArgs { + #[arg(index = 1, help = "job id")] + pub(super) id: String, +} + +pub(super) async fn run_job(global_opts: GlobalOptions, args: RunJobArgs) -> Result<()> { + let client = JzFlowClient::new(&global_opts.listen)?.job(); + let id: ObjectId = ObjectId::from_str(&args.id)?; + client.run_job(&id).await?; + + println!("Job started successfully, job ID: {}", args.id); + Ok(()) +} + +#[derive(Debug, Args)] +pub(super) struct CleanJobArgs { + #[arg(index = 1, help = "job id")] + pub(super) id: String, +} + +pub(super) async fn clean_job(global_opts: GlobalOptions, args: CleanJobArgs) -> Result<()> { + let client = JzFlowClient::new(&global_opts.listen)?.job(); + let id: ObjectId = ObjectId::from_str(&args.id)?; + client.clean_job(&id).await?; + + println!("Job cleaned successfully, job ID: {}", args.id); Ok(()) } diff --git a/src/bin/jz-flow/main.rs b/src/bin/jz-flow/main.rs index 882ebd5..968926b 100644 --- a/src/bin/jz-flow/main.rs +++ b/src/bin/jz-flow/main.rs @@ -1,3 +1,7 @@ +#![feature(iter_repeat_n)] +#[macro_use] +extern crate prettytable; + mod global; mod job; mod run; diff --git a/src/core/job_db_models.rs b/src/core/job_db_models.rs index 4f38b37..615ad4b 100644 --- a/src/core/job_db_models.rs +++ b/src/core/job_db_models.rs @@ -134,7 +134,7 @@ pub trait DataRepo { &self, node_name: &str, states: &[&DataState], - direction: &Direction, + direction: Option<&Direction>, ) -> impl std::future::Future<Output = Result<usize>> + Send; fn insert_new_path( diff --git a/src/core/main_db_models.rs b/src/core/main_db_models.rs index 0d3d52c..bf81a8f 100644 --- a/src/core/main_db_models.rs +++ b/src/core/main_db_models.rs @@ -5,13 +5,14 @@ use serde::{ Serialize, }; -#[derive(Serialize, Deserialize, Debug, Default)] +#[derive(Serialize, Deserialize, Debug, Default, PartialEq)] pub enum JobState { #[default] Created, Running, Error, Finish, + Clean, } #[derive(Default, Serialize, Deserialize, Debug)] @@ -31,6 +32,10 @@ pub struct JobUpdateInfo { pub state: Option<JobState>, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ListJobParams { + pub state: Option<JobState>, +} pub trait JobRepo { fn insert(&self, job: &Job) -> impl std::future::Future<Output = Result<Job>> + Send; @@ -46,7 +51,10 @@ info: &JobUpdateInfo, ) -> impl std::future::Future<Output = Result<()>> + Send; - fn list_jobs(&self) -> impl std::future::Future<Output = Result<Vec<Job>>> + Send; + fn list_jobs( + &self, +
list_job_params: &ListJobParams, + ) -> impl std::future::Future<Output = Result<Vec<Job>>> + Send; } pub trait MainDbRepo = JobRepo + Clone + Send + Sync + 'static; diff --git a/src/dag/mod.rs b/src/dag/mod.rs index fb00bea..0630580 100644 --- a/src/dag/mod.rs +++ b/src/dag/mod.rs @@ -1,6 +1,5 @@ mod graph; -use graph::Graph; use crate::{ core::ComputeUnit, utils::IntoAnyhowResult, }; use anyhow::{ Ok, Result, }; +use graph::Graph; use std::collections::HashMap; pub struct Dag { @@ -70,6 +70,9 @@ impl Dag { pub fn get_outgoing_nodes(&self, node_id: &str) -> Vec<&str> { self.rely_graph.get_outgoing_nodes(node_id) } + pub fn topo_sort_nodes(&self) -> Vec<String> { + self.rely_graph.topo_sort() + } // from_json build graph from json string pub fn from_json(json: &str) -> Result<Self> { diff --git a/src/dbrepo/job_db_mongo.rs b/src/dbrepo/job_db_mongo.rs index a2c8940..c0bca61 100644 --- a/src/dbrepo/job_db_mongo.rs +++ b/src/dbrepo/job_db_mongo.rs @@ -254,15 +254,19 @@ impl DataRepo for MongoRunDbRepo { &self, node_name: &str, states: &[&DataState], - direction: &Direction, + direction: Option<&Direction>, ) -> Result<usize> { - let states: Vec<&str> = states.iter().map(to_variant_name).try_collect()?; + let mut query = doc! {"node_name":node_name}; + if !states.is_empty() { + let states: Vec<&str> = states.iter().map(to_variant_name).try_collect()?; + query.insert("state", doc! {"$in": states}); + } + if let Some(direction) = direction { + query.insert("direction", to_variant_name(direction)?); + } + self.data_col - .count_documents(doc! { - "node_name":node_name, - "state": doc! {"$in": states}, - "direction": to_variant_name(&direction)? - }) + .count_documents(query) .await .map(|count| count as usize) .anyhow() diff --git a/src/dbrepo/main_db_mongo.rs b/src/dbrepo/main_db_mongo.rs index 8155eda..5821bd4 100644 --- a/src/dbrepo/main_db_mongo.rs +++ b/src/dbrepo/main_db_mongo.rs @@ -4,6 +4,7 @@ use crate::{ JobRepo, JobState, JobUpdateInfo, + ListJobParams, }, utils::{ IntoAnyhowResult, @@ -148,13 +149,13 @@ impl JobRepo for MongoMainDbRepo { .anyhow() } - async fn list_jobs(&self) -> Result<Vec<Job>> { - self.job_col - .find(doc! {}) - .await? - .try_collect() - .await - .anyhow() + async fn list_jobs(&self, list_job_params: &ListJobParams) -> Result<Vec<Job>> { + let mut query = doc!
{}; + if let Some(state) = list_job_params.state.as_ref() { + query.insert("state", to_variant_name(state)?); + } + + self.job_col.find(query).await?.try_collect().await.anyhow() } async fn get(&self, id: &ObjectId) -> Result<Option<Job>> { diff --git a/src/driver/kube.rs b/src/driver/kube.rs index 2ad27b0..cc9c6e8 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -30,6 +30,7 @@ use anyhow::{ Result, }; use chrono::prelude::*; +use futures::future::try_join_all; use handlebars::{ Context, Handlebars, @@ -38,14 +39,21 @@ use handlebars::{ RenderContext, RenderError, }; -use k8s_openapi::api::{ - apps::v1::StatefulSet, - core::v1::{ - Namespace, - PersistentVolumeClaim, - Pod, - Service, +use k8s_metrics::{ + v1beta1 as metricsv1, + QuantityExt, +}; +use k8s_openapi::{ + api::{ + apps::v1::StatefulSet, + core::v1::{ + Namespace, + PersistentVolumeClaim, + Pod, + Service, + }, }, + apimachinery::pkg::api::resource::Quantity, }; use kube::{ api::{ @@ -67,7 +75,10 @@ use tokio_retry::{ strategy::ExponentialBackoff, Retry, }; -use tracing::debug; +use tracing::{ + debug, + error, +}; pub struct KubeChannelHander<R> where @@ -79,7 +90,7 @@ pub(crate) stateset_name: String, pub(crate) claim_name: String, pub(crate) _service_name: String, - pub(crate) _db_repo: R, + pub(crate) db_repo: R, } impl<R> ChannelHandler for KubeChannelHander<R> where @@ -96,6 +107,8 @@ let claim_api: Api<PersistentVolumeClaim> = Api::namespaced(self.client.clone(), &self.namespace); let pods_api: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace); + let metrics_api: Api<metricsv1::PodMetrics> = + Api::<metricsv1::PodMetrics>::namespaced(self.client.clone(), &self.namespace); let statefulset = statefulset_api.get(&self.stateset_name).await.anyhow()?; let selector = statefulset .spec .as_ref() .and_then(|spec| spec.selector.match_labels.as_ref()) .map(|cap| cap.0.clone()) .unwrap_or_default(); + let db_node = self.db_repo.get_node_by_name(&self.node_name).await?; + let data_count = self + .db_repo + .count(&self.node_name, &Vec::new(), None) + .await?; let mut node_status = NodeStatus { name: self.node_name.clone(), + state: db_node.state, + data_count, replicas: statefulset .spec .as_ref() .and_then(|status| status.phase) .unwrap_or_default(); + let metrics = metrics_api.get(&pod_name).await.anyhow()?; + let mut cpu_sum = 0.0; + let mut memory_sum = 0; + for container in metrics.containers.iter() { + cpu_sum += container + .usage + .cpu() + .map_err(|err| { + error!("cpu usage not available {err}"); + err + }) + .unwrap_or(0.0); + memory_sum += container + .usage + .memory() + .map_err(|err| { + error!("memory usage not available {err}"); + err + }) + .unwrap_or(0); + } let pod_status = PodStauts { state: phase, - disk_usage: 0, - cpu_usage: 0, - memory_usage: 0, + cpu_usage: cpu_sum, + memory_usage: memory_sum, }; node_status.pods.insert(pod_name, pod_status); } Ok(node_status) } + + async fn start(&self) -> Result<()> { + self.db_repo + .update_node_by_name(&self.node_name, TrackerState::Ready) + .await + } + async fn pause(&mut self) -> Result<()> { todo!() } @@ -175,7 +222,7 @@ pub(crate) stateset_name: String, pub(crate) claim_name: String, pub(crate) _service_name: String, - pub(crate) _db_repo: R, + pub(crate) db_repo: R, pub(crate) channel: Option<KubeChannelHander<R>>, } @@ -195,6 +242,8 @@ let claim_api: Api<PersistentVolumeClaim> = Api::namespaced(self.client.clone(), &self.namespace); let pods_api: Api<Pod> = Api::namespaced(self.client.clone(), &self.namespace); + let metrics_api: Api<metricsv1::PodMetrics> = + Api::<metricsv1::PodMetrics>::namespaced(self.client.clone(), &self.namespace); let statefulset = statefulset_api.get(&self.stateset_name).await.anyhow()?; let selector =
statefulset @@ -223,8 +272,15 @@ .map(|cap| cap.0.clone()) .unwrap_or_default(); + let db_node = self.db_repo.get_node_by_name(&self.node_name).await?; + let data_count = self + .db_repo + .count(&self.node_name, &Vec::new(), None) + .await?; let mut node_status = NodeStatus { name: self.node_name.clone(), + state: db_node.state, + data_count, replicas: statefulset .spec .as_ref() @@ -241,17 +297,47 @@ .and_then(|status| status.phase) .unwrap_or_default(); + let metrics = metrics_api.get(&pod_name).await.anyhow()?; + let mut cpu_sum = 0.0; + let mut memory_sum = 0; + for container in metrics.containers.iter() { + cpu_sum += container + .usage + .cpu() + .map_err(|err| { + error!("cpu usage not available {err}"); + err + }) + .unwrap_or(0.0); + memory_sum += container + .usage + .memory() + .map_err(|err| { + error!("memory usage not available {err}"); + err + }) + .unwrap_or(0); + } let pod_status = PodStauts { state: phase, - disk_usage: 0, - cpu_usage: 0, - memory_usage: 0, + cpu_usage: cpu_sum, + memory_usage: memory_sum, }; node_status.pods.insert(pod_name, pod_status); } Ok(node_status) } + async fn start(&self) -> Result<()> { + self.db_repo + .update_node_by_name(&self.node_name, TrackerState::Ready) + .await?; + if let Some(channel) = self.channel.as_ref() { + channel.start().await?; + } + Ok(()) + } + async fn pause(&mut self) -> Result<()> { todo!() } @@ -276,6 +362,7 @@ { _client: Client, _db_repo: R, + topo_sort_nodes: Vec<String>, handlers: HashMap<String, KubeHandler<R>>, } impl<R> KubePipelineController<R> where R: JobDbRepo, { - fn new(repo: R, client: Client) -> Self { + fn new(repo: R, client: Client, topo_sort_nodes: Vec<String>) -> Self { Self { _db_repo: repo, + topo_sort_nodes, _client: client, handlers: Default::default(), } } @@ -298,9 +386,14 @@ impl<R> PipelineController for KubePipelineController<R> where R: JobDbRepo, { type Output = KubeHandler<R>; + async fn start(&self) -> Result<()> { + try_join_all(self.handlers.values().map(|handler| handler.start())) + .await + .map(|_| ()) + } //todo use iter - fn nodes(&self) -> Result<Vec<String>> { - anyhow::Ok(self.handlers.keys().map(|a| a.to_string()).collect()) + fn nodes_in_order(&self) -> Result<Vec<String>> { + Ok(self.topo_sort_nodes.clone()) } async fn get_node(&self, id: &str) -> Result<&KubeHandler<R>> { @@ -456,8 +549,8 @@ updated_at: cur_tm, }; repo.insert_global_state(&graph_record).await?; - - let mut pipeline_ctl = KubePipelineController::new(repo.clone(), self.client.clone()); + let topo_sort_nodes = graph.topo_sort_nodes(); + let mut pipeline_ctl = KubePipelineController::new(repo.clone(), self.client.clone(), topo_sort_nodes); for node in graph.iter() { if node.spec.command.is_empty() { return Err(anyhow!("{} dont have command", &node.name)); } @@ -563,7 +656,7 @@ .name() .expect("set name in template") .to_string(), - _db_repo: repo.clone(), + db_repo: repo.clone(), }), vec![channel_node_name], ) @@ -651,7 +744,7 @@ .expect("set name in template") .to_string(), channel: channel_handler, - _db_repo: repo.clone(), + db_repo: repo.clone(), }; pipeline_ctl.handlers.insert(node.name.clone(), handler); @@ -673,7 +766,8 @@ let claim_api: Api<PersistentVolumeClaim> = Api::namespaced(self.client.clone(), run_id); let service_api: Api<Service> = Api::namespaced(self.client.clone(), run_id); - let mut pipeline_ctl = KubePipelineController::new(repo.clone(), self.client.clone()); + let topo_sort_nodes = graph.topo_sort_nodes(); + let mut pipeline_ctl = KubePipelineController::new(repo.clone(), self.client.clone(), topo_sort_nodes); for node in graph.iter() { let up_nodes = graph.get_incomming_nodes(&node.name); // query
channel @@ -704,7 +798,7 @@ .name() .expect("set name in template") .to_string(), - _db_repo: repo.clone(), + db_repo: repo.clone(), }) } else { None @@ -739,7 +833,7 @@ .expect("set name in template") .to_string(), channel: channel_handler, - _db_repo: repo.clone(), + db_repo: repo.clone(), }; pipeline_ctl.handlers.insert(node.name.clone(), handler); diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 0b36277..99e6ccd 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -1,6 +1,9 @@ pub mod kube; -use crate::dag::Dag; +use crate::{ + core::db::TrackerState, + dag::Dag, +}; use anyhow::Result; use serde::{ Deserialize, Serialize, }; use std::{ @@ -14,14 +17,15 @@ #[derive(Debug, Default, Serialize, Deserialize)] pub struct PodStauts { pub state: String, - pub disk_usage: u32, - pub cpu_usage: u32, - pub memory_usage: u32, + pub cpu_usage: f64, + pub memory_usage: i64, } -#[derive(Debug, Default, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct NodeStatus { pub name: String, + pub state: TrackerState, + pub data_count: usize, pub replicas: u32, pub storage: String, pub pods: HashMap<String, PodStauts>, } @@ -31,6 +35,8 @@ pub trait UnitHandler: Send { type Output: ChannelHandler; fn name(&self) -> &str; + + fn start(&self) -> impl std::future::Future<Output = Result<()>> + Send; //pause graph running for now fn status(&self) -> impl Future<Output = Result<NodeStatus>> + Send; @@ -44,13 +50,13 @@ fn stop(&mut self) -> impl Future<Output = Result<()>> + Send; //return a channel handler - fn channel_handler( - &self, - ) -> impl Future<Output = Result<&Option<Self::Output>>> + Send; + fn channel_handler(&self) -> impl Future<Output = Result<&Option<Self::Output>>> + Send; } pub trait ChannelHandler: Send { fn name(&self) -> &str; + + fn start(&self) -> impl std::future::Future<Output = Result<()>> + Send; //pause graph running for now fn status(&self) -> impl Future<Output = Result<NodeStatus>> + Send; @@ -67,7 +73,9 @@ pub trait PipelineController: Send { type Output: UnitHandler; - fn nodes(&self) -> Result<Vec<String>>; + fn start(&self) -> impl std::future::Future<Output = Result<()>> + Send; + + fn nodes_in_order(&self) -> Result<Vec<String>>; fn get_node(&self, id: &str) -> impl std::future::Future<Output = Result<&Self::Output>> + Send; diff --git a/src/job/job_mgr.rs b/src/job/job_mgr.rs index 53529dc..52f0ed0 100644 --- a/src/job/job_mgr.rs +++ b/src/job/job_mgr.rs @@ -4,6 +4,7 @@ use crate::{ JobDbRepo, JobState, JobUpdateInfo, + ListJobParams, MainDbRepo, }, dag::Dag, @@ -40,7 +41,7 @@ #[derive(Serialize, Deserialize)] pub struct JobDetails { pub job: Job, - pub node_status: HashMap<String, NodeStatus>, + pub node_status: Option<Vec<NodeStatus>>, } #[derive(Clone)] pub struct JobManager<D, MAINR, JOBR> where @@ -90,26 +91,47 @@ return Ok(()); } - while let Some(job) = db.get_job_for_running().await? { - let dag = Dag::from_json(job.graph_json.as_str())?; - let namespace = format!("{}-{}", job.name, job.retry_number); - if let Err(err) = driver.deploy(namespace.as_str(), &dag).await { - error!("run job {} {err}, start cleaning", job.name); - if let Err(err) = driver.clean(namespace.as_str()).await { - error!("clean job resource {err}"); - } - if let Err(err) = db - .update( - &job.id, - &JobUpdateInfo { - state: Some(JobState::Error), - }, - ) - .await - { - error!("set job to error state {err}"); - } + if let Err(err) = async { + while let Some(job) = db.get_job_for_running().await?
{ + let dag = Dag::from_json(job.graph_json.as_str())?; + let namespace = format!("{}-{}", job.name, job.retry_number); + if let Err(err) = driver.deploy(namespace.as_str(), &dag).await { + error!("run job {} {err}, start cleaning", job.name); + if let Err(err) = driver.clean(namespace.as_str()).await { + error!("clean job resource {err}"); + } + if let Err(err) = db + .update( + &job.id, + &JobUpdateInfo { + state: Some(JobState::Error), + }, + ) + .await + { + error!("set job to error state {err}"); + } + }; + } + + let finish_jobs_params = &ListJobParams { + state: Some(JobState::Finish), }; + + for job in db.list_jobs(finish_jobs_params).await? { + let namespace = format!("{}-{}", job.name, job.retry_number - 1); + driver.clean(&namespace).await?; + db.update( + &job.id, + &JobUpdateInfo { + state: Some(JobState::Clean), + }, + ) + .await?; + } + anyhow::Ok(()) + }.await { + error!("error in job backend {err}"); } } }); @@ -119,18 +142,56 @@ pub async fn get_job_details(&self, id: &ObjectId) -> Result<JobDetails> { let job = self.db.get(id).await?.anyhow("job not found")?; + + let node_status = if job.state == JobState::Running { + let dag = Dag::from_json(job.graph_json.as_str())?; + let namespace = format!("{}-{}", job.name, job.retry_number - 1); + let controller = self.driver.attach(&namespace, &dag).await?; + let nodes = controller.nodes_in_order().anyhow()?; + let nodes_controller = + try_join_all(nodes.iter().map(|node_name| controller.get_node(node_name))).await?; + + let mut node_status = vec![]; + for node_ctl in nodes_controller { + let status = node_ctl.status().await?; + node_status.push(status); + } + Some(node_status) + } else { + None + }; + + Ok(JobDetails { job, node_status }) + } + + pub async fn start_job(&self, id: &ObjectId) -> Result<()> { + let job = self.db.get(id).await?.anyhow("job not found")?; let dag = Dag::from_json(job.graph_json.as_str())?; let namespace = format!("{}-{}", job.name, job.retry_number - 1); let controller = self.driver.attach(&namespace, &dag).await?; - let nodes = controller.nodes().anyhow()?; - let nodes_controller = - try_join_all(nodes.iter().map(|node_name| controller.get_node(node_name))).await?; + controller.start().await } - let mut node_status = HashMap::new(); - for node_ctl in nodes_controller { - let status = node_ctl.status().await?; - node_status.insert(node_ctl.name().to_string(), status); - } - Ok(JobDetails { job, node_status }) + pub async fn clean_job(&self, id: &ObjectId) -> Result<()> { + let job = self.db.get(id).await?.anyhow("job not found")?; + let namespace = format!("{}-{}", job.name, job.retry_number - 1); + self.db + .update( + id, + &JobUpdateInfo { + state: Some(JobState::Finish), + }, + ) + .await?; + self.driver.clean(&namespace).await?; + self.db + .update( + id, + &JobUpdateInfo { + state: Some(JobState::Clean), + }, + ) + .await?; + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index fe91c10..6dd8978 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ #![feature(future_join)] #![feature(iterator_try_collect)] #![feature(duration_constructors)] +#![feature(iter_repeat_n)] pub mod api; pub mod core; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 15a8429..ddd31c4 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,3 +1,5 @@ +pub mod sizefmt; + use core::fmt; use anyhow::{ diff --git a/src/utils/sizefmt.rs b/src/utils/sizefmt.rs new file mode 100644 index 0000000..f9dd117 --- /dev/null +++ b/src/utils/sizefmt.rs @@ -0,0 +1,57 @@ +pub const KILOBYTE: i64 = 1 << 10; +pub const MEGABYTE: i64 = 1
<< 20; +pub const GIGABYTE: i64 = 1 << 30; + +pub trait SmartSize { + fn to_smart_string(self) -> String; +} + +impl SmartSize for i64 { + fn to_smart_string(self) -> String { + let divisor = match self { + size if size < MEGABYTE => KILOBYTE, + size if size < GIGABYTE => MEGABYTE, + _ => GIGABYTE, + }; + let name = match self { + size if size < MEGABYTE => "KB", + size if size < GIGABYTE => "MB", + _ => "GB", + }; + + let size = (self as f32) / (divisor as f32); + format!("{:.2} {}", size, name) + } +} + +impl SmartSize for u64 { + fn to_smart_string(self) -> String { + (self as i64).to_smart_string() + } +} + +impl SmartSize for i32 { + fn to_smart_string(self) -> String { + (self as i64).to_smart_string() + } +} + +impl SmartSize for u32 { + fn to_smart_string(self) -> String { + (self as i64).to_smart_string() + } +}
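+ +#[cfg(test)] +mod tests { + use super::*; + + // Illustrative sketch, not part of the original patch: the cases below are + // assumptions chosen to pin down the unit boundaries of to_smart_string. + #[test] + fn to_smart_string_picks_unit_by_magnitude() { + assert_eq!((KILOBYTE + KILOBYTE / 2).to_smart_string(), "1.50 KB"); + assert_eq!((3 * MEGABYTE).to_smart_string(), "3.00 MB"); + assert_eq!((2 * GIGABYTE).to_smart_string(), "2.00 GB"); + } +}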