-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track and report network stat #124
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,24 @@ | ||
use std::sync::Arc; | ||
use std::time::{Duration, Instant}; | ||
|
||
use async_trait::async_trait; | ||
use cas_client::Client; | ||
use cas_client::{Client, UploadMetrics}; | ||
use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo}; | ||
use mdb_shard::ShardFileManager; | ||
use merkledb::aggregate_hashes::cas_node_hash; | ||
use merklehash::MerkleHash; | ||
use tokio::sync::{Mutex, Semaphore}; | ||
use tokio::task::JoinSet; | ||
use tracing::info; | ||
use utils::progress::ProgressUpdater; | ||
use utils::ThreadPool; | ||
|
||
use crate::data_processing::CASDataAggregator; | ||
use crate::errors::DataProcessingError::*; | ||
use crate::errors::*; | ||
|
||
const DEFAULT_NETWORK_STAT_REPORT_INTERVAL_SEC: u32 = 2; // 2 s | ||
|
||
#[async_trait] | ||
pub(crate) trait XorbUpload { | ||
/// Register a block of data ready for upload and dedup, return the hash of the produced xorb. | ||
|
@@ -23,7 +27,75 @@ pub(crate) trait XorbUpload { | |
async fn flush(&self) -> Result<()>; | ||
} | ||
|
||
type XorbUploadValueType = (MerkleHash, Vec<u8>, Vec<(MerkleHash, usize)>); | ||
struct NetworkStatCheckPoint { | ||
n_bytes: u64, | ||
start: Instant, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found using |
||
} | ||
|
||
impl Default for NetworkStatCheckPoint { | ||
fn default() -> Self { | ||
Self { | ||
n_bytes: 0, | ||
start: Instant::now(), | ||
} | ||
} | ||
} | ||
|
||
struct NetworkStat { | ||
accumulated: NetworkStatCheckPoint, | ||
last_check_point: NetworkStatCheckPoint, | ||
report_interval: Duration, | ||
} | ||
|
||
impl NetworkStat { | ||
fn new(report_interval_sec: u32) -> Self { | ||
Self { | ||
accumulated: Default::default(), | ||
last_check_point: Default::default(), | ||
report_interval: Duration::from_secs(report_interval_sec.into()), | ||
} | ||
} | ||
|
||
fn update_and_report(&mut self, metrics: &UploadMetrics, what: &str) { | ||
self.accumulated.n_bytes += metrics.n_bytes as u64; | ||
let now = Instant::now(); | ||
if now.duration_since(self.last_check_point.start) >= self.report_interval { | ||
Self::report_rate(&format!("{what} accumulated"), self.accumulated.n_bytes, self.accumulated.start, now); | ||
Self::report_rate( | ||
&format!("{what} instantaneous"), | ||
self.accumulated.n_bytes - self.last_check_point.n_bytes, | ||
self.last_check_point.start, | ||
now, | ||
); | ||
self.last_check_point = NetworkStatCheckPoint { | ||
n_bytes: self.accumulated.n_bytes, | ||
start: Instant::now(), | ||
}; | ||
} | ||
} | ||
|
||
fn report_rate(what: &str, n_bytes: u64, start: Instant, end: Instant) { | ||
const RATE_UNIT: [(f64, &str); 3] = [(1e9, "Gbps"), (1e6, "Mbps"), (1e3, "Kbps")]; | ||
|
||
let duration = end.duration_since(start); | ||
|
||
if n_bytes == 0 { | ||
info!("{what} rate: 0 bps"); | ||
} | ||
|
||
let bps = n_bytes as f64 * 8. / duration.as_secs_f64(); | ||
|
||
for (base, unit) in RATE_UNIT { | ||
let curr = bps / base; | ||
if curr > 1. { | ||
info!("{what} rate: {curr:.2} {unit}"); | ||
return; | ||
} | ||
} | ||
|
||
info!("{what} rate: {bps:.2} bps"); | ||
} | ||
} | ||
|
||
/// Helper to parallelize xorb upload and registration. | ||
/// Calls to registering xorbs return immediately after computing a xorb hash so callers | ||
|
@@ -40,14 +112,17 @@ pub(crate) struct ParallelXorbUploader { | |
cas: Arc<dyn Client + Send + Sync>, | ||
|
||
// Internal worker | ||
upload_tasks: Mutex<JoinSet<Result<()>>>, | ||
upload_tasks: Mutex<JoinSet<Result<UploadMetrics>>>, | ||
|
||
// Rate limiter | ||
rate_limiter: Arc<Semaphore>, | ||
|
||
// Threadpool | ||
threadpool: Arc<ThreadPool>, | ||
|
||
// Network metrics | ||
egress_stat: Mutex<NetworkStat>, | ||
|
||
// Upload Progress | ||
upload_progress_updater: Option<Arc<dyn ProgressUpdater>>, | ||
} | ||
|
@@ -68,14 +143,17 @@ impl ParallelXorbUploader { | |
upload_tasks: Mutex::new(JoinSet::new()), | ||
rate_limiter, | ||
threadpool, | ||
egress_stat: Mutex::new(NetworkStat::new(DEFAULT_NETWORK_STAT_REPORT_INTERVAL_SEC)), // report every 2 s | ||
upload_progress_updater, | ||
}) | ||
} | ||
|
||
async fn status_is_ok(&self) -> Result<()> { | ||
let mut upload_tasks = self.upload_tasks.lock().await; | ||
while let Some(result) = upload_tasks.try_join_next() { | ||
result??; | ||
let metrics = result??; | ||
let mut egress_rate = self.egress_stat.lock().await; | ||
egress_rate.update_and_report(&metrics, "Xorb upload"); | ||
} | ||
|
||
Ok(()) | ||
|
@@ -139,24 +217,28 @@ impl XorbUpload for ParallelXorbUploader { | |
let mut upload_tasks = self.upload_tasks.lock().await; | ||
|
||
while let Some(result) = upload_tasks.join_next().await { | ||
result??; | ||
let metrics = result??; | ||
let mut egress_rate = self.egress_stat.lock().await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Try the lock and optimistically report. I think it is completely OK to skip the |
||
egress_rate.update_and_report(&metrics, "Xorb upload"); | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
type XorbUploadValueType = (MerkleHash, Vec<u8>, Vec<(MerkleHash, usize)>); | ||
|
||
async fn upload_and_register_xorb( | ||
item: XorbUploadValueType, | ||
shard_manager: Arc<ShardFileManager>, | ||
cas: Arc<dyn Client + Send + Sync>, | ||
cas_prefix: String, | ||
) -> Result<()> { | ||
) -> Result<UploadMetrics> { | ||
let (cas_hash, data, chunks) = item; | ||
|
||
let raw_bytes_len = data.len(); | ||
// upload xorb | ||
{ | ||
let metrics = { | ||
let mut pos = 0; | ||
let chunk_and_boundaries = chunks | ||
.iter() | ||
|
@@ -165,8 +247,8 @@ async fn upload_and_register_xorb( | |
(*hash, pos as u32) | ||
}) | ||
.collect(); | ||
cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?; | ||
} | ||
cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await? | ||
}; | ||
|
||
// register for dedup | ||
// This should happen after uploading xorb above succeeded so not to | ||
|
@@ -188,5 +270,5 @@ async fn upload_and_register_xorb( | |
shard_manager.add_cas_block(cas_info).await?; | ||
} | ||
|
||
Ok(()) | ||
Ok(metrics) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably have this set low for testing, would prefer larger in prod - maybe 100s