Skip to content

Commit

Permalink
WIP - so many changes... trying to make sure the conflict resolution …
Browse files Browse the repository at this point in the history
…impl makes sense...
  • Loading branch information
rcmgleite committed Jul 3, 2024
1 parent b83c31c commit ce8ba68
Show file tree
Hide file tree
Showing 19 changed files with 845 additions and 458 deletions.
36 changes: 25 additions & 11 deletions src/client/db_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A concrete [`Client`] implementation for rldb
use async_trait::async_trait;
use bytes::Bytes;
use rand::{distributions::Alphanumeric, Rng};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tracing::{event, Level};
Expand Down Expand Up @@ -67,22 +68,22 @@ impl Client for DbClient {
}

async fn ping(&mut self) -> Result<PingResponse> {
let ping_cmd = cmd::ping::Ping;
let ping_cmd = cmd::ping::Ping::new(generate_request_id());
let req = Message::from(ping_cmd).serialize();

let conn = self.get_conn_mut()?;
conn.write_all(&req).await?;

let response = Message::try_from_async_read(conn).await?;
event!(Level::DEBUG, "{:?}", response.payload.as_ref().unwrap());
event!(Level::TRACE, "{:?}", response.payload.as_ref().unwrap());
serde_json::from_slice(&response.payload.unwrap())?
}

async fn get(&mut self, key: Bytes, replica: bool) -> Result<GetResponse> {
async fn get(&mut self, key: Bytes, replica: bool) -> Result<Vec<GetResponse>> {
let get_cmd = if replica {
cmd::get::Get::new_replica(key)
cmd::get::Get::new_replica(key, generate_request_id())
} else {
cmd::get::Get::new(key)
cmd::get::Get::new(key, generate_request_id())
};

let req = Message::from(get_cmd).serialize();
Expand All @@ -102,9 +103,9 @@ impl Client for DbClient {
replication: bool,
) -> Result<PutResponse> {
let put_cmd = if replication {
cmd::put::Put::new_replication(key, value, metadata)
cmd::put::Put::new_replication(key, value, metadata, generate_request_id())
} else {
cmd::put::Put::new(key, value, metadata)
cmd::put::Put::new(key, value, metadata, generate_request_id())
};
let req = Message::from(put_cmd).serialize();

Expand All @@ -114,7 +115,7 @@ impl Client for DbClient {
let response = Message::try_from_async_read(conn).await?;

event!(
Level::DEBUG,
Level::TRACE,
"put response: {:?}",
response.payload.as_ref().unwrap()
);
Expand All @@ -123,7 +124,7 @@ impl Client for DbClient {
}

async fn heartbeat(&mut self, known_nodes: Vec<Node>) -> Result<HeartbeatResponse> {
let cmd = cmd::cluster::heartbeat::Heartbeat::new(known_nodes);
let cmd = cmd::cluster::heartbeat::Heartbeat::new(known_nodes, generate_request_id());
let req = Message::from(cmd).serialize();

let conn = self.get_conn_mut()?;
Expand All @@ -137,7 +138,10 @@ impl Client for DbClient {
&mut self,
known_cluster_node_addr: String,
) -> Result<JoinClusterResponse> {
let cmd = cmd::cluster::join_cluster::JoinCluster::new(known_cluster_node_addr);
let cmd = cmd::cluster::join_cluster::JoinCluster::new(
known_cluster_node_addr,
generate_request_id(),
);
let req = Message::from(cmd).serialize();

let conn = self.get_conn_mut()?;
Expand All @@ -148,7 +152,7 @@ impl Client for DbClient {
}

async fn cluster_state(&mut self) -> Result<ClusterStateResponse> {
let cmd = cmd::cluster::cluster_state::ClusterState;
let cmd = cmd::cluster::cluster_state::ClusterState::new(generate_request_id());
let req = Message::from(cmd).serialize();

let conn = self.get_conn_mut()?;
Expand All @@ -170,3 +174,13 @@ impl Factory for DbClientFactory {
Ok(Box::new(client))
}
}

// dummy function to generate request ids.. probably better to change this to uuid or some other good
// requestid type
fn generate_request_id() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect()
}
8 changes: 4 additions & 4 deletions src/client/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
put::PutResponse,
},
error::{Error, Result},
persistency::{metadata_evaluation, Metadata, MetadataEvaluation},
persistency::{storage::metadata_evaluation, Metadata, MetadataEvaluation},
storage_engine::{in_memory::InMemory, StorageEngine},
test_utils::fault::{Fault, When},
};
Expand Down Expand Up @@ -98,17 +98,17 @@ impl Client for MockClient {
async fn ping(&mut self) -> Result<PingResponse> {
todo!()
}
async fn get(&mut self, key: Bytes, _replica: bool) -> Result<GetResponse> {
async fn get(&mut self, key: Bytes, _replica: bool) -> Result<Vec<GetResponse>> {
let storage_guard = self.storage.lock().await;
let metadata = storage_guard.metadata_engine.get(&key).await.unwrap();
let data = storage_guard.storage_engine.get(&key).await.unwrap();
match (metadata, data) {
(None, None) => Err(Error::NotFound { key }),
(None, Some(_)) | (Some(_), None) => panic!("should never happen"),
(Some(metadata), Some(data)) => Ok(GetResponse {
(Some(metadata), Some(data)) => Ok(vec![GetResponse {
value: data,
metadata: String::from_utf8(hex::encode(metadata).into_bytes()).unwrap(),
}),
}]),
}
}
async fn put(
Expand Down
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub trait Client {
/// Ping command interface
async fn ping(&mut self) -> Result<PingResponse>;
/// Get command interface
async fn get(&mut self, key: Bytes, replica: bool) -> Result<GetResponse>;
async fn get(&mut self, key: Bytes, replica: bool) -> Result<Vec<GetResponse>>;
/// Put command interface
async fn put(
&mut self,
Expand Down
10 changes: 5 additions & 5 deletions src/cluster/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn start_heartbeat(cluster_state: Arc<State>, config: HeartbeatConfig)
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(config.interval as u64)).await;

let span = span!(Level::DEBUG, "heartbeat_loop", addr=?cluster_state.own_addr());
let span = span!(Level::TRACE, "heartbeat_loop", addr=?cluster_state.own_addr());
do_heartbeat(
config.fanout,
cluster_state.clone(),
Expand All @@ -62,7 +62,7 @@ pub async fn start_heartbeat(cluster_state: Arc<State>, config: HeartbeatConfig)
.instrument(span)
.await;

event!(Level::DEBUG, "heartbeat cycle finished {:?}", cluster_state);
event!(Level::TRACE, "heartbeat cycle finished {:?}", cluster_state);
}
}

Expand All @@ -79,7 +79,7 @@ async fn do_heartbeat_to_node(
client_factory: Arc<dyn ClientFactory + Send + Sync + 'static>,
cluster_connections: ClusterConnectionsMap,
) -> Result<HeartbeatResult> {
event!(Level::DEBUG, "heartbeating to node: {:?}", target_node);
event!(Level::TRACE, "heartbeating to node: {:?}", target_node);

let client = cluster_connections
.lock()
Expand Down Expand Up @@ -112,7 +112,7 @@ async fn do_heartbeat_to_node(

if let Err(err) = client.heartbeat(known_nodes).await {
event!(
Level::WARN,
Level::DEBUG,
"Unable to heartbeat to node {:?} - err {:?}",
target_node,
err
Expand All @@ -128,7 +128,7 @@ async fn do_heartbeat_to_node(
Err(err.into())
} else {
event!(
Level::DEBUG,
Level::TRACE,
"Successfully heartbeated to node {:?}",
target_node,
);
Expand Down
12 changes: 11 additions & 1 deletion src/cmd/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ pub const CMD_CLUSTER_CLUSTER_STATE: u32 = 102;

/// ClusterState deserialized [`crate::cmd::Command`]
#[derive(Default, Serialize, Deserialize)]
pub struct ClusterState;
pub struct ClusterState {
request_id: String,
}

impl ClusterState {
pub fn new(request_id: String) -> Self {
Self { request_id }
}

/// Executes a [`ClusterState`] command.
pub async fn execute(self, db: Arc<Db>) -> Result<ClusterStateResponse> {
let cluster_state = db.cluster_state()?;
Expand All @@ -34,6 +40,10 @@ impl IntoMessage for ClusterState {
Self::cmd_id()
}

fn request_id(&self) -> String {
self.request_id.clone()
}

fn payload(&self) -> Option<Bytes> {
Some(Bytes::from(serde_json::to_string(self).unwrap()))
}
Expand Down
9 changes: 7 additions & 2 deletions src/cmd/cluster/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ pub const CMD_CLUSTER_HEARTBEAT: u32 = 100;
#[derive(Serialize, Deserialize)]
pub struct Heartbeat {
nodes: Vec<Node>,
request_id: String,
}

impl Heartbeat {
/// Constructs a new heartbeat [`crate::cmd::Command`]
pub fn new(nodes: Vec<Node>) -> Self {
Self { nodes }
pub fn new(nodes: Vec<Node>, request_id: String) -> Self {
Self { nodes, request_id }
}

/// Executes a [`Heartbeat`] command
Expand All @@ -50,6 +51,10 @@ impl IntoMessage for Heartbeat {
Self::cmd_id()
}

fn request_id(&self) -> String {
self.request_id.clone()
}

fn payload(&self) -> Option<Bytes> {
Some(Bytes::from(serde_json::to_string(self).unwrap()))
}
Expand Down
14 changes: 11 additions & 3 deletions src/cmd/cluster/join_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,25 @@ use std::sync::Arc;

use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tracing::{event, Level};

use crate::{cluster::state::Node, error::Result, persistency::Db, server::message::IntoMessage};

pub const CMD_CLUSTER_JOIN_CLUSTER: u32 = 101;

/// JoinCluster deserialized [`crate::cmd::Command`]
#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct JoinCluster {
known_cluster_node_addr: String,
request_id: String,
}

impl JoinCluster {
pub fn new(known_cluster_node_addr: String) -> Self {
pub fn new(known_cluster_node_addr: String, request_id: String) -> Self {
event!(Level::DEBUG, "request_id {}", request_id);
Self {
known_cluster_node_addr,
request_id,
}
}

Expand Down Expand Up @@ -50,13 +54,17 @@ impl IntoMessage for JoinCluster {
Self::cmd_id()
}

fn request_id(&self) -> String {
self.request_id.clone()
}

fn payload(&self) -> Option<Bytes> {
Some(Bytes::from(serde_json::to_string(self).unwrap()))
}
}

/// Deserialized [`JoinCluster`] response payload.
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct JoinClusterResponse {
message: String,
}
29 changes: 21 additions & 8 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,37 @@ pub struct Get {
#[serde(with = "serde_utf8_bytes")]
key: Bytes,
replica: bool,
request_id: String,
}

impl Get {
/// Constructs a new [`Get`] instance
pub fn new(key: Bytes) -> Self {
pub fn new(key: Bytes, request_id: String) -> Self {
Self {
key,
replica: false,
request_id,
}
}

pub fn new_replica(key: Bytes) -> Self {
Self { key, replica: true }
pub fn new_replica(key: Bytes, request_id: String) -> Self {
Self {
key,
replica: true,
request_id,
}
}

/// Executes the [`Get`] command using the specified [`Db`] instance
pub async fn execute(self, db: Arc<Db>) -> Result<GetResponse> {
pub async fn execute(self, db: Arc<Db>) -> Result<Vec<GetResponse>> {
if let Some(resp) = db.get(self.key.clone(), self.replica).await? {
Ok(GetResponse {
value: resp.1,
metadata: hex::encode(resp.0),
})
Ok(resp
.into_iter()
.map(|entry| GetResponse {
value: entry.value,
metadata: hex::encode(entry.metadata.serialize()),
})
.collect())
} else {
Err(Error::NotFound { key: self.key })
}
Expand All @@ -58,6 +67,10 @@ impl IntoMessage for Get {
fn payload(&self) -> Option<Bytes> {
Some(Bytes::from(serde_json::to_string(self).unwrap()))
}

fn request_id(&self) -> String {
self.request_id.clone()
}
}

/// The struct that represents a [`Get`] response payload
Expand Down
Loading

0 comments on commit ce8ba68

Please sign in to comment.