Commit
feat: change deployment to statefulset
hunjixin committed Jul 30, 2024
1 parent f35e80f commit 713ad9a
Showing 10 changed files with 147 additions and 270 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -29,7 +29,7 @@ serde_json = {version = "1.0.117"}
serde = {version ="1.0.203", features = ["derive"]}
uuid = {version="1.8.0", features = ["v4","serde"]}
mongodb = {version="3.0.1"}

chrono = "0.4"


[package]
@@ -56,6 +56,7 @@ tonic = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
mongodb = {workspace = true}
chrono = {workspace = true}

[build-dependencies]
tonic-build = "0.12.1"
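The new chrono dependency is presumably what populates i64 timestamp fields such as created_at / updated_at (see src/core/models.rs below); a minimal sketch of that usage, assuming second-precision Unix timestamps:

use chrono::Utc;

// Current Unix timestamp in seconds, e.g. for created_at / updated_at fields.
fn now_ts() -> i64 {
    Utc::now().timestamp()
}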
222 changes: 22 additions & 200 deletions crates/compute_unit_runner/src/media_data_tracker.rs
@@ -87,91 +87,8 @@ where
}
}

pub(crate) async fn process_data_cmd(&mut self) -> Result<()> {
match self.node_type {
NodeType::Input => self.track_input_data().await,
NodeType::InputOutput => self.track_input_output_data().await,
NodeType::Output => self.track_output_data().await,
}
}

/// data is transferred from the user container to the data container
pub(crate) async fn track_input_data(&mut self) -> Result<()> {
let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024);
self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx);

//TODO: this turns an async flow into a synchronous one and hurts performance;
//if this becomes a bottleneck, we should refactor it
let out_going_tx = self.out_going_tx.clone();
let tmp_store = self.tmp_store.clone();

let mut state_map: HashMap<String, BatchState> = HashMap::new();
tokio::spawn(async move {
loop {
select! {
Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
state_map.insert(req.id.clone(), BatchState{
state: DataStateEnum::Processed,
});

// respond with nothing
resp.send(()).expect("channel only read once");

let tmp_out_path = tmp_store.join(req.id.clone());
let mut new_batch = MediaDataBatchResponse::default();

let mut entry_count = 0;
for entry in WalkDir::new(tmp_out_path) {
match entry {
Ok(entry) => {
if entry.file_type().is_file() {
let path = entry.path();
match fs::read(path).await {
Ok(content) => {
new_batch.cells.push(MediaDataCell{
size: content.len() as i32,
path: path.to_str().unwrap().to_string(),
data: content,
});
entry_count += 1;
}
Err(e) => error!("read file({:?}) fail {}", path, e),
}

println!("{}", entry.path().display());
}
}
Err(e) => error!("walk out dir({:?}) fail {}", &req.id, e),
}
}
new_batch.size = entry_count;

//write outgoing
if new_batch.size > 0 {
if let Err(e) = out_going_tx.send(new_batch) {
error!("send data {}", e);
continue;
}

let entry = state_map.get_mut(&req.id)
.expect("this value has been inserted before");
entry.state = DataStateEnum::Sent;
}
let _ = state_map.remove(&req.id);
},
}
}
});
Ok(())
}

/// data is transferred from data container -> user container -> data container
pub(crate) async fn track_input_output_data(&mut self) -> Result<()> {
let upstreams = self
.upstreams
.as_ref()
.expect("input output node must have incoming nodes");

pub(crate) async fn process_data_cmd(&mut self) -> Result<()> {
let (incoming_data_tx, mut incoming_data_rx) = mpsc::channel(1024);

let (ipc_process_data_req_tx, mut ipc_process_data_req_rx) = mpsc::channel(1024);
Expand All @@ -184,26 +101,30 @@ where
mpsc::channel(1024);
self.ipc_process_completed_data_tx = Some(ipc_process_completed_data_tx);

for upstream in upstreams {
{
let upstream = upstream.clone();
let tx_clone = incoming_data_tx.clone();
let _ = tokio::spawn(async move {
//TODO: handle network disconnects
let mut client = DataStreamClient::connect(upstream.clone()).await?;
let mut stream = client.subscribe_media_data(Empty {}).await?.into_inner();

while let Some(item) = stream.next().await {
tx_clone.send(item.unwrap()).await.unwrap();
}

error!("unable read data from stream");
anyhow::Ok(())
});
if let Some(upstreams) = self.upstreams.as_ref() {
info!("start listening to upstreams {:?} ....", upstreams);
for upstream in upstreams {
{
let upstream = upstream.clone();
let tx_clone = incoming_data_tx.clone();
let _ = tokio::spawn(async move {
//TODO: handle network disconnects
let mut client = DataStreamClient::connect(upstream.clone()).await?;
let mut stream = client.subscribe_media_data(Empty {}).await?.into_inner();

while let Some(item) = stream.next().await {
tx_clone.send(item.unwrap()).await.unwrap();
}

error!("unable read data from stream");
anyhow::Ok(())
});
}
info!("listen incoming data from upstream {}", upstream);
}
info!("listen data from upstream {}", upstream);
}


//TODO: this turns an async flow into a synchronous one and hurts performance;
//if this becomes a bottleneck, we should refactor it
let out_going_tx = self.out_going_tx.clone();
@@ -318,105 +239,6 @@
Ok(())
}

pub(crate) async fn track_output_data(&mut self) -> Result<()> {
let upstreams = self
.upstreams
.as_ref()
.expect("input output node must have incoming nodes");

let (incoming_data_tx, mut incoming_data_rx) = mpsc::channel(1024);

let (ipc_process_data_req_tx, mut ipc_process_data_req_rx) = mpsc::channel(1024);
self.ipc_process_data_req_tx = Some(ipc_process_data_req_tx);

let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024);
self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx);

for upstream in upstreams {
{
let upstream = upstream.clone();
let tx_clone = incoming_data_tx.clone();
let _ = tokio::spawn(async move {
//TODO: handle network disconnects
let mut client = DataStreamClient::connect(upstream.clone()).await?;
let mut stream = client.subscribe_media_data(Empty {}).await?.into_inner();

while let Some(item) = stream.next().await {
tx_clone.send(item.unwrap()).await.unwrap();
}

error!("unable read data from stream");
anyhow::Ok(())
});
}
info!("listen data from upstream {}", upstream);
}

//TODO: this turns an async flow into a synchronous one and hurts performance;
//if this becomes a bottleneck, we should refactor it
let tmp_store = self.tmp_store.clone();
//TODO: use the DB to persist this data.
let mut state_map: HashMap<String, BatchState> = HashMap::new();
tokio::spawn(async move {
loop {
select! {
data_batch_result = incoming_data_rx.recv() => {
if let Some(data_batch) = data_batch_result {
//create input directory
let id = Uuid::new_v4().to_string();
let tmp_in_path = tmp_store.join(id.clone());
if let Err(e) = fs::create_dir_all(&tmp_in_path).await {
error!("create input dir {:?} fail {}", tmp_in_path, e);
return
}

//write batch files
for entry in data_batch.cells.iter() {
let entry_path = tmp_in_path.join(entry.path.clone());
if let Err(e) = fs::write(entry_path.clone(), &entry.data).await {
error!("write file {:?} fail {}", entry_path, e);
}
}
state_map.insert(id, BatchState{
state: DataStateEnum::Received,
});
}
},
Some((_, resp)) = ipc_process_data_req_rx.recv() => {
//select an unassigned data batch
for (key, v ) in state_map.iter_mut() {
if v.state == DataStateEnum::Received {
//respond with this data's position
resp.send(AvaiableDataResponse{
id: key.clone(),
}).expect("channel only read once");
v.state = DataStateEnum::Assigned;
break;
}
}
},
Some((req, _resp)) = ipc_process_submit_result_rx.recv() => {
//mark this data as completed
match state_map.get_mut(&req.id) {
Some(state)=>{
state.state = DataStateEnum::Processed;
},
None=>error!("id({:?}) not found", &req.id)
}
//remove data
let tmp_path = tmp_store.join(req.id.clone());
if let Err(e) = fs::remove_dir_all(&tmp_path).await {
error!("remove tmp dir{:?} fail {}", tmp_path, e);
}
//remove state
let _ = state_map.remove(&req.id);
},
}
}
});
Ok(())
}

pub async fn apply_db_state(
repo: R,
name: &str,
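A note on the TODO about handling network disconnects: as written, each spawned subscriber task exits permanently the first time the connection or stream fails. Below is a minimal sketch of a retry loop that could replace the body of that spawned task, reusing the upstream and tx_clone bindings from the surrounding code; the fixed one-second backoff is an assumption, not part of this commit:

let _ = tokio::spawn(async move {
    loop {
        // Reconnect with a fixed backoff whenever connect/subscribe/stream fails (assumed policy).
        match DataStreamClient::connect(upstream.clone()).await {
            Ok(mut client) => match client.subscribe_media_data(Empty {}).await {
                Ok(resp) => {
                    let mut stream = resp.into_inner();
                    while let Some(Ok(item)) = stream.next().await {
                        if tx_clone.send(item).await.is_err() {
                            return; // receiver dropped; stop this task
                        }
                    }
                    error!("stream from {} ended or errored, reconnecting", upstream);
                }
                Err(e) => error!("subscribe to {} fail {}", upstream, e),
            },
            Err(e) => error!("connect to {} fail {}", upstream, e),
        }
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
});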
1 change: 0 additions & 1 deletion src/core/models.rs
@@ -28,7 +28,6 @@ pub struct Node {
pub node_name: String,
pub state: TrackerState,
pub node_type: NodeType,
pub input_output_type: i32,
pub upstreams: Vec<String>,
pub created_at: i64,
pub updated_at: i64,
11 changes: 11 additions & 0 deletions src/dag/dag.rs
@@ -5,6 +5,7 @@ use anyhow::{anyhow, Ok, Result};
use std::collections::HashMap;

pub struct Dag {
pub raw: String,
pub name: String,
nodes: HashMap<String, ComputeUnit>,
/// Store dependency relations.
@@ -14,6 +15,7 @@ pub struct Dag {
impl Dag {
pub fn new() -> Self {
Dag {
raw: String::new(),
name: String::new(),
nodes: HashMap::new(),
rely_graph: Graph::new(),
@@ -43,6 +45,14 @@ impl Dag {
Ok(self)
}

pub fn get_nodes(&self, node_id: &str) -> Vec<&str> {
self.rely_graph.get_incoming_nodes(node_id)
}

pub fn get_incomming_nodes(&self, node_id: &str) -> Vec<&str> {
self.rely_graph.get_incoming_nodes(node_id)
}

// from_json builds a graph from a JSON string
pub fn from_json<'a>(json: &'a str) -> Result<Self> {
let value: serde_json::Value = serde_json::from_str(json)?;
@@ -73,6 +83,7 @@ impl Dag {
.collect();
Ok(Dag {
name: dag_name.to_string(),
raw: json.to_string(),
nodes: nodes_map,
rely_graph: rely_graph,
})
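A short usage sketch of the new raw field and the get_incomming_nodes accessor (the function name upstream_of and its arguments are illustrative, not part of the commit):

fn upstream_of(json_str: &str, node_id: &str) -> anyhow::Result<Vec<String>> {
    // Build the DAG from its JSON definition; `raw` keeps the original text around.
    let dag = Dag::from_json(json_str)?;
    let _definition: &str = dag.raw.as_str();
    // Collect the ids of this node's direct upstream (incoming) nodes.
    Ok(dag
        .get_incomming_nodes(node_id)
        .into_iter()
        .map(String::from)
        .collect())
}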
12 changes: 10 additions & 2 deletions src/dag/graph.rs
@@ -36,7 +36,7 @@ pub(crate) struct Graph {
/// Adjacency list of graph (stored as a vector of vector of indices)
adj: HashMap<String, Vec<String>>,
/// Node's in_degree, used for topological sort
in_degree: HashMap<String, Vec<String>>,
}

impl Graph {
@@ -150,14 +150,22 @@ impl Graph {
}
}

/// Get the in-degree (number of incoming nodes) of a node.
pub(crate) fn get_in_degree(&self, id: &str) -> usize {
match self.in_degree.get(id) {
Some(id) => id.len(),
None => 0,
}
}

/// Get the incoming nodes of a node.
pub(crate) fn get_incoming_nodes(&self, id: &str) -> Vec<&str> {
match self.in_degree.get(id) {
Some(id) => id.iter().map(|v| v.as_str()).collect(),
None => vec![],
}
}

/// Get all the successors of a node (direct or indirect).
/// This function will return a vector of indices of successors (including itself).
pub(crate) fn get_node_successors(&self, id: &str) -> Vec<String> {
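A design note on the two accessors above: both read the same in_degree map, which stores each node's direct upstream ids, so the in-degree is simply the length of the stored list. A standalone sketch of that invariant (illustrative, not the crate's code):

use std::collections::HashMap;

fn main() {
    // in_degree maps a node id to the ids of its direct upstream nodes.
    let mut in_degree: HashMap<String, Vec<String>> = HashMap::new();
    in_degree.entry("b".to_string()).or_default().push("a".to_string()); // edge a -> b

    // get_in_degree("b") == stored list length; get_incoming_nodes("b") == the list itself.
    assert_eq!(in_degree.get("b").map(|v| v.len()).unwrap_or(0), 1);
    let incoming: Vec<&str> = in_degree
        .get("b")
        .map(|v| v.iter().map(|s| s.as_str()).collect())
        .unwrap_or_default();
    assert_eq!(incoming, vec!["a"]);
}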