Skip to content

Commit

Permalink
feat: add copy in place nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 5, 2024
1 parent 3a70675 commit 574dd85
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"crates/dp_runner",
"crates/compute_unit_runner",
"crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/dummy_in", "nodes/dummy_out"
]
, "nodes/copy_in_place"]

[workspace.package]
repository = "https://github.com/GitDataAI/jz-action"
Expand Down
1 change: 0 additions & 1 deletion crates/compute_unit_runner/src/bin/compute_unit_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use compute_unit_runner::{
fs_cache::{
FSCache,
FileCache,
MemCache,
},
ipc,
media_data_tracker,
Expand Down
7 changes: 2 additions & 5 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::{
broadcast,
mpsc::error::TrySendError,
};
use tokio::sync::broadcast;
use tonic::{
transport::Channel,
Code,
Expand Down Expand Up @@ -194,7 +191,7 @@ where
let new_batch = match fs_cache.read(&req.id).await {
Ok(batch) => batch,
Err(err) => {
warn!("Failed to read batch: {}", err);
warn!("failed to read batch: {}", err);
//todo how to handle missing data
if let Err(err) = db_repo.update_state(&node_name, &req.id, Direction::Out, DataState::Error, None).await {
error!("mark data {} to fail {}", &req.id, err);
Expand Down
4 changes: 4 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ build-nodes: $(OUTPUT)
cargo build -p dummy_out --release
cp target/release/dummy_out $(OUTPUT)/dummy_out

cargo build -p copy_in_place --release
cp target/release/copy_in_place $(OUTPUT)/copy_in_place

docker_nodes: build-nodes
docker build -f ./nodes/jz_reader/dockerfile -t jz-action/jz_reader:latest .
docker build -f ./nodes/jz_writer/dockerfile -t jz-action/jz_writer:latest .
docker build -f ./nodes/dummy_in/dockerfile -t jz-action/dummy_in:latest .
docker build -f ./nodes/dummy_out/dockerfile -t jz-action/dummy_out:latest .
docker build -f ./nodes/copy_in_place/dockerfile -t jz-action/copy_in_place:latest .

################## minikube
docker: docker_cd docker_dp docker_nodes
Expand Down
21 changes: 21 additions & 0 deletions nodes/copy_in_place/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "copy_in_place"
version = "0.1.0"
repository.workspace = true
license.workspace = true
edition.workspace = true
include.workspace = true

[dependencies]
jz_action = { path = "../../"}
compute_unit_runner = {path = "../../crates/compute_unit_runner"}

uuid = {workspace = true}
tokio = { workspace = true}
tokio-retry = {workspace = true}
tokio-stream = {workspace = true}
anyhow = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
clap = {version="4.5.7", features=["derive"]}
random_word = { version = "0.4.3", features = ["en"] }
7 changes: 7 additions & 0 deletions nodes/copy_in_place/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM jz-action/net-debug

WORKDIR /app

RUN mkdir -p /app

ADD dist/copy_in_place /copy_in_place
121 changes: 121 additions & 0 deletions nodes/copy_in_place/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use anyhow::{
anyhow,
Result,
};
use clap::Parser;
use compute_unit_runner::ipc::{
self,
IPCClient,
SubmitOuputDataReq,
};
use jz_action::utils::StdIntoAnyhowResult;
use std::{
path::Path,
str::FromStr, time::Duration,
};
use tokio::{
fs,
select,
signal::unix::{
signal,
SignalKind,
},
sync::mpsc,
time::{sleep, Instant},
};
use tracing::{
error,
info,
Level,
};

#[derive(Debug, Parser)]
#[command(
name = "copy_in_place",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-action>",
about = "embed in k8s images. move input directory to dest directory."
)]

struct Args {
#[arg(short, long, default_value = "INFO")]
log_level: String,

#[arg(short, long, default_value = "/unix_socket/compute_unit_runner_d")]
unix_socket_addr: String,

#[arg(short, long, default_value = "/app/tmp")]
tmp_path: String,
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
let args = Args::parse();
tracing_subscriber::fmt()
.with_max_level(Level::from_str(&args.log_level)?)
.try_init()
.anyhow()?;

let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<Result<()>>(1);
{
let shutdown_tx = shutdown_tx.clone();
let _ = tokio::spawn(async move {
if let Err(e) = copy_in_place(args).await {
let _ = shutdown_tx.send(Err(anyhow!("dummy read {e}"))).await;
}
});
}

{
//catch signal
let _ = tokio::spawn(async move {
let mut sig_term = signal(SignalKind::terminate()).unwrap();
let mut sig_int = signal(SignalKind::interrupt()).unwrap();
select! {
_ = sig_term.recv() => info!("Recieve SIGTERM"),
_ = sig_int.recv() => info!("Recieve SIGTINT"),
};
let _ = shutdown_tx.send(Err(anyhow!("cancel by signal"))).await;
});
}

if let Some(Err(err)) = shutdown_rx.recv().await {
error!("program exit with error {:?}", err)
}
info!("gracefully shutdown");
Ok(())
}

async fn copy_in_place(args: Args) -> Result<()> {
let client = ipc::IPCClientImpl::new(args.unix_socket_addr);
let tmp_path = Path::new(&args.tmp_path);
loop {
let instant = Instant::now();

let req = client.request_avaiable_data().await?;
if req.is_none() {
sleep(Duration::from_secs(2)).await;
continue;
}

let id = req.unwrap().id;
let path_str = tmp_path.join(&id);
let root_input_dir = path_str.as_path();


let new_id = uuid::Uuid::new_v4().to_string();
let output_dir = tmp_path.join(&new_id);

fs::rename(root_input_dir, output_dir).await?;

info!("move data {:?}", instant.elapsed());

client.complete_result(&id).await?;

//submit directory after completed a batch
client
.submit_output(SubmitOuputDataReq::new(&new_id, 30))
.await?;
info!("submit new data {:?}", instant.elapsed());
}
}
2 changes: 2 additions & 0 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

fn main(){}
21 changes: 18 additions & 3 deletions src/driver/kube.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,18 +545,33 @@ mod tests {
"version": "v1",
"dag": [
{
"name": "computeunit1",
"name": "dummy-in",
"spec": {
"image": "jz-action/dummy_in:latest",
"command":"/dummy_in",
"args": ["--log-level=debug"]
}
}, {
"name": "copy-in-place",
"node_type": "ComputeUnit",
"dependency": [
"dummy-in"
],
"spec": {
"image": "jz-action/copy_in_place:latest",
"command":"/copy_in_place",
"replicas": 3,
"args": ["--log-level=debug"]
},
"channel":{
"cache_type":"Memory"
}
},
{
"name": "computeunit2",
"name": "dummy-out",
"node_type": "ComputeUnit",
"dependency": [
"computeunit1"
"copy-in-place"
],
"spec": {
"image": "jz-action/dummy_out:latest",
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod dbrepo;
pub mod driver;
pub mod network;
pub mod utils;
pub mod rpc;

/*
use dag::Dag;
Expand Down
Empty file added src/rpc/mod.rs
Empty file.

0 comments on commit 574dd85

Please sign in to comment.