Skip to content

Commit

Permalink
Fix links structures
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 5, 2023
1 parent a17334c commit 42d9d56
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 31 deletions.
78 changes: 50 additions & 28 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,29 @@ impl ObjectStore {
{
Box::pin(async move {
let object_info = self.info(object_name).await?;
if let Some(link) = object_info.link.as_ref() {
if let Some(link_name) = link.name.as_ref() {
let link_name = link_name.clone();
debug!("getting object via link");
if link.bucket == self.name {
return self.get(link_name).await;
if let Some(ref options) = object_info.options {
if let Some(link) = options.link.as_ref() {
if let Some(link_name) = link.name.as_ref() {
let link_name = link_name.clone();
debug!("getting object via link");
if link.bucket == self.name {
return self.get(link_name).await;
} else {
let bucket = self
.stream
.context
.get_object_store(&link_name)
.await
.map_err(|err| GetError::with_source(GetErrorKind::Other, err))?;
let object = bucket.get(&link_name).await?;
return Ok(object);
}
} else {
let bucket = self
.stream
.context
.get_object_store(&link_name)
.await
.map_err(|err| GetError::with_source(GetErrorKind::Other, err))?;
let object = bucket.get(&link_name).await?;
return Ok(object);
return Err(GetError::new(GetErrorKind::BucketLink));
}
} else {
return Err(GetError::new(GetErrorKind::BucketLink));
}
}

debug!("not a link. Getting the object");
Ok(Object::new(object_info, self.stream.clone()))
})
Expand Down Expand Up @@ -335,7 +338,7 @@ impl ObjectStore {
let object_info = ObjectInfo {
name: object_meta.name,
description: object_meta.description,
link: None,
options: None,
bucket: self.name.clone(),
nuid: object_nuid.to_string(),
chunks: object_chunks,
Expand Down Expand Up @@ -669,13 +672,18 @@ impl ObjectStore {
if object.deleted {
return Err(AddLinkError::new(AddLinkErrorKind::Deleted));
}
if object.link.is_some() {
return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
if let Some(ref options) = object.options {
if options.link.is_some() {
return Err(AddLinkError::new(AddLinkErrorKind::LinkToLink));
}
}

match self.info(&name).await {
Ok(info) => {
if info.link.is_none() {
if let Some(options) = info.options {
if options.link.is_none() {
return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
}
} else {
return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
}
}
Expand All @@ -688,9 +696,12 @@ impl ObjectStore {
let info = ObjectInfo {
name,
description: None,
link: Some(ObjectLink {
name: Some(object.name.clone()),
bucket: object.bucket.clone(),
options: Some(ObjectOptions {
link: Some(ObjectLink {
name: Some(object.name.clone()),
bucket: object.bucket.clone(),
}),
max_chunk_size: None,
}),
bucket: self.name.clone(),
nuid: nuid::next().to_string(),
Expand Down Expand Up @@ -736,8 +747,10 @@ impl ObjectStore {

match self.info(&name).await {
Ok(info) => {
if info.link.is_none() {
return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
if let Some(options) = info.options {
if options.link.is_none() {
return Err(AddLinkError::new(AddLinkErrorKind::AlreadyExists));
}
}
}
Err(err) if err.kind() != InfoErrorKind::NotFound => {
Expand All @@ -749,7 +762,10 @@ impl ObjectStore {
let info = ObjectInfo {
name: name.clone(),
description: None,
link: Some(ObjectLink { name: None, bucket }),
options: Some(ObjectOptions {
link: Some(ObjectLink { name: None, bucket }),
max_chunk_size: None,
}),
bucket: self.name.clone(),
nuid: nuid::next().to_string(),
size: 0,
Expand Down Expand Up @@ -1030,6 +1046,12 @@ impl tokio::io::AsyncRead for Object<'_> {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectOptions {
pub link: Option<ObjectLink>,
pub max_chunk_size: Option<u64>,
}

/// Meta and instance information about an object.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ObjectInfo {
Expand All @@ -1038,7 +1060,7 @@ pub struct ObjectInfo {
/// A short human readable description of the object.
pub description: Option<String>,
/// Link this object points to, if any.
pub link: Option<ObjectLink>,
pub options: Option<ObjectOptions>,
/// Name of the bucket the object is stored in.
pub bucket: String,
/// Unique identifier used to uniquely identify this version of the object.
Expand Down
39 changes: 36 additions & 3 deletions async-nats/tests/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,9 @@ mod object_store {

assert_eq!(
link_info
.options
.as_ref()
.unwrap()
.link
.as_ref()
.unwrap()
Expand All @@ -511,7 +514,18 @@ mod object_store {
.as_str(),
"object"
);
assert_eq!(link_info.link.as_ref().unwrap().bucket.as_str(), "bucket");
assert_eq!(
link_info
.options
.as_ref()
.unwrap()
.link
.as_ref()
.unwrap()
.bucket
.as_str(),
"bucket"
);

let result = bucket
.add_link("object", &another_object)
Expand Down Expand Up @@ -551,7 +565,26 @@ mod object_store {
bucket.add_bucket_link("link", "another").await.unwrap();

let link_info = bucket.info("link").await.unwrap();
assert!(link_info.link.as_ref().unwrap().name.is_none());
assert_eq!(link_info.link.as_ref().unwrap().bucket.as_str(), "another");
assert!(link_info
.options
.as_ref()
.unwrap()
.link
.as_ref()
.unwrap()
.name
.is_none());
assert_eq!(
link_info
.options
.as_ref()
.unwrap()
.link
.as_ref()
.unwrap()
.bucket
.as_str(),
"another"
);
}
}

0 comments on commit 42d9d56

Please sign in to comment.