Skip to content

Commit

Permalink
Add customization of Object Store chunk size
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 6, 2023
1 parent 42d9d56 commit 891f025
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 17 deletions.
2 changes: 2 additions & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,5 @@ filter_subjects
rollup
IoT
ObjectMeta
128k
ObjectMetadata
43 changes: 29 additions & 14 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ impl ObjectStore {
data: &mut (impl tokio::io::AsyncRead + std::marker::Unpin),
) -> Result<ObjectInfo, PutError>
where
ObjectMeta: From<T>,
ObjectMetadata: From<T>,
{
let object_meta: ObjectMeta = meta.into();
let object_meta: ObjectMetadata = meta.into();

let encoded_object_name = encode_object_name(&object_meta.name);
if !is_valid_object_name(&encoded_object_name) {
Expand All @@ -296,7 +296,8 @@ impl ObjectStore {
let mut object_chunks = 0;
let mut object_size = 0;

let mut buffer = BytesMut::with_capacity(DEFAULT_CHUNK_SIZE);
let chunk_size = object_meta.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
let mut buffer = BytesMut::with_capacity(chunk_size);
let mut context = ring::digest::Context::new(&SHA256);

loop {
Expand Down Expand Up @@ -338,7 +339,10 @@ impl ObjectStore {
let object_info = ObjectInfo {
name: object_meta.name,
description: object_meta.description,
options: None,
options: Some(ObjectOptions {
max_chunk_size: Some(chunk_size),
link: None,
}),
bucket: self.name.clone(),
nuid: object_nuid.to_string(),
chunks: object_chunks,
Expand Down Expand Up @@ -518,7 +522,7 @@ impl ObjectStore {
Ok(())
}

/// Updates [Object] [ObjectMeta].
/// Updates [Object] [ObjectMetadata].
///
/// # Examples
///
Expand All @@ -533,7 +537,7 @@ impl ObjectStore {
/// bucket
/// .update_metadata(
/// "object",
/// object_store::ObjectMeta {
/// object_store::UpdateMetadata {
/// name: "new_name".to_string(),
/// description: Some("a new description".to_string()),
/// },
Expand All @@ -545,7 +549,7 @@ impl ObjectStore {
pub async fn update_metadata<A: AsRef<str>>(
&self,
object: A,
metadata: ObjectMeta,
metadata: UpdateMetadata,
) -> Result<ObjectInfo, UpdateMetadataError> {
let mut info = self.info(object.as_ref()).await?;

Expand Down Expand Up @@ -1049,7 +1053,7 @@ impl tokio::io::AsyncRead for Object<'_> {
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
pub link: Option<ObjectLink>,
pub max_chunk_size: Option<u64>,
pub max_chunk_size: Option<usize>,
}

/// Meta and instance information about an object.
Expand Down Expand Up @@ -1093,18 +1097,28 @@ pub struct ObjectLink {
pub bucket: String,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpdateMetadata {
/// Name of the object
pub name: String,
/// A short human readable description of the object.
pub description: Option<String>,
}

/// Meta information about an object.
#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectMeta {
pub struct ObjectMetadata {
/// Name of the object
pub name: String,
/// A short human readable description of the object.
pub description: Option<String>,
/// Max chunk size. Default is 128k.
pub chunk_size: Option<usize>,
}

impl From<&str> for ObjectMeta {
fn from(s: &str) -> ObjectMeta {
ObjectMeta {
impl From<&str> for ObjectMetadata {
fn from(s: &str) -> ObjectMetadata {
ObjectMetadata {
name: s.to_string(),
..Default::default()
}
Expand All @@ -1126,11 +1140,12 @@ impl AsObjectInfo for &ObjectInfo {
}
}

impl From<ObjectInfo> for ObjectMeta {
impl From<ObjectInfo> for ObjectMetadata {
fn from(info: ObjectInfo) -> Self {
ObjectMeta {
ObjectMetadata {
name: info.name,
description: info.description,
chunk_size: None,
}
}
}
Expand Down
25 changes: 22 additions & 3 deletions async-nats/tests/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod object_store {
use std::{io, time::Duration};

use async_nats::jetstream::{
object_store::{AddLinkErrorKind, ObjectMeta},
object_store::{AddLinkErrorKind, ObjectMetadata, UpdateMetadata},
stream::DirectGetErrorKind,
};
use base64::Engine;
Expand Down Expand Up @@ -81,6 +81,24 @@ mod object_store {
tracing::info!("reading content");
object_link.read_to_end(&mut contents).await.unwrap();
assert_eq!(contents, result);

bucket
.put(
ObjectMetadata {
name: "BAR".to_string(),
description: Some("custom object".to_string()),
chunk_size: Some(64 * 1024),
},
&mut bytes.as_slice(),
)
.await
.unwrap();

let meta = bucket.get("BAR").await.unwrap();
assert_eq!(
64 * 1024,
meta.info.options.unwrap().max_chunk_size.unwrap()
);
}

#[tokio::test]
Expand Down Expand Up @@ -353,9 +371,10 @@ mod object_store {
.unwrap();
bucket
.put(
ObjectMeta {
ObjectMetadata {
name: "Foo".to_string(),
description: Some("foo desc".to_string()),
chunk_size: None,
},
&mut "dadada".as_bytes(),
)
Expand Down Expand Up @@ -436,7 +455,7 @@ mod object_store {
.await
.unwrap();

let given_metadata = ObjectMeta {
let given_metadata = UpdateMetadata {
name: "new_object".to_owned(),
description: Some("description".to_string()),
};
Expand Down

0 comments on commit 891f025

Please sign in to comment.