Skip to content

Commit

Permalink
Rework get logic with versions a bit..
Browse files Browse the repository at this point in the history
  • Loading branch information
rcmgleite committed Jun 17, 2024
1 parent e4c0221 commit 2679b6d
Showing 1 changed file with 26 additions and 28 deletions.
54 changes: 26 additions & 28 deletions src/persistency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,7 @@ impl Db {
pub async fn get(&self, key: Bytes, replica: bool) -> Result<Option<(Bytes, Bytes)>> {
if replica {
event!(Level::DEBUG, "Executing a replica GET");

// FIXME: Horrible logic
let value_and_metadata = self.storage_engine.get(&key).await?;
let metadata_value_option = value_and_metadata.map(|mut value_and_metadata| {
let metadata_size = value_and_metadata.get_u32();
let metadata =
Bytes::copy_from_slice(&value_and_metadata[0..metadata_size as usize]);
let value = Bytes::copy_from_slice(&value_and_metadata[metadata_size as usize..]);
(metadata, value)
});

Ok(metadata_value_option)
self.into_metadata_and_data(self.storage_engine.get(&key).await?)
} else {
event!(Level::INFO, "executing a non-replica GET");
let quorum_config = self.cluster_state.quorum_config();
Expand Down Expand Up @@ -330,19 +319,7 @@ impl Db {

let quorum_result = quorum.finish();
match quorum_result.evaluation {
Evaluation::Reached => {
let value_and_metadata = quorum_result.successes[0].clone();
let metadata_value_option = value_and_metadata.map(|mut value_and_metadata| {
let metadata_size = value_and_metadata.get_u32();
let metadata =
Bytes::copy_from_slice(&value_and_metadata[0..metadata_size as usize]);
let value =
Bytes::copy_from_slice(&value_and_metadata[metadata_size as usize..]);
(metadata, value)
});

Ok(metadata_value_option)
}
Evaluation::Reached => Ok(quorum_result.successes[0].clone()),
Evaluation::NotReached | Evaluation::Unreachable => {
if quorum_result.failures.iter().all(|err| err.is_not_found()) {
return Err(Error::NotFound { key: key.clone() });
Expand All @@ -360,14 +337,14 @@ impl Db {
}
}

async fn do_get(&self, key: Bytes, src_addr: Bytes) -> Result<Option<Bytes>> {
async fn do_get(&self, key: Bytes, src_addr: Bytes) -> Result<Option<(Bytes, Bytes)>> {
if self.owns_key(&src_addr)? {
event!(
Level::INFO,
"node is part of preference_list {:?}",
src_addr
);
Ok(self.storage_engine.get(&key).await?)
self.into_metadata_and_data(self.storage_engine.get(&key).await?)
} else {
event!(
Level::INFO,
Expand All @@ -390,10 +367,31 @@ impl Db {
client.connect().await?;

let resp = client.get(key.clone(), true).await?;
Ok(Some(resp.value))
// FIXME: remove unwrap()
Ok(Some((
hex::decode(resp.metadata).unwrap().into(),
resp.value,
)))
}
}

// This function is very poorly written, has bad error handling, performs bad and is not readable.
// we will fix it in due time ;)
fn into_metadata_and_data(
&self,
metadata_and_data: Option<Bytes>,
) -> Result<Option<(Bytes, Bytes)>> {
let value_and_metadata = metadata_and_data;
let metadata_value_option = value_and_metadata.map(|mut value_and_metadata| {
let metadata_size = value_and_metadata.get_u32();
let metadata = Bytes::copy_from_slice(&value_and_metadata[0..metadata_size as usize]);
let value = Bytes::copy_from_slice(&value_and_metadata[metadata_size as usize..]);
(metadata, value)
});

Ok(metadata_value_option)
}

/// Verifies if the key provided is owned by self.
pub fn owns_key(&self, key: &[u8]) -> Result<bool> {
self.cluster_state.owns_key(key)
Expand Down

0 comments on commit 2679b6d

Please sign in to comment.