Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track and report network stats #124

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cas_client/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ use utils::progress::ProgressUpdater;
use crate::error::Result;
use crate::CasClientError;

/// Metrics regarding a XORB upload.
#[derive(Debug)]
pub struct UploadMetrics {
    // Number of bytes associated with the upload: the serialized XORB size as
    // reported by `CasObject::serialize`, or — when the XORB already exists in
    // the local store — the raw input data length instead.
    pub n_bytes: usize,
}

/// A Client to the CAS (Content Addressed Storage) service to allow storage and
/// management of XORBs (Xet Object Remote Block). A XORB represents a collection
/// of arbitrary bytes. These bytes are hashed according to a Xet Merkle Hash
Expand All @@ -35,7 +41,7 @@ pub trait UploadClient {
hash: &MerkleHash,
data: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
) -> Result<()>;
) -> Result<UploadMetrics>;

/// Check if a XORB already exists.
async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool>;
Expand Down
2 changes: 1 addition & 1 deletion cas_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pub use chunk_cache::CacheConfig;
pub use http_client::{build_auth_http_client, build_http_client};
use interface::RegistrationClient;
pub use interface::{Client, ReconstructionClient, UploadClient};
pub use interface::{Client, ReconstructionClient, UploadClient, UploadMetrics};
pub use local_client::{tests_utils, LocalClient};
pub use remote_client::RemoteClient;

Expand Down
16 changes: 8 additions & 8 deletions cas_client/src/local_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::{debug, info};

use crate::error::{CasClientError, Result};
use crate::interface::UploadClient;
use crate::UploadMetrics;

#[derive(Debug)]
pub struct LocalClient {
Expand Down Expand Up @@ -105,7 +106,7 @@ impl UploadClient for LocalClient {
hash: &MerkleHash,
data: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
) -> Result<()> {
) -> Result<UploadMetrics> {
// no empty writes
if chunk_and_boundaries.is_empty() || data.is_empty() {
return Err(CasClientError::InvalidArguments);
Expand All @@ -120,7 +121,7 @@ impl UploadClient for LocalClient {

if self.exists(prefix, hash).await? {
info!("{prefix:?}/{hash:?} already exists in Local CAS; returning.");
return Ok(());
return Ok(UploadMetrics { n_bytes: data.len() });
}

let file_path = self.get_path_for_entry(prefix, hash);
Expand All @@ -136,8 +137,7 @@ impl UploadClient for LocalClient {
CasClientError::InternalError(anyhow!("Unable to create temporary file for staging Xorbs, got {e:?}"))
})?;

let total_bytes_written;
{
let bytes_written = {
let mut writer = BufWriter::new(&tempfile);
let (_, bytes_written) = CasObject::serialize(
&mut writer,
Expand All @@ -148,8 +148,8 @@ impl UploadClient for LocalClient {
)?;
// flush before persisting
writer.flush()?;
total_bytes_written = bytes_written;
}
bytes_written
};

tempfile.persist(&file_path).map_err(|e| e.error)?;

Expand All @@ -161,9 +161,9 @@ impl UploadClient for LocalClient {
let _ = std::fs::set_permissions(&file_path, permissions);
}

info!("{file_path:?} successfully written with {total_bytes_written:?} bytes.");
info!("{file_path:?} successfully written with {bytes_written:?} bytes.");

Ok(())
Ok(UploadMetrics { n_bytes: bytes_written })
}

async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
Expand Down
14 changes: 8 additions & 6 deletions cas_client/src/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,21 @@ impl UploadClient for RemoteClient {
hash: &MerkleHash,
data: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
) -> Result<()> {
) -> Result<UploadMetrics> {
let key = Key {
prefix: prefix.to_string(),
hash: *hash,
};

let was_uploaded = self.upload(&key, &data, chunk_and_boundaries).await?;
let (was_uploaded, metrics) = self.upload(&key, &data, chunk_and_boundaries).await?;

if !was_uploaded {
debug!("{key:?} not inserted into CAS.");
} else {
debug!("{key:?} inserted into CAS.");
}

Ok(())
Ok(metrics)
}

async fn exists(&self, prefix: &str, hash: &MerkleHash) -> Result<bool> {
Expand Down Expand Up @@ -245,12 +245,12 @@ impl RemoteClient {
key: &Key,
contents: &[u8],
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
) -> Result<bool> {
) -> Result<(bool, UploadMetrics)> {
let url = Url::parse(&format!("{}/xorb/{key}", self.endpoint))?;

let mut writer = Cursor::new(Vec::new());

let (_, _) = CasObject::serialize(
let (_, n_bytes) = CasObject::serialize(
&mut writer,
&key.hash,
contents,
Expand All @@ -265,7 +265,9 @@ impl RemoteClient {
let response = self.http_auth_client.post(url).body(data).send().await?;
let response_parsed: UploadXorbResponse = response.json().await?;

Ok(response_parsed.was_inserted)
let metrics = UploadMetrics { n_bytes };

Ok((response_parsed.was_inserted, metrics))
}

/// use the reconstruction response from CAS to re-create the described file for any calls
Expand Down
1 change: 1 addition & 0 deletions cas_object/src/cas_object_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl CasObject {

/// Serialize into Cas Object from uncompressed data and chunk boundaries.
/// Assumes correctness from caller: it's the receiver's responsibility to validate a cas object.
/// Returns a tuple of CasObject and number of bytes in the serialized XORB.
pub fn serialize<W: Write + Seek>(
writer: &mut W,
hash: &MerkleHash,
Expand Down
102 changes: 92 additions & 10 deletions data/src/parallel_xorb_uploader.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use cas_client::Client;
use cas_client::{Client, UploadMetrics};
use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo};
use mdb_shard::ShardFileManager;
use merkledb::aggregate_hashes::cas_node_hash;
use merklehash::MerkleHash;
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;
use tracing::info;
use utils::progress::ProgressUpdater;
use utils::ThreadPool;

use crate::data_processing::CASDataAggregator;
use crate::errors::DataProcessingError::*;
use crate::errors::*;

// Minimum interval, in seconds, between network-rate log reports.
// NOTE(review): deliberately low (2 s) for testing; reviewers suggested a much
// larger value (~100 s) for production.
const DEFAULT_NETWORK_STAT_REPORT_INTERVAL_SEC: u32 = 2; // 2 s
Copy link
Contributor

@port8080 port8080 Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably have this set low for testing, would prefer larger in prod - maybe 100s


#[async_trait]
pub(crate) trait XorbUpload {
/// Register a block of data ready for upload and dedup, return the hash of the produced xorb.
Expand All @@ -23,7 +27,75 @@ pub(crate) trait XorbUpload {
async fn flush(&self) -> Result<()>;
}

type XorbUploadValueType = (MerkleHash, Vec<u8>, Vec<(MerkleHash, usize)>);
/// A snapshot of the byte counter paired with the instant its measurement
/// window began; two check points are enough to derive a transfer rate.
struct NetworkStatCheckPoint {
    // Total bytes observed since `start`.
    n_bytes: u64,
    // Start of this measurement window.
    // NOTE(review): std::time::SystemTime was found more useful than Instant
    // when sending these across process boundaries (see PR #121) — confirm
    // before exposing this beyond in-process logging.
    start: Instant,
}

impl Default for NetworkStatCheckPoint {
fn default() -> Self {
Self {
n_bytes: 0,
start: Instant::now(),
}
}
}

/// Tracks egress across uploads and periodically logs both the lifetime
/// average rate and the rate over the most recent reporting window.
struct NetworkStat {
    // Running totals since creation; `accumulated.start` is never reset.
    accumulated: NetworkStatCheckPoint,
    // Snapshot taken at the last report; used for the "instantaneous" rate.
    last_check_point: NetworkStatCheckPoint,
    // Minimum elapsed time between two log reports.
    report_interval: Duration,
}

impl NetworkStat {
    /// Create a tracker that reports at most once every `report_interval_sec` seconds.
    fn new(report_interval_sec: u32) -> Self {
        Self {
            accumulated: Default::default(),
            last_check_point: Default::default(),
            report_interval: Duration::from_secs(report_interval_sec.into()),
        }
    }

    /// Fold `metrics` into the running totals and, if at least `report_interval`
    /// has elapsed since the last check point, log both the lifetime-average
    /// ("accumulated") and the recent-window ("instantaneous") rates.
    /// `what` labels the emitted log lines (e.g. "Xorb upload").
    fn update_and_report(&mut self, metrics: &UploadMetrics, what: &str) {
        self.accumulated.n_bytes += metrics.n_bytes as u64;
        let now = Instant::now();
        if now.duration_since(self.last_check_point.start) >= self.report_interval {
            Self::report_rate(&format!("{what} accumulated"), self.accumulated.n_bytes, self.accumulated.start, now);
            Self::report_rate(
                &format!("{what} instantaneous"),
                self.accumulated.n_bytes - self.last_check_point.n_bytes,
                self.last_check_point.start,
                now,
            );
            // Reuse `now` so the next window starts exactly where this report
            // ended; calling Instant::now() again here would leave a small
            // unaccounted gap between windows.
            self.last_check_point = NetworkStatCheckPoint {
                n_bytes: self.accumulated.n_bytes,
                start: now,
            };
        }
    }

    /// Log the transfer rate for `n_bytes` sent between `start` and `end`,
    /// scaled to the largest unit (Gbps/Mbps/Kbps, falling back to bps) whose
    /// value exceeds 1.
    fn report_rate(what: &str, n_bytes: u64, start: Instant, end: Instant) {
        const RATE_UNIT: [(f64, &str); 3] = [(1e9, "Gbps"), (1e6, "Mbps"), (1e3, "Kbps")];

        let duration = end.duration_since(start);

        // Handle the degenerate cases up front. The early return matters:
        // without it the zero-byte path fell through and logged a second,
        // NaN-prone "bps" line; a zero-length duration would divide by zero.
        if n_bytes == 0 || duration.is_zero() {
            info!("{what} rate: 0 bps");
            return;
        }

        let bps = n_bytes as f64 * 8. / duration.as_secs_f64();

        for (base, unit) in RATE_UNIT {
            let curr = bps / base;
            if curr > 1. {
                info!("{what} rate: {curr:.2} {unit}");
                return;
            }
        }

        info!("{what} rate: {bps:.2} bps");
    }
}

/// Helper to parallelize xorb upload and registration.
/// Calls to registering xorbs return immediately after computing a xorb hash so callers
Expand All @@ -40,14 +112,17 @@ pub(crate) struct ParallelXorbUploader {
cas: Arc<dyn Client + Send + Sync>,

// Internal worker
upload_tasks: Mutex<JoinSet<Result<()>>>,
upload_tasks: Mutex<JoinSet<Result<UploadMetrics>>>,

// Rate limiter
rate_limiter: Arc<Semaphore>,

// Theadpool
threadpool: Arc<ThreadPool>,

// Network metrics
egress_stat: Mutex<NetworkStat>,

// Upload Progress
upload_progress_updater: Option<Arc<dyn ProgressUpdater>>,
}
Expand All @@ -68,14 +143,17 @@ impl ParallelXorbUploader {
upload_tasks: Mutex::new(JoinSet::new()),
rate_limiter,
threadpool,
egress_stat: Mutex::new(NetworkStat::new(DEFAULT_NETWORK_STAT_REPORT_INTERVAL_SEC)), // report every 2 s
upload_progress_updater,
})
}

async fn status_is_ok(&self) -> Result<()> {
let mut upload_tasks = self.upload_tasks.lock().await;
while let Some(result) = upload_tasks.try_join_next() {
result??;
let metrics = result??;
let mut egress_rate = self.egress_stat.lock().await;
egress_rate.update_and_report(&metrics, "Xorb upload");
}

Ok(())
Expand Down Expand Up @@ -139,24 +217,28 @@ impl XorbUpload for ParallelXorbUploader {
let mut upload_tasks = self.upload_tasks.lock().await;

while let Some(result) = upload_tasks.join_next().await {
result??;
let metrics = result??;
let mut egress_rate = self.egress_stat.lock().await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try the lock and optimistically report. I think it is completely OK to skip the update_and_report call if there is lock contention

egress_rate.update_and_report(&metrics, "Xorb upload");
}

Ok(())
}
}

type XorbUploadValueType = (MerkleHash, Vec<u8>, Vec<(MerkleHash, usize)>);

async fn upload_and_register_xorb(
item: XorbUploadValueType,
shard_manager: Arc<ShardFileManager>,
cas: Arc<dyn Client + Send + Sync>,
cas_prefix: String,
) -> Result<()> {
) -> Result<UploadMetrics> {
let (cas_hash, data, chunks) = item;

let raw_bytes_len = data.len();
// upload xorb
{
let metrics = {
let mut pos = 0;
let chunk_and_boundaries = chunks
.iter()
Expand All @@ -165,8 +247,8 @@ async fn upload_and_register_xorb(
(*hash, pos as u32)
})
.collect();
cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?;
}
cas.put(&cas_prefix, &cas_hash, data, chunk_and_boundaries).await?
};

// register for dedup
// This should happen after uploading xorb above succeeded so not to
Expand All @@ -188,5 +270,5 @@ async fn upload_and_register_xorb(
shard_manager.add_cas_block(cas_info).await?;
}

Ok(())
Ok(metrics)
}
4 changes: 2 additions & 2 deletions data/src/test_utils/local_test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use cas_client::tests_utils::*;
use cas_client::{CasClientError, Client, LocalClient, ReconstructionClient, UploadClient};
use cas_client::{CasClientError, Client, LocalClient, ReconstructionClient, UploadClient, UploadMetrics};
use cas_types::FileRange;
use mdb_shard::shard_file_reconstructor::FileReconstructor;
use mdb_shard::ShardFileManager;
Expand Down Expand Up @@ -41,7 +41,7 @@ impl UploadClient for LocalTestClient {
hash: &MerkleHash,
data: Vec<u8>,
chunk_and_boundaries: Vec<(MerkleHash, u32)>,
) -> Result<(), CasClientError> {
) -> Result<UploadMetrics, CasClientError> {
self.cas.put(prefix, hash, data, chunk_and_boundaries).await
}

Expand Down