diff --git a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs index b89d950..23580e4 100644 --- a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs +++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs @@ -6,7 +6,6 @@ use jz_action::utils::StdIntoAnyhowResult; use anyhow::{anyhow, Result}; use clap::Parser; use media_data_tracker::MediaDataTracker; -use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use tokio::select; diff --git a/crates/compute_unit_runner/src/fs_cache.rs b/crates/compute_unit_runner/src/fs_cache.rs index c78d87d..93047f0 100644 --- a/crates/compute_unit_runner/src/fs_cache.rs +++ b/crates/compute_unit_runner/src/fs_cache.rs @@ -6,7 +6,6 @@ use jz_action::{ }; use std::{ collections::HashMap, - hash::Hash, path::{Path, PathBuf}, sync::Arc, time::Instant, @@ -132,7 +131,7 @@ impl FileCache for MemCache { async fn read(&self, id: &str) -> Result { let now = Instant::now(); - let mut store = self.0.lock().await; + let store = self.0.lock().await; let result = match store.get(id) { Some(val) => Ok(val.clone()), None => Err(anyhow!("data {} not foud", id)), diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs index a0c0fdc..3b9baae 100644 --- a/crates/compute_unit_runner/src/media_data_tracker.rs +++ b/crates/compute_unit_runner/src/media_data_tracker.rs @@ -1,27 +1,21 @@ use crate::fs_cache::FileCache; use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq}; -use crate::mprc::Mprs; use anyhow::{anyhow, Ok, Result}; use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, NodeRepo, TrackerState}; use jz_action::network::common::Empty; use jz_action::network::datatransfer::data_stream_client::DataStreamClient; -use jz_action::network::datatransfer::{MediaDataBatchResponse, MediaDataCell}; -use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::error::TrySendError; use tonic::transport::Channel; -use tonic::{Code, Status}; +use tonic::Code; use tracing::{debug, error, info, warn}; -use walkdir::WalkDir; use crate::multi_sender::MultiSender; -use tokio::fs; use tokio::select; use tokio::sync::mpsc; use tokio::sync::Mutex; -use tokio::sync::{broadcast, oneshot}; +use tokio::sync::oneshot; use tokio::time::{self, sleep, Instant}; use tokio_stream::StreamExt; diff --git a/crates/compute_unit_runner/src/multi_sender.rs b/crates/compute_unit_runner/src/multi_sender.rs index 8b65499..62567f6 100644 --- a/crates/compute_unit_runner/src/multi_sender.rs +++ b/crates/compute_unit_runner/src/multi_sender.rs @@ -1,7 +1,6 @@ use jz_action::network::datatransfer::{ data_stream_client::DataStreamClient, MediaDataBatchResponse, }; -use std::collections::{hash_map::Entry, HashMap}; use tokio::time::Instant; use tonic::transport::Channel; use tracing::{debug, error}; diff --git a/crates/dp_runner/src/channel_tracker.rs b/crates/dp_runner/src/channel_tracker.rs index b8faf5a..ba62268 100644 --- a/crates/dp_runner/src/channel_tracker.rs +++ b/crates/dp_runner/src/channel_tracker.rs @@ -1,21 +1,14 @@ -use anyhow::{anyhow, Error, Result}; +use anyhow::{anyhow, Result}; use compute_unit_runner::fs_cache::FileCache; -use compute_unit_runner::ipc::AvaiableDataResponse; use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, TrackerState}; -use jz_action::network::common::Empty; -use jz_action::network::datatransfer::data_stream_client::DataStreamClient; use jz_action::network::datatransfer::MediaDataBatchResponse; -use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::Sender; use tokio::sync::{mpsc, oneshot, Mutex}; -use tokio::time::{self, sleep, Instant}; -use tokio::{fs, select}; -use tokio_stream::StreamExt; -use tonic::Status; +use tokio::time::{self, Instant}; +use tokio::select; use tracing::{debug, error, info, warn}; -use uuid::Uuid; pub struct ChannelTracker where diff --git a/crates/dp_runner/src/main.rs b/crates/dp_runner/src/main.rs index eda7671..d3636dd 100644 --- a/crates/dp_runner/src/main.rs +++ b/crates/dp_runner/src/main.rs @@ -8,8 +8,6 @@ use anyhow::{anyhow, Result}; use channel_tracker::ChannelTracker; use clap::Parser; use compute_unit_runner::fs_cache::*; -use std::fs::File; -use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use stream::ChannelDataStream; diff --git a/crates/dp_runner/src/stream.rs b/crates/dp_runner/src/stream.rs index ce520d6..114151d 100644 --- a/crates/dp_runner/src/stream.rs +++ b/crates/dp_runner/src/stream.rs @@ -1,15 +1,12 @@ use anyhow::Result; -use jz_action::core::models::{DbRepo, NodeRepo}; +use jz_action::core::models::DbRepo; use jz_action::network::common::Empty; use jz_action::network::datatransfer::data_stream_server::DataStream; use jz_action::network::datatransfer::{MediaDataBatchResponse, TabularDataBatchResponse}; -use jz_action::utils::{AnyhowToGrpc, IntoAnyhowResult}; use std::sync::Arc; use tokio::sync::Mutex; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::{Code, Request, Response, Status}; -use tracing::info; +use tokio::sync::oneshot; +use tonic::{Request, Response, Status}; use super::channel_tracker::ChannelTracker; diff --git a/nodes/dummy_out/src/main.rs b/nodes/dummy_out/src/main.rs index 19ec226..d235d1a 100644 --- a/nodes/dummy_out/src/main.rs +++ b/nodes/dummy_out/src/main.rs @@ -3,10 +3,9 @@ use anyhow::{anyhow, Result}; use clap::Parser; use compute_unit_runner::ipc::{self, IPCClient}; use jz_action::utils::StdIntoAnyhowResult; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::str::FromStr; use std::time::Duration; -use tokio::fs; use tokio::select; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::mpsc; diff --git a/nodes/jz_writer/src/main.rs b/nodes/jz_writer/src/main.rs index 5b5695a..02cc839 100644 --- a/nodes/jz_writer/src/main.rs +++ b/nodes/jz_writer/src/main.rs @@ -6,7 +6,7 @@ use jiaozifs_client_rs::apis::{self}; use jiaozifs_client_rs::models::RefType; use jz_action::utils::IntoAnyhowResult; use jz_action::utils::StdIntoAnyhowResult; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::str::FromStr; use std::time::Duration; use tokio::fs; diff --git a/src/dbrepo/mongo.rs b/src/dbrepo/mongo.rs index c957b5e..c68ac48 100644 --- a/src/dbrepo/mongo.rs +++ b/src/dbrepo/mongo.rs @@ -231,8 +231,8 @@ impl DataRepo for MongoRepo { #[cfg(test)] mod tests { use super::*; - use std::env; - use tracing_subscriber; + + fn is_send() where