diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 9fd45e0..ac59751 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -16,7 +16,12 @@ jobs:
 
     steps:
    - uses: actions/checkout@v4
+
+    - name: Start minikube
+      uses: medyagh/setup-minikube@latest
+
    - name: Build
      run: cargo build --verbose
+
    - name: Run tests
      run: cargo test --verbose
diff --git a/Cargo.toml b/Cargo.toml
index 3f5d107..af06f13 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,7 @@
 [workspace]
 members = [
     "runner/dp_runner",
-    "runner/compute_data_runner",
+    "runner/compute_unit_runner",
 ]
 
 [workspace.package]
diff --git a/makefile b/makefile
index 68c75cc..873f6d5 100644
--- a/makefile
+++ b/makefile
@@ -4,8 +4,8 @@ $(OUTPUT):
 	mkdir -p $(OUTPUT)
 
 build-cd: $(OUTPUT)
-	cargo build -p compute_data_runner --release
-	cp target/release/compute_data_runner $(OUTPUT)/compute_data_runner
+	cargo build -p compute_unit_runner --release
+	cp target/release/compute_unit_runner $(OUTPUT)/compute_unit_runner
 
 build-dp: $(OUTPUT)
 	cargo build -p dp_runner --release
@@ -15,7 +15,7 @@ build: build-cd build-dp
 	cargo build --release
 
 docker_cd: build-cd
-	docker build -f ./script/cd.dockerfile -t jz-action/compute_data_runner:latest .
+	docker build -f ./script/cd.dockerfile -t jz-action/compute_unit_runner:latest .
 
 docker_dp: build-dp
 	docker build -f ./script/dp.dockerfile -t jz-action/dp_runner:latest .
@@ -27,7 +27,7 @@ minikube-env:
 	@eval $(minikube -p minikube docker-env)
 
 minikube-docker: minikube-env docker
-	docker push jz-action/compute_data_runner:latest
+	docker push jz-action/compute_unit_runner:latest
 	docker push jz-action/dp_runner:latest
 
 clean:
diff --git a/runner/compute_data_runner/Cargo.toml b/runner/compute_unit_runner/Cargo.toml
similarity index 94%
rename from runner/compute_data_runner/Cargo.toml
rename to runner/compute_unit_runner/Cargo.toml
index 74b4149..60239ae 100644
--- a/runner/compute_data_runner/Cargo.toml
+++ b/runner/compute_unit_runner/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "compute_data_runner"
+name = "compute_unit_runner"
 version = "0.1.0"
 edition = "2021"
 
diff --git a/runner/compute_data_runner/src/ipc.rs b/runner/compute_unit_runner/src/ipc.rs
similarity index 87%
rename from runner/compute_data_runner/src/ipc.rs
rename to runner/compute_unit_runner/src/ipc.rs
index 8bd6455..d8ede23 100644
--- a/runner/compute_data_runner/src/ipc.rs
+++ b/runner/compute_unit_runner/src/ipc.rs
@@ -1,8 +1,6 @@
-use anyhow::Result;
-
+use crate::media_data_tracker::MediaDataTracker;
 use actix_web::{web, App, HttpResponse, HttpServer};
-
-use crate::program::BatchProgram;
+use anyhow::Result;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use tokio::sync::oneshot;
@@ -18,7 +16,9 @@ pub(crate) struct SubmitResultReq {
     pub(crate) id: String,
 }
 
-async fn process_data_request(program_mutex: web::Data<Arc<Mutex<BatchProgram>>>) -> HttpResponse {
+async fn process_data_request(
+    program_mutex: web::Data<Arc<Mutex<MediaDataTracker>>>,
+) -> HttpResponse {
     let (tx, mut rx) = oneshot::channel::<DataResponse>();
     let program = program_mutex.lock().await;
     program
@@ -36,7 +36,7 @@
 }
 
 async fn process_submit_result_request(
-    program_mutex: web::Data<Arc<Mutex<BatchProgram>>>,
+    program_mutex: web::Data<Arc<Mutex<MediaDataTracker>>>,
     data: web::Json<SubmitResultReq>,
 ) -> HttpResponse {
     let (tx, mut rx) = oneshot::channel::<()>();
@@ -60,7 +60,7 @@ async fn process_submit_result_request(
 
 pub(crate) fn start_ipc_server(
     unix_socket_addr: String,
-    program: Arc<Mutex<BatchProgram>>,
+    program: Arc<Mutex<MediaDataTracker>>,
 ) -> Result<()> {
     HttpServer::new(move || {
         App::new()
diff --git a/runner/compute_data_runner/src/main.rs b/runner/compute_unit_runner/src/main.rs
similarity index 84%
rename from runner/compute_data_runner/src/main.rs
rename to runner/compute_unit_runner/src/main.rs
index 4043853..fbab7a5 100644
--- a/runner/compute_data_runner/src/main.rs
+++ b/runner/compute_unit_runner/src/main.rs
@@ -1,7 +1,7 @@
 #![feature(future_join)]
 
 mod ipc;
-mod program;
+mod media_data_tracker;
 mod unit;
 
 use jz_action::network::datatransfer::data_stream_server::DataStreamServer;
@@ -10,7 +10,7 @@ use jz_action::utils::StdIntoAnyhowResult;
 
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use program::BatchProgram;
+use media_data_tracker::MediaDataTracker;
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -25,7 +25,7 @@ use unit::UnitDataStream;
 #[derive(Debug, Parser)]
 #[command(
-    name = "compute_runner",
+    name = "compute_unit_runner",
     version = "0.0.1",
     author = "Author Name ",
     about = "embed in k8s images"
 )]
@@ -37,7 +37,7 @@ struct Args {
     #[arg(short, long, default_value = "/app/tmp")]
     tmp_path: String,
 
-    #[arg(short, long, default_value = "/unix_socket/compute_data_runner_d")]
+    #[arg(short, long, default_value = "/unix_socket/compute_unit_runner_d")]
     unix_socket_addr: String,
 
     #[arg(long, default_value = "[::1]:25431")]
@@ -53,10 +53,9 @@ async fn main() -> Result<()> {
         .anyhow()?;
 
     let addr = args.host_port.parse()?;
-    let program = BatchProgram::new(PathBuf::from_str(args.tmp_path.as_str())?);
+    let program = MediaDataTracker::new(PathBuf::from_str(args.tmp_path.as_str())?);
     let program_safe = Arc::new(Mutex::new(program));
-
     let node_controller = DataNodeControllerServer {
         program: program_safe.clone(),
     };
 
@@ -77,7 +76,9 @@ async fn main() -> Result<()> {
             .await
             .anyhow()
         {
-            let _ = shutdown_tx_arc.send(Err(anyhow!("start data controller {e}"))).await;
+            let _ = shutdown_tx_arc
+                .send(Err(anyhow!("start data controller {e}")))
+                .await;
         }
     });
@@ -90,8 +91,10 @@ async fn main() -> Result<()> {
         let program = program_safe.clone();
         let shutdown_tx_arc = shutdown_tx.clone();
         let _ = tokio::spawn(async move {
-            if let Err(e) = ipc::start_ipc_server(unix_socket_addr, program) {
-                let _ = shutdown_tx_arc.send(Err(anyhow!("start unix socket server {e}"))).await;
+            if let Err(e) = ipc::start_ipc_server(unix_socket_addr, program) {
+                let _ = shutdown_tx_arc
+                    .send(Err(anyhow!("start unix socket server {e}")))
+                    .await;
             }
         });
     }
diff --git a/runner/compute_data_runner/src/program.rs b/runner/compute_unit_runner/src/media_data_tracker.rs
similarity index 88%
rename from runner/compute_data_runner/src/program.rs
rename to runner/compute_unit_runner/src/media_data_tracker.rs
index 5f7a81b..a571417 100644
--- a/runner/compute_data_runner/src/program.rs
+++ b/runner/compute_unit_runner/src/media_data_tracker.rs
@@ -1,24 +1,22 @@
 use crate::ipc::{DataResponse, SubmitResultReq};
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use jz_action::network::common::Empty;
 use jz_action::network::datatransfer::data_stream_client::DataStreamClient;
-use jz_action::network::datatransfer::{MediaDataBatchResponse, MediaDataCell, TabularDataBatchResponse};
+use jz_action::network::datatransfer::{MediaDataBatchResponse, MediaDataCell};
 use jz_action::network::nodecontroller::NodeType;
 use std::collections::HashMap;
 use std::fs;
 use std::path::PathBuf;
-use std::process::Command;
-use std::sync::mpsc::Receiver;
 use tokio::select;
 use tokio::sync::mpsc;
 use tokio::sync::{broadcast, oneshot};
-use tokio_stream::{Stream, StreamExt};
+use tokio_stream::StreamExt;
 use tracing::{error, info};
 use uuid::Uuid;
 use walkdir::WalkDir;
 
 #[derive(Debug)]
-pub(crate) enum ProgramState {
+pub(crate) enum TrackerState {
     Init,
     Ready,
     Pending,
@@ -27,7 +25,7 @@ pub(crate) enum ProgramState {
 }
 
 #[derive(Debug, PartialEq)]
-pub(crate) enum StateEnum {
+pub(crate) enum DataStateEnum {
     Received,
     Assigned,
     Processed,
@@ -36,13 +34,13 @@ pub(crate) enum StateEnum {
 #[derive(Debug)]
 pub(crate) struct BatchState {
-    pub(crate) state: StateEnum,
+    pub(crate) state: DataStateEnum,
 }
 
-pub(crate) struct BatchProgram {
+pub(crate) struct MediaDataTracker {
     pub(crate) tmp_store: PathBuf,
 
-    pub(crate) state: ProgramState,
+    pub(crate) state: TrackerState,
 
     pub(crate) node_type: NodeType,
 
@@ -56,14 +54,14 @@ pub(crate) struct BatchProgram {
 
     pub(crate) out_going_tx: broadcast::Sender<MediaDataBatchResponse>, //receive data from upstream and send it to program with this
 }
 
-impl BatchProgram {
+impl MediaDataTracker {
     pub(crate) fn new(tmp_store: PathBuf) -> Self {
         let out_going_tx = broadcast::Sender::new(128);
 
-        BatchProgram {
+        MediaDataTracker {
             tmp_store,
             node_type: NodeType::Input,
-            state: ProgramState::Init,
+            state: TrackerState::Init,
             upstreams: None,
             script: None,
             ipc_process_submit_result_tx: None,
@@ -74,13 +72,13 @@ impl BatchProgram {
 
     pub(crate) async fn process_data_cmd(&mut self) -> Result<()> {
         match self.node_type {
-            NodeType::Input=>self.track_input_data().await,
-            NodeType::InputOutput=>self.track_input_output_data().await,
-            NodeType::Output=>self.track_output_data().await,
+            NodeType::Input => self.track_input_data().await,
+            NodeType::InputOutput => self.track_input_output_data().await,
+            NodeType::Output => self.track_output_data().await,
         }
     }
 
-    pub(crate) async fn track_input_data(&mut self) ->Result<()> {
+    pub(crate) async fn track_input_data(&mut self) -> Result<()> {
         let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024);
         self.ipc_process_submit_result_tx = Some(ipc_process_submit_result_tx);
 
@@ -95,7 +93,7 @@ impl BatchProgram {
             select! {
                 Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
                     state_map.insert(req.id.clone(), BatchState{
-                        state: StateEnum::Processed,
+                        state: DataStateEnum::Processed,
                     });
 
                     // respose with nothing
@@ -139,7 +137,7 @@ impl BatchProgram {
 
                         let entry = state_map.get_mut(&req.id)
                             .expect("this value has been inserted before");
-                        entry.state = StateEnum::Sent;
+                        entry.state = DataStateEnum::Sent;
                     }
                     let _ = state_map.remove(&req.id);
                 },
@@ -149,8 +147,11 @@ impl BatchProgram {
         Ok(())
     }
 
-    pub(crate) async fn track_input_output_data(&mut self) ->Result<()> {
-        let upstreams = self.upstreams.as_ref().expect("input output node must have incoming nodes");
+    pub(crate) async fn track_input_output_data(&mut self) -> Result<()> {
+        let upstreams = self
+            .upstreams
+            .as_ref()
+            .expect("input output node must have incoming nodes");
 
         let (incoming_data_tx, mut incoming_data_rx) = mpsc::channel(1024);
 
@@ -168,18 +169,17 @@ impl BatchProgram {
                     //todo handle network disconnect
                     let mut client = DataStreamClient::connect(upstream.clone()).await?;
                     let mut stream = client.subscribe_media_data(Empty {}).await?.into_inner();
-                    
+
                     while let Some(item) = stream.next().await {
                         tx_clone.send(item.unwrap()).await.unwrap();
                     }
-                    
+
                     error!("unable read data from stream");
                     anyhow::Ok(())
                 });
             }
             info!("listen data from upstream {}", upstream);
         }
-        
         //TODO this make a async process to be sync process. got a low performance,
         //if have any bottleneck here, we should refrator this one
@@ -193,17 +193,17 @@ impl BatchProgram {
                     if let Some(data_batch) = data_batch_result {
                         //create input directory
                         let id = Uuid::new_v4().to_string();
-                        let tmp_in_path = tmp_store.join(id.clone()+"-input"); 
+                        let tmp_in_path = tmp_store.join(id.clone()+"-input");
                         if let Err(e) = fs::create_dir_all(&tmp_in_path) {
                             error!("create input dir {:?} fail {}", tmp_in_path, e);
-                            return 
+                            return
                         }
 
                         //create output directory at the same time
-                        let tmp_out_path = tmp_store.join(id.clone()+"-output"); 
+                        let tmp_out_path = tmp_store.join(id.clone()+"-output");
                         if let Err(e) = fs::create_dir_all(&tmp_out_path) {
                             error!("create output dir {:?} fail {}", tmp_out_path, e);
-                            return 
+                            return
                         }
                         //write batch files
                         for entry in data_batch.cells.iter() {
@@ -213,19 +213,19 @@ impl BatchProgram {
                             }
                         }
                         state_map.insert(id, BatchState{
-                            state: StateEnum::Received,
+                            state: DataStateEnum::Received,
                         });
                     }
                 },
                 Some((_, resp)) = ipc_process_data_req_rx.recv() => {
                     //select a unassgined data
                     for (key, v ) in state_map.iter_mut() {
-                        if v.state == StateEnum::Received {
+                        if v.state == DataStateEnum::Received {
                             //response this data's position
                             resp.send(DataResponse{
                                 id: key.clone(),
                             }).expect("channel only read once");
-                            v.state = StateEnum::Assigned ;
+                            v.state = DataStateEnum::Assigned ;
                             break;
                         }
                     }
@@ -234,7 +234,7 @@ impl BatchProgram {
                     //mark this data as completed
                     match state_map.get_mut(&req.id) {
                         Some(state)=>{
-                            state.state = StateEnum::Processed;
+                            state.state = DataStateEnum::Processed;
                         },
                         None=>error!("id({:?}) not found", &req.id)
                     }
@@ -280,7 +280,7 @@ impl BatchProgram {
                     }
                     let entry = state_map.get_mut(&req.id)
                         .expect("this value has been inserted before");
-                    entry.state = StateEnum::Sent;
+                    entry.state = DataStateEnum::Sent;
                 }
 
                 //remove data
@@ -296,8 +296,11 @@ impl BatchProgram {
         Ok(())
     }
 
-    pub(crate) async fn track_output_data(&mut self) ->Result<()> {
-        let upstreams = self.upstreams.as_ref().expect("input output node must have incoming nodes");
+    pub(crate) async fn track_output_data(&mut self) -> Result<()> {
+        let upstreams = self
+            .upstreams
+            .as_ref()
+            .expect("input output node must have incoming nodes");
 
         let (incoming_data_tx, mut incoming_data_rx) = mpsc::channel(1024);
 
@@ -315,18 +318,17 @@ impl BatchProgram {
                     //todo handle network disconnect
                     let mut client = DataStreamClient::connect(upstream.clone()).await?;
                     let mut stream = client.subscribe_media_data(Empty {}).await?.into_inner();
-                    
+
                     while let Some(item) = stream.next().await {
                         tx_clone.send(item.unwrap()).await.unwrap();
                     }
-                    
+
                     error!("unable read data from stream");
                     anyhow::Ok(())
                 });
             }
             info!("listen data from upstream {}", upstream);
         }
-        
         //TODO this make a async process to be sync process. got a low performance,
         //if have any bottleneck here, we should refrator this one
@@ -339,17 +341,17 @@ impl BatchProgram {
                     if let Some(data_batch) = data_batch_result {
                         //create input directory
                         let id = Uuid::new_v4().to_string();
-                        let tmp_in_path = tmp_store.join(id.clone()+"-input"); 
+                        let tmp_in_path = tmp_store.join(id.clone()+"-input");
                         if let Err(e) = fs::create_dir_all(&tmp_in_path) {
                             error!("create input dir {:?} fail {}", tmp_in_path, e);
-                            return 
+                            return
                         }
 
                         //create output directory at the same time
-                        let tmp_out_path = tmp_store.join(id.clone()+"-output"); 
+                        let tmp_out_path = tmp_store.join(id.clone()+"-output");
                         if let Err(e) = fs::create_dir_all(&tmp_out_path) {
                             error!("create output dir {:?} fail {}", tmp_out_path, e);
-                            return 
+                            return
                         }
                         //write batch files
                         for entry in data_batch.cells.iter() {
@@ -359,19 +361,19 @@ impl BatchProgram {
                             }
                         }
                         state_map.insert(id, BatchState{
-                            state: StateEnum::Received,
+                            state: DataStateEnum::Received,
                        });
                     }
                 },
                 Some((_, resp)) = ipc_process_data_req_rx.recv() => {
                     //select a unassgined data
                     for (key, v ) in state_map.iter_mut() {
-                        if v.state == StateEnum::Received {
+                        if v.state == DataStateEnum::Received {
                             //response this data's position
                             resp.send(DataResponse{
                                 id: key.clone(),
                             }).expect("channel only read once");
-                            v.state = StateEnum::Assigned ;
+                            v.state = DataStateEnum::Assigned ;
                             break;
                         }
                     }
@@ -380,7 +382,7 @@ impl BatchProgram {
                     //mark this data as completed
                     match state_map.get_mut(&req.id) {
                         Some(state)=>{
-                            state.state = StateEnum::Processed;
+                            state.state = DataStateEnum::Processed;
                         },
                         None=>error!("id({:?}) not found", &req.id)
                     }
diff --git a/runner/compute_data_runner/src/unit.rs b/runner/compute_unit_runner/src/unit.rs
similarity index 87%
rename from runner/compute_data_runner/src/unit.rs
rename to runner/compute_unit_runner/src/unit.rs
index 521184c..6935743 100644
--- a/runner/compute_data_runner/src/unit.rs
+++ b/runner/compute_unit_runner/src/unit.rs
@@ -1,4 +1,4 @@
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use jz_action::network::datatransfer::data_stream_server::DataStream;
 use jz_action::network::datatransfer::{MediaDataBatchResponse, TabularDataBatchResponse};
 use jz_action::network::nodecontroller::node_controller_server::NodeController;
@@ -13,10 +13,10 @@ use tokio_stream::wrappers::ReceiverStream;
 use tonic::{Code, Request, Response, Status};
 use tracing::error;
 
-use super::program::{BatchProgram, ProgramState};
+use super::media_data_tracker::{MediaDataTracker, TrackerState};
 
 pub(crate) struct DataNodeControllerServer {
-    pub(crate) program: Arc<Mutex<BatchProgram>>,
+    pub(crate) program: Arc<Mutex<MediaDataTracker>>,
 }
 
 #[tonic::async_trait]
@@ -27,8 +27,10 @@ impl NodeController for DataNodeControllerServer {
     ) -> std::result::Result<Response<Empty>, Status> {
         let request = request.into_inner();
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Ready;
-        program_guard.node_type = NodeType::try_from(request.node_type).anyhow().to_rpc(Code::InvalidArgument)?;
+        program_guard.state = TrackerState::Ready;
+        program_guard.node_type = NodeType::try_from(request.node_type)
+            .anyhow()
+            .to_rpc(Code::InvalidArgument)?;
         program_guard.upstreams = Some(request.upstreams);
         program_guard.script = Some(request.script);
@@ -45,7 +47,7 @@
         _request: Request<Empty>,
     ) -> std::result::Result<Response<Empty>, Status> {
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Pending;
+        program_guard.state = TrackerState::Pending;
         Ok(Response::new(Empty {}))
     }
@@ -54,19 +56,19 @@ impl NodeController for DataNodeControllerServer {
         _request: Request<Empty>,
     ) -> std::result::Result<Response<Empty>, Status> {
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Ready;
+        program_guard.state = TrackerState::Ready;
         Ok(Response::new(Empty {}))
     }
 
     async fn stop(&self, _request: Request<Empty>) -> std::result::Result<Response<Empty>, Status> {
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Stopped;
+        program_guard.state = TrackerState::Stopped;
         Ok(Response::new(Empty {}))
     }
 }
 
 pub(crate) struct UnitDataStream {
-    pub(crate) program: Arc<Mutex<BatchProgram>>,
+    pub(crate) program: Arc<Mutex<MediaDataTracker>>,
 }
 
 #[tonic::async_trait]
@@ -99,13 +101,12 @@ impl DataStream for UnitDataStream {
 
         Ok(Response::new(ReceiverStream::new(rx)))
     }
 
-
     type subscribe_tabular_dataStream = ReceiverStream<Result<TabularDataBatchResponse, Status>>;
     async fn subscribe_tabular_data(
         &self,
         _request: Request<Empty>,
     ) -> Result<Response<Self::subscribe_tabular_dataStream>, Status> {
-       todo!()
+        todo!()
     }
 }
diff --git a/runner/dp_runner/src/program.rs b/runner/dp_runner/src/channel_tracker.rs
similarity index 83%
rename from runner/dp_runner/src/program.rs
rename to runner/dp_runner/src/channel_tracker.rs
index 29bdf89..2888039 100644
--- a/runner/dp_runner/src/program.rs
+++ b/runner/dp_runner/src/channel_tracker.rs
@@ -4,16 +4,15 @@ use jz_action::network::datatransfer::data_stream_client::DataStreamClient;
 use jz_action::network::datatransfer::MediaDataBatchResponse;
 use std::sync::Arc;
 use tokio::select;
-use tokio::sync::{broadcast, mpsc, Mutex};
-use tokio_stream::{Stream, StreamExt, StreamMap};
+use tokio::sync::{mpsc, Mutex};
+use tokio_stream::StreamExt;
 use tonic::Status;
 use tracing::{error, info};
-use tracing_subscriber::registry::Data;
 
 use crate::mprc;
 
 #[derive(Debug)]
-pub(crate) enum ProgramState {
+pub(crate) enum ChannelState {
     Init,
     Ready,
     Pending,
@@ -21,20 +20,18 @@ pub(crate) enum ProgramState {
     Stopped,
 }
 
-pub(crate) struct BatchProgram {
-    pub(crate) state: ProgramState,
-
+pub(crate) struct ChannelTracker {
+    pub(crate) state: ChannelState,
     pub(crate) upstreams: Option<Vec<String>>,
-
     pub(crate) script: Option<String>,
-
-    pub receivers: Arc<Mutex<mprc::Mprs<String, mpsc::Sender<Result<MediaDataBatchResponse, Status>>>>>,
+    pub receivers:
+        Arc<Mutex<mprc::Mprs<String, mpsc::Sender<Result<MediaDataBatchResponse, Status>>>>>,
 }
 
-impl BatchProgram {
+impl ChannelTracker {
     pub(crate) fn new() -> Self {
-        BatchProgram {
-            state: ProgramState::Init,
+        ChannelTracker {
+            state: ChannelState::Init,
             upstreams: None,
             script: None,
             receivers: Arc::new(Mutex::new(mprc::Mprs::new())),
diff --git a/runner/dp_runner/src/main.rs b/runner/dp_runner/src/main.rs
index 57c88f6..a899bac 100644
--- a/runner/dp_runner/src/main.rs
+++ b/runner/dp_runner/src/main.rs
@@ -1,7 +1,7 @@
 #![feature(future_join)]
 
+mod channel_tracker;
 mod mprc;
-mod program;
 mod unit;
 
 use jz_action::network::datatransfer::data_stream_server::DataStreamServer;
@@ -9,8 +9,8 @@ use jz_action::network::nodecontroller::node_controller_server::NodeControllerSe
 use jz_action::utils::StdIntoAnyhowResult;
 
 use anyhow::{anyhow, Result};
+use channel_tracker::ChannelTracker;
 use clap::Parser;
-use program::BatchProgram;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::select;
@@ -45,7 +45,7 @@ async fn main() -> Result<()> {
         .anyhow()?;
     let addr = args.host_port.parse()?;
-    let program = BatchProgram::new();
+    let program = ChannelTracker::new();
     let program_safe = Arc::new(Mutex::new(program));
 
     let data_stream = UnitDataStream {
         program: program_safe,
diff --git a/runner/dp_runner/src/mprc.rs b/runner/dp_runner/src/mprc.rs
index 80d0503..fee855e 100644
--- a/runner/dp_runner/src/mprc.rs
+++ b/runner/dp_runner/src/mprc.rs
@@ -1,6 +1,6 @@
-use std::{borrow::Borrow, vec};
 use rand::Rng;
 use std::hash::Hash;
+use std::{borrow::Borrow, vec};
 use tokio::sync::mpsc;
 
 pub(crate) struct Mprs
diff --git a/runner/dp_runner/src/unit.rs b/runner/dp_runner/src/unit.rs
index 8d7f06f..9363c69 100644
--- a/runner/dp_runner/src/unit.rs
+++ b/runner/dp_runner/src/unit.rs
@@ -1,20 +1,20 @@
 use anyhow::Result;
+use jz_action::network::common::Empty;
 use jz_action::network::datatransfer::data_stream_server::DataStream;
 use jz_action::network::datatransfer::{MediaDataBatchResponse, TabularDataBatchResponse};
 use jz_action::network::nodecontroller::node_controller_server::NodeController;
 use jz_action::network::nodecontroller::StartRequest;
 use jz_action::utils::{AnyhowToGrpc, IntoAnyhowResult};
-use jz_action::network::common::Empty;
-use std:: sync::Arc;
+use std::sync::Arc;
 use tokio::sync::mpsc;
 use tokio::sync::Mutex;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{Code, Request, Response, Status};
 
-use super::program::{BatchProgram, ProgramState};
+use super::channel_tracker::{ChannelState, ChannelTracker};
 
 pub(crate) struct DataNodeControllerServer {
-    pub(crate) program: Arc<Mutex<BatchProgram>>,
+    pub(crate) program: Arc<Mutex<ChannelTracker>>,
 }
 
 #[tonic::async_trait]
@@ -25,7 +25,7 @@ impl NodeController for DataNodeControllerServer {
     ) -> std::result::Result<Response<Empty>, Status> {
         let request = request.into_inner();
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Ready;
+        program_guard.state = ChannelState::Ready;
         program_guard.upstreams = Some(request.upstreams);
         program_guard.script = Some(request.script);
@@ -38,7 +38,7 @@ impl NodeController for DataNodeControllerServer {
         _request: Request<Empty>,
     ) -> std::result::Result<Response<Empty>, Status> {
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Pending;
+        program_guard.state = ChannelState::Pending;
         Ok(Response::new(Empty {}))
     }
@@ -47,19 +47,19 @@ impl NodeController for DataNodeControllerServer {
         _request: Request<Empty>,
     ) -> std::result::Result<Response<Empty>, Status> {
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Ready;
+        program_guard.state = ChannelState::Ready;
         Ok(Response::new(Empty {}))
     }
 
     async fn stop(&self, _request: Request<Empty>) -> std::result::Result<Response<Empty>, Status> {
         let mut program_guard = self.program.lock().await;
-        program_guard.state = ProgramState::Stopped;
+        program_guard.state = ChannelState::Stopped;
         Ok(Response::new(Empty {}))
     }
 }
 
 pub(crate) struct UnitDataStream {
-    pub(crate) program: Arc<Mutex<BatchProgram>>,
+    pub(crate) program: Arc<Mutex<ChannelTracker>>,
 }
 
 #[tonic::async_trait]
@@ -89,6 +89,6 @@ impl DataStream for UnitDataStream {
         &self,
         _request: Request<Empty>,
     ) -> Result<Response<Self::subscribe_tabular_dataStream>, Status> {
-       todo!()
+        todo!()
     }
 }
diff --git a/script/cd.dockerfile b/script/cd.dockerfile
index a186923..4bf7115 100644
--- a/script/cd.dockerfile
+++ b/script/cd.dockerfile
@@ -4,4 +4,4 @@
 RUN mkdir -p /app
 WORKDIR /app
 
-ADD dist/compute_data_runner /compute_data_runner
+ADD dist/compute_unit_runner /compute_unit_runner
diff --git a/src/driver/kube.rs b/src/driver/kube.rs
index 0b3a1dd..5631058 100644
--- a/src/driver/kube.rs
+++ b/src/driver/kube.rs
@@ -153,10 +153,7 @@ where
 {
     pub async fn default() -> Result<KubeDriver<'reg, ID>> {
         let mut reg = Handlebars::new();
-        reg.register_template_string(
-            "claim",
-            include_str!("kubetpl/claim.tpl"),
-        )?;
+        reg.register_template_string("claim", include_str!("kubetpl/claim.tpl"))?;
 
         reg.register_template_string("deployment", include_str!("kubetpl/deployment.tpl"))?;
         reg.register_template_string("service", include_str!("kubetpl/service.tpl"))?;
@@ -179,10 +176,7 @@ where
     pub async fn from_k8s_client(client: Client) -> Result<KubeDriver<'reg, ID>> {
         let mut reg = Handlebars::new();
-        reg.register_template_string(
-            "claim",
-            include_str!("kubetpl/claim.tpl"),
-        )?;
+        reg.register_template_string("claim", include_str!("kubetpl/claim.tpl"))?;
 
         reg.register_template_string("deployment", include_str!("kubetpl/deployment.tpl"))?;
         reg.register_template_string("service", include_str!("kubetpl/service.tpl"))?;
@@ -243,7 +237,7 @@ where
 
 #[derive(Serialize)]
 struct ClaimRenderParams {
-    name: String
+    name: String,
 }
 
 impl<ID> Driver for KubeDriver<'_, ID>
@@ -260,14 +254,15 @@ where
         let mut pipeline_ctl = KubePipelineController::<ID>::default();
         for node in graph.iter() {
-            let claim_string = self.reg.render("claim", &ClaimRenderParams{
-                name: node.name.clone()+"-node-claim"
-            })?;
+            let claim_string = self.reg.render(
+                "claim",
+                &ClaimRenderParams {
+                    name: node.name.clone() + "-node-claim",
+                },
+            )?;
             debug!("rendered clam string {}", claim_string);
             let claim: PersistentVolumeClaim = serde_json::from_str(&claim_string)?;
-            let claim_deployment = claim_api
-                .create(&PostParams::default(), &claim)
-                .await?;
+            let claim_deployment = claim_api.create(&PostParams::default(), &claim).await?;
 
             let deployment_string = self.reg.render("deployment", node)?;
             debug!("rendered unit clam string {}", deployment_string);
@@ -286,23 +281,27 @@ where
                 .await?;
 
             let channel_handler = if node.channel.is_some() {
-                let claim_string = self.reg.render("claim", &ClaimRenderParams{
-                    name: node.name.clone()+"-channel-claim"
-                })?;
+                let claim_string = self.reg.render(
+                    "claim",
+                    &ClaimRenderParams {
+                        name: node.name.clone() + "-channel-claim",
+                    },
+                )?;
                 debug!("rendered channel claim string {}", claim_string);
                 let claim: PersistentVolumeClaim = serde_json::from_str(&claim_string)?;
-                let claim_deployment = claim_api
-                    .create(&PostParams::default(), &claim)
-                    .await?;
+                let claim_deployment = claim_api.create(&PostParams::default(), &claim).await?;
 
-                let tmp = self.reg.get_template("channel_deployment").unwrap(); 
-                debug!("xxxx {:?}",tmp);
+                let tmp = self.reg.get_template("channel_deployment").unwrap();
+                debug!("xxxx {:?}", tmp);
                 let channel_deployment_string = self.reg.render("channel_deployment", node)?;
-                debug!("rendered channel deployment string {}", channel_deployment_string);
+                debug!(
+                    "rendered channel deployment string {}",
+                    channel_deployment_string
+                );
                 debug!("ggggggggggggggggggg");
                 let channel_deployment: Deployment =
                     serde_json::from_str(channel_deployment_string.as_str())?;
-                debug!("xxxxaasdasd"); 
+                debug!("xxxxaasdasd");
                 let channel_deployment = deployment_api
                     .create(&PostParams::default(), &channel_deployment)
                     .await?;
@@ -425,6 +424,6 @@ mod tests {
         let dag = Dag::<Uuid>::from_json(json_str).unwrap();
         let kube_driver = KubeDriver::<Uuid>::default().await.unwrap();
         kube_driver.deploy("ntest", &dag).await.unwrap();
-        // kube_driver.clean("ntest").await.unwrap(); 
+        // kube_driver.clean("ntest").await.unwrap();
     }
 }
diff --git a/src/driver/kubetpl/deployment.tpl b/src/driver/kubetpl/deployment.tpl
index e32613a..5c4c77a 100644
--- a/src/driver/kubetpl/deployment.tpl
+++ b/src/driver/kubetpl/deployment.tpl
@@ -25,9 +25,9 @@
                 "containers": [
                     {
                         "name": "compute-data-unit",
-                        "image": "jz-action/compute_data_runner:latest",
+                        "image": "jz-action/compute_unit_runner:latest",
                         "command": [
-                            "/compute_data_runner"
+                            "/compute_unit_runner"
                         ],
                         "imagePullPolicy":"IfNotPresent",
                         "ports": [