diff --git a/Cargo.toml b/Cargo.toml index d6dff7c..bb75419 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "crates/dp_runner", "crates/compute_unit_runner", "crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/dummy_in", "nodes/dummy_out" -] +, "nodes/copy_in_place"] [workspace.package] repository = "https://github.com/GitDataAI/jz-action" diff --git a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs index 41dab66..7f02358 100644 --- a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs +++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs @@ -2,7 +2,6 @@ use compute_unit_runner::{ fs_cache::{ FSCache, FileCache, - MemCache, }, ipc, media_data_tracker, diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs index f615723..c62a129 100644 --- a/crates/compute_unit_runner/src/media_data_tracker.rs +++ b/crates/compute_unit_runner/src/media_data_tracker.rs @@ -30,10 +30,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::sync::{ - broadcast, - mpsc::error::TrySendError, -}; +use tokio::sync::broadcast; use tonic::{ transport::Channel, Code, @@ -194,7 +191,7 @@ where let new_batch = match fs_cache.read(&req.id).await { Ok(batch) => batch, Err(err) => { - warn!("Failed to read batch: {}", err); + warn!("failed to read batch: {}", err); //todo how to handle missing data if let Err(err) = db_repo.update_state(&node_name, &req.id, Direction::Out, DataState::Error, None).await { error!("mark data {} to fail {}", &req.id, err); diff --git a/makefile b/makefile index caf7d2b..7e017ae 100644 --- a/makefile +++ b/makefile @@ -35,11 +35,15 @@ build-nodes: $(OUTPUT) cargo build -p dummy_out --release cp target/release/dummy_out $(OUTPUT)/dummy_out + cargo build -p copy_in_place --release + cp target/release/copy_in_place $(OUTPUT)/copy_in_place + docker_nodes: build-nodes docker build -f ./nodes/jz_reader/dockerfile -t jz-action/jz_reader:latest . docker build -f ./nodes/jz_writer/dockerfile -t jz-action/jz_writer:latest . docker build -f ./nodes/dummy_in/dockerfile -t jz-action/dummy_in:latest . docker build -f ./nodes/dummy_out/dockerfile -t jz-action/dummy_out:latest . + docker build -f ./nodes/copy_in_place/dockerfile -t jz-action/copy_in_place:latest . ################## minikube docker: docker_cd docker_dp docker_nodes diff --git a/nodes/copy_in_place/Cargo.toml b/nodes/copy_in_place/Cargo.toml new file mode 100644 index 0000000..c4be42c --- /dev/null +++ b/nodes/copy_in_place/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "copy_in_place" +version = "0.1.0" +repository.workspace = true +license.workspace = true +edition.workspace = true +include.workspace = true + +[dependencies] +jz_action = { path = "../../"} +compute_unit_runner = {path = "../../crates/compute_unit_runner"} + +uuid = {workspace = true} +tokio = { workspace = true} +tokio-retry = {workspace = true} +tokio-stream = {workspace = true} +anyhow = {workspace = true} +tracing = {workspace = true} +tracing-subscriber = {workspace = true} +clap = {version="4.5.7", features=["derive"]} +random_word = { version = "0.4.3", features = ["en"] } \ No newline at end of file diff --git a/nodes/copy_in_place/dockerfile b/nodes/copy_in_place/dockerfile new file mode 100644 index 0000000..fcadd80 --- /dev/null +++ b/nodes/copy_in_place/dockerfile @@ -0,0 +1,7 @@ +FROM jz-action/net-debug + +WORKDIR /app + +RUN mkdir -p /app + +ADD dist/copy_in_place /copy_in_place diff --git a/nodes/copy_in_place/src/main.rs b/nodes/copy_in_place/src/main.rs new file mode 100644 index 0000000..c96d7fd --- /dev/null +++ b/nodes/copy_in_place/src/main.rs @@ -0,0 +1,121 @@ +use anyhow::{ + anyhow, + Result, +}; +use clap::Parser; +use compute_unit_runner::ipc::{ + self, + IPCClient, + SubmitOuputDataReq, +}; +use jz_action::utils::StdIntoAnyhowResult; +use std::{ + path::Path, + str::FromStr, time::Duration, +}; +use tokio::{ + fs, + select, + signal::unix::{ + signal, + SignalKind, + }, + sync::mpsc, + time::{sleep, Instant}, +}; +use tracing::{ + error, + info, + Level, +}; + +#[derive(Debug, Parser)] +#[command( + name = "copy_in_place", + version = "0.0.1", + author = "Author Name ", + about = "embed in k8s images. move input directory to dest directory." +)] + +struct Args { + #[arg(short, long, default_value = "INFO")] + log_level: String, + + #[arg(short, long, default_value = "/unix_socket/compute_unit_runner_d")] + unix_socket_addr: String, + + #[arg(short, long, default_value = "/app/tmp")] + tmp_path: String, +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<()> { + let args = Args::parse(); + tracing_subscriber::fmt() + .with_max_level(Level::from_str(&args.log_level)?) + .try_init() + .anyhow()?; + + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::>(1); + { + let shutdown_tx = shutdown_tx.clone(); + let _ = tokio::spawn(async move { + if let Err(e) = copy_in_place(args).await { + let _ = shutdown_tx.send(Err(anyhow!("dummy read {e}"))).await; + } + }); + } + + { + //catch signal + let _ = tokio::spawn(async move { + let mut sig_term = signal(SignalKind::terminate()).unwrap(); + let mut sig_int = signal(SignalKind::interrupt()).unwrap(); + select! { + _ = sig_term.recv() => info!("Recieve SIGTERM"), + _ = sig_int.recv() => info!("Recieve SIGTINT"), + }; + let _ = shutdown_tx.send(Err(anyhow!("cancel by signal"))).await; + }); + } + + if let Some(Err(err)) = shutdown_rx.recv().await { + error!("program exit with error {:?}", err) + } + info!("gracefully shutdown"); + Ok(()) +} + +async fn copy_in_place(args: Args) -> Result<()> { + let client = ipc::IPCClientImpl::new(args.unix_socket_addr); + let tmp_path = Path::new(&args.tmp_path); + loop { + let instant = Instant::now(); + + let req = client.request_avaiable_data().await?; + if req.is_none() { + sleep(Duration::from_secs(2)).await; + continue; + } + + let id = req.unwrap().id; + let path_str = tmp_path.join(&id); + let root_input_dir = path_str.as_path(); + + + let new_id = uuid::Uuid::new_v4().to_string(); + let output_dir = tmp_path.join(&new_id); + + fs::rename(root_input_dir, output_dir).await?; + + info!("move data {:?}", instant.elapsed()); + + client.complete_result(&id).await?; + + //submit directory after completed a batch + client + .submit_output(SubmitOuputDataReq::new(&new_id, 30)) + .await?; + info!("submit new data {:?}", instant.elapsed()); + } +} diff --git a/src/bin/main.rs b/src/bin/main.rs new file mode 100644 index 0000000..0ef8971 --- /dev/null +++ b/src/bin/main.rs @@ -0,0 +1,2 @@ + +fn main(){} \ No newline at end of file diff --git a/src/driver/kube.rs b/src/driver/kube.rs index 4498925..ea67a5b 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -545,18 +545,33 @@ mod tests { "version": "v1", "dag": [ { - "name": "computeunit1", + "name": "dummy-in", "spec": { "image": "jz-action/dummy_in:latest", "command":"/dummy_in", "args": ["--log-level=debug"] } + }, { + "name": "copy-in-place", + "node_type": "ComputeUnit", + "dependency": [ + "dummy-in" + ], + "spec": { + "image": "jz-action/copy_in_place:latest", + "command":"/copy_in_place", + "replicas": 3, + "args": ["--log-level=debug"] + }, + "channel":{ + "cache_type":"Memory" + } }, { - "name": "computeunit2", + "name": "dummy-out", "node_type": "ComputeUnit", "dependency": [ - "computeunit1" + "copy-in-place" ], "spec": { "image": "jz-action/dummy_out:latest", diff --git a/src/lib.rs b/src/lib.rs index 9a40046..86d800b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub mod dbrepo; pub mod driver; pub mod network; pub mod utils; +pub mod rpc; /* use dag::Dag; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs new file mode 100644 index 0000000..e69de29