re-introduce minimum viable floor, lean further into optional blocks
Signed-off-by: Brian L. Troutwine <[email protected]>
blt committed May 2, 2024
1 parent bfe8ecd commit f5303a4
Showing 1 changed file with 33 additions and 54 deletions.
87 changes: 33 additions & 54 deletions lading_payload/src/block.rs
@@ -5,7 +5,7 @@
//! from that, decoupling the create/send operations. This module is the
//! mechanism by which 'blocks' -- that is, byte blobs of a predetermined size
//! -- are created.
use std::{collections::VecDeque, num::NonZeroU32};
use std::num::NonZeroU32;

use bytes::{buf::Writer, BufMut, Bytes, BytesMut};
use rand::Rng;
@@ -489,16 +489,20 @@ fn chunk_bytes<const N: usize>(
fn construct_block_cache_inner<R, S>(
mut rng: &mut R,
serializer: &S,
initial_block_chunks: &[u32],
block_chunks: &[u32],
total_bytes: u32,
) -> Result<Vec<Block>, SpinError>
where
S: crate::Serialize,
R: Rng + ?Sized,
{
let mut block_cache: Vec<Block> = Vec::new();
let mut block_cache: Vec<Block> = Vec::with_capacity(128);
let mut bytes_remaining = total_bytes;
let mut block_sizes = VecDeque::from(initial_block_chunks.to_vec());
let mut min_viable_size = 0;
let max_viable_size: u32 = *block_chunks
.iter()
.max()
.expect("empty block chunks, catastrophic bug");

// Build out the blocks.
//
@@ -509,74 +513,49 @@ where
// objective and iterate over these, dynamically constructing new block
// sizes +/- 20% if the block construction fails because of insufficient
// space to serialize.
while bytes_remaining > 0 {
let block_size = rng.gen_range(min_viable_size..=max_viable_size);

while bytes_remaining > 0 && !block_sizes.is_empty() {
let block_size = block_sizes
.pop_front()
.expect("impossible condition, catastrophic bug");

if let Ok(block) = construct_block(&mut rng, serializer, block_size) {
bytes_remaining = bytes_remaining.saturating_sub(block.total_bytes.get());
block_cache.push(block);
} else {
let adjusted_size = adjust_block_size(rng, block_size);
if adjusted_size < bytes_remaining {
block_sizes.push_back(adjusted_size);
match construct_block(&mut rng, serializer, block_size) {
Ok(block) => {
bytes_remaining = bytes_remaining.saturating_sub(block.total_bytes.get());
block_cache.push(block);
}
Err(SpinError::EmptyBlock) => {
min_viable_size = block_size;
}
Err(e) => {
error!("Unexpected error during block construction: {e}");
return Err(e);
}
}

if bytes_remaining < min_viable_size {
break;
}
}

// Instrument the results of the block construction.

if block_cache.is_empty() {
error!("Empty block cache, unable to construct blocks!");
Err(SpinError::ConstructBlockCache(
ConstructBlockCacheError::InsufficientBlockSizes,
))
} else {
// "Capacity" here refers to how much data the block_chunks can hold.
// Note that this may not match the end-user's requested pregen-cache-size.
// This function is only concerned with filling up the chunks requested.
let capacity_sum = initial_block_chunks.iter().sum::<u32>();
let filled_sum = block_cache.iter().map(|b| b.total_bytes.get()).sum::<u32>();

if filled_sum == capacity_sum {
info!(
num_blocks = block_cache.len(),
filled_byte_sum = filled_sum,
capacity_sum = capacity_sum,
"Block cache constructed."
);
} else {
let filled_sum_str = byte_unit::Byte::from_bytes(filled_sum.into())
.get_appropriate_unit(false)
.to_string();
let capacity_sum_str = byte_unit::Byte::from_bytes(capacity_sum.into())
.get_appropriate_unit(false)
.to_string();
warn!(
"Filled {filled_sum_str} only. Could not fill up to the chunk capacity of {capacity_sum_str}."
);
}
let filled_sum_str = byte_unit::Byte::from_bytes(filled_sum.into())
.get_appropriate_unit(false)
.to_string();
let capacity_sum_str = byte_unit::Byte::from_bytes(total_bytes.into())
.get_appropriate_unit(false)
.to_string();
info!("Filled {filled_sum_str} of requested {capacity_sum_str}.");

Ok(block_cache)
}
}

/// Dynamically adjusts the block size based on discovered limits.
#[inline]
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_sign_loss)]
fn adjust_block_size<R>(rng: &mut R, current_size: u32) -> u32
where
R: Rng + ?Sized,
{
// Define an upper and lower bound on new sizes. +/- 20% is arbitrary.
let lower_bound = (f64::from(current_size) * 0.8) as u32;
let upper_bound = (f64::from(current_size) * 1.2) as u32;

rng.gen_range(lower_bound..=upper_bound)
}

/// Construct a new block
///
/// # Panics
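For readers skimming the flattened diff, the heart of the change is the sampling loop. The old code drained a VecDeque of pre-computed chunk sizes and, when a block failed to serialize, re-queued a +/- 20% adjusted size via adjust_block_size; the new code draws each block size uniformly between a "minimum viable" floor and the largest configured chunk, raises the floor whenever serialization yields SpinError::EmptyBlock, and stops once less than a viable block's worth of budget remains. Below is a minimal, standalone sketch of that idea; fake_serialize, fill_cache, and the 64-byte threshold are invented stand-ins for lading's Serialize machinery, and a rand 0.8-style API is assumed.

use rand::Rng;

// Pretend serializer: succeeds only when the byte budget is at least 64,
// standing in for construct_block returning SpinError::EmptyBlock below that.
fn fake_serialize(budget: u32) -> Option<u32> {
    if budget >= 64 {
        Some(budget) // in this toy model a block consumes its whole budget
    } else {
        None // too small to hold even one member, i.e. an "empty block"
    }
}

// Fill a cache of block sizes until the byte budget is exhausted, mirroring
// the shape of construct_block_cache_inner after this commit.
fn fill_cache(total_bytes: u32, block_chunks: &[u32]) -> Vec<u32> {
    let mut rng = rand::thread_rng();
    let mut cache = Vec::new();
    let mut bytes_remaining = total_bytes;

    // The floor starts at zero and rises each time a sampled size turns out
    // to be too small; the ceiling is the largest configured chunk.
    let mut min_viable_size: u32 = 0;
    let max_viable_size: u32 = *block_chunks.iter().max().expect("no chunks given");

    while bytes_remaining > 0 {
        let block_size = rng.gen_range(min_viable_size..=max_viable_size);
        match fake_serialize(block_size) {
            Some(bytes) => {
                bytes_remaining = bytes_remaining.saturating_sub(bytes);
                cache.push(bytes);
            }
            None => {
                // Raise the floor so anything smaller is never tried again.
                min_viable_size = block_size;
            }
        }
        // Once less than a viable block's worth of budget remains, stop.
        if bytes_remaining < min_viable_size {
            break;
        }
    }
    cache
}

fn main() {
    let blocks = fill_cache(4096, &[128, 256, 512]);
    println!("built {} blocks totalling {} bytes", blocks.len(), blocks.iter().sum::<u32>());
}

Compared with the +/- 20% re-queueing it replaces, the floor only ever tightens: each empty-block failure permanently excludes the sizes below it, so the loop converges rather than retrying sizes already known to be too small.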

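The instrumentation at the end of the function also changes: rather than comparing the filled bytes against the summed chunk capacity and warning on a shortfall, the new code always logs how much was filled against the requested total_bytes, formatted through byte_unit. A small illustration of that formatting, assuming the byte_unit 4.x calls the diff itself uses; the numbers are invented:

fn main() {
    let filled_sum: u32 = 9_437_184;   // bytes actually packed into blocks (example value)
    let total_bytes: u32 = 10_485_760; // the requested budget (example value)

    // Same calls as the new info! line in the diff: pick a human-friendly
    // decimal unit (false => KB/MB rather than KiB/MiB) and render it.
    let filled_sum_str = byte_unit::Byte::from_bytes(filled_sum.into())
        .get_appropriate_unit(false)
        .to_string();
    let capacity_sum_str = byte_unit::Byte::from_bytes(total_bytes.into())
        .get_appropriate_unit(false)
        .to_string();

    println!("Filled {filled_sum_str} of requested {capacity_sum_str}.");
}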