Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jul 30, 2024
1 parent 713ad9a commit 7b5480f
Show file tree
Hide file tree
Showing 23 changed files with 93 additions and 741 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[workspace]
members = [
"crates/dp_runner",
"crates/compute_unit_runner",
"crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer",
]
Expand Down
3 changes: 2 additions & 1 deletion crates/compute_unit_runner/src/bin/compute_unit_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ async fn main() -> Result<()> {
.try_init()
.anyhow()?;

let db_repo = Arc::new(MongoRepo::new(MongoConfig::new(args.mongo_url.clone()), &args.database).await?);
let db_repo =
Arc::new(MongoRepo::new(MongoConfig::new(args.mongo_url.clone()), &args.database).await?);

let program = MediaDataTracker::new(
db_repo.clone(),
Expand Down
4 changes: 4 additions & 0 deletions crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use http_body_util::Collected;
use jz_action::core::models::NodeRepo;
use jz_action::utils::StdIntoAnyhowResult;
use serde::{Deserialize, Serialize};
use tracing::info;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -49,6 +50,7 @@ async fn process_data_request<R>(
where
R: NodeRepo,
{
info!("receive avaiable data reqeust");
let (tx, mut rx) = oneshot::channel::<AvaiableDataResponse>();
let program = program_mutex.lock().await;
program
Expand All @@ -72,6 +74,7 @@ async fn process_completed_request<R>(
where
R: NodeRepo,
{
info!("receive data completed request");
let (tx, mut rx) = oneshot::channel::<()>();
let program = program_mutex.lock().await;
//read request
Expand All @@ -98,6 +101,7 @@ async fn process_submit_output_request<R>(
where
R: NodeRepo,
{
info!("receive submit output request");
let (tx, mut rx) = oneshot::channel::<()>();
let program = program_mutex.lock().await;
//read request
Expand Down
22 changes: 8 additions & 14 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use jz_action::core::models::{NodeRepo, TrackerState};
use jz_action::network::common::Empty;
use jz_action::network::datatransfer::data_stream_client::DataStreamClient;
use jz_action::network::datatransfer::{MediaDataBatchResponse, MediaDataCell};
use jz_action::network::nodecontroller::NodeType;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::{error, info};
use tracing::{debug, error, info};
use uuid::Uuid;
use walkdir::WalkDir;

use jz_action::utils::StdIntoAnyhowResult;
use tokio::fs;
use tokio::select;
use tokio::sync::mpsc;
Expand All @@ -20,8 +20,6 @@ use tokio::sync::{broadcast, oneshot};
use tokio::time;
use tokio_stream::StreamExt;

use jz_action::utils::StdIntoAnyhowResult;

#[derive(Debug, PartialEq)]
pub enum DataStateEnum {
Received,
Expand All @@ -47,8 +45,6 @@ where

pub(crate) local_state: TrackerState,

pub(crate) node_type: NodeType,

pub(crate) upstreams: Option<Vec<String>>,

// channel for process avaiable data request
Expand Down Expand Up @@ -77,7 +73,6 @@ where
tmp_store,
_name: name.to_string(),
_repo: repo,
node_type: NodeType::Input,
local_state: TrackerState::Init,
upstreams: None,
ipc_process_submit_output_tx: None,
Expand All @@ -101,8 +96,8 @@ where
mpsc::channel(1024);
self.ipc_process_completed_data_tx = Some(ipc_process_completed_data_tx);

if let Some(upstreams) = self.upstreams {
info!("Start listen upstream {} ....", upstream);
if let Some(upstreams) = self.upstreams.as_ref() {
info!("Start listen upstream {:?} ....", upstreams);
for upstream in upstreams {
{
let upstream = upstream.clone();
Expand All @@ -111,11 +106,11 @@ where
//todo handle network disconnect
let mut client = DataStreamClient::connect(upstream.clone()).await?;
let mut stream = client.subscribe_media_data(Empty {}).await?.into_inner();

while let Some(item) = stream.next().await {
tx_clone.send(item.unwrap()).await.unwrap();
}

error!("unable read data from stream");
anyhow::Ok(())
});
Expand All @@ -124,7 +119,6 @@ where
}
}


//TODO this make a async process to be sync process. got a low performance,
//if have any bottleneck here, we should refrator this one
let out_going_tx = self.out_going_tx.clone();
Expand Down Expand Up @@ -251,15 +245,15 @@ where
.get_node_by_name(name)
.await
.expect("record has inserted in controller or network error");
debug!("fetch state from db");
match record.state {
TrackerState::Ready => {
let mut program_guard = program.lock().await;

if matches!(program_guard.local_state, TrackerState::Init) {
//start
info!("set to ready state {:?}", record.upstreams);
program_guard.local_state = TrackerState::Ready;
program_guard.node_type =
NodeType::try_from(record.input_output_type).anyhow()?;
program_guard.upstreams = Some(record.upstreams);
program_guard.process_data_cmd().await?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/compute_unit_runner/src/unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Code, Request, Response, Status};
use tracing::error;
use tracing::{error, info};

use super::media_data_tracker::MediaDataTracker;

Expand Down Expand Up @@ -45,7 +45,7 @@ where
&self,
request: Request<Empty>,
) -> Result<Response<Self::subscribeMediaDataStream>, Status> {
println!("recieve new data request {:?}", request);
info!("recieve media subscribe request {:?}", request);

let (tx, rx) = mpsc::channel(4);
let program_guard = self.program.lock().await;
Expand Down
18 changes: 0 additions & 18 deletions crates/dp_runner/Cargo.toml

This file was deleted.

7 changes: 0 additions & 7 deletions crates/dp_runner/dockerfile

This file was deleted.

116 changes: 0 additions & 116 deletions crates/dp_runner/src/channel_tracker.rs

This file was deleted.

Loading

0 comments on commit 7b5480f

Please sign in to comment.