Skip to content

Commit

Permalink
GroveDBG v1.1.0 (#340)
Browse files Browse the repository at this point in the history
Update GroveDBG version with protocol changes
  • Loading branch information
fominok authored Sep 24, 2024
1 parent cbe10b6 commit fd2f134
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 50 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ let's say 10000, the following snippet should do:

```rust
let db = Arc::new(GroveDb::open("db").unwrap());
db.start_visualzier(10000);
db.start_visualizer(10000);
```

Just remember to use Arc because the HTTP server might outlast the GroveDB instance.
Expand Down
4 changes: 3 additions & 1 deletion grovedb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ indexmap = "2.2.6"
intmap = { version = "2.0.0", optional = true }
grovedb-path = { version = "2.0.3", path = "../path" }
grovedbg-types = { version = "2.0.3", path = "../grovedbg-types", optional = true }
tokio = { version = "1.37.0", features = ["rt-multi-thread", "net"], optional = true }
tokio = { version = "1.40.0", features = ["rt-multi-thread", "net"], optional = true }
axum = { version = "0.7.5", features = ["macros"], optional = true }
tower-http = { version = "0.5.2", features = ["fs"], optional = true }
blake3 = "1.4.0"
bitvec = "1"
zip-extensions = { version ="0.6.2", optional = true }
grovedb-version = { path = "../grovedb-version", version = "2.0.3" }
tokio-util = { version = "0.7.12", optional = true }

[dev-dependencies]
rand = "0.8.5"
Expand Down Expand Up @@ -74,6 +75,7 @@ estimated_costs = ["full"]
grovedbg = [
"grovedbg-types",
"tokio",
"tokio-util",
"full",
"grovedb-merk/grovedbg",
"axum",
Expand Down
4 changes: 2 additions & 2 deletions grovedb/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ fn main() {
use sha2::{digest::FixedOutput, Digest, Sha256};

const GROVEDBG_SHA256: [u8; 32] =
hex!("ea7d9258973aa765eaf5064451fc83efa22e0ce6eaf2938507e2703571364e35");
const GROVEDBG_VERSION: &str = "v1.0.0-rc.6";
hex!("5a1ee5a3033190974f580e41047ef9267ba03fafe0a70bbcf7878c1bb4f6126d");
const GROVEDBG_VERSION: &str = "v1.1.0";

let out_dir = PathBuf::from(&env::var_os("OUT_DIR").unwrap());
let grovedbg_zip_path = out_dir.join("grovedbg.zip");
Expand Down
217 changes: 171 additions & 46 deletions grovedb/src/debugger.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
//! GroveDB debugging support module.
use std::{collections::BTreeMap, fs, sync::Weak};
use std::{
collections::{BTreeMap, HashMap},
fs,
sync::{Arc, Weak},
time::{Duration, Instant, SystemTime},
};

use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Json, Router};
use grovedb_merk::{
Expand All @@ -11,14 +16,19 @@ use grovedb_merk::{
use grovedb_path::SubtreePath;
use grovedb_version::version::GroveVersion;
use grovedbg_types::{
MerkProofNode, MerkProofOp, NodeFetchRequest, NodeUpdate, Path, PathQuery, Query, QueryItem,
SizedQuery, SubqueryBranch,
DropSessionRequest, MerkProofNode, MerkProofOp, NewSessionResponse, NodeFetchRequest,
NodeUpdate, Path, PathQuery, Query, QueryItem, SessionId, SizedQuery, SubqueryBranch,
WithSession,
};
use indexmap::IndexMap;
use tempfile::tempdir;
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, Sender},
select,
sync::{RwLock, RwLockReadGuard},
time::sleep,
};
use tokio_util::sync::CancellationToken;
use tower_http::services::ServeDir;

use crate::{
Expand All @@ -30,6 +40,8 @@ use crate::{

const GROVEDBG_ZIP: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/grovedbg.zip"));

const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 10);

pub(super) fn start_visualizer<A>(grovedb: Weak<GroveDb>, addr: A)
where
A: ToSocketAddrs + Send + 'static,
Expand All @@ -44,33 +56,132 @@ where
zip_extensions::read::zip_extract(&grovedbg_zip, &grovedbg_www)
.expect("cannot extract grovedbg contents");

let (shutdown_send, mut shutdown_receive) = mpsc::channel::<()>(1);
let cancellation_token = CancellationToken::new();

let state: AppState = AppState {
cancellation_token: cancellation_token.clone(),
grovedb,
sessions: Default::default(),
};

let app = Router::new()
.route("/new_session", post(new_session))
.route("/drop_session", post(drop_session))
.route("/fetch_node", post(fetch_node))
.route("/fetch_root_node", post(fetch_root_node))
.route("/prove_path_query", post(prove_path_query))
.route("/fetch_with_path_query", post(fetch_with_path_query))
.fallback_service(ServeDir::new(grovedbg_www))
.with_state((shutdown_send, grovedb));

tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let listener = tokio::net::TcpListener::bind(addr)
.await
.expect("can't bind visualizer port");
axum::serve(listener, app)
.with_graceful_shutdown(async move {
shutdown_receive.recv().await;
})
.await
.unwrap()
});
.with_state(state.clone());

let rt = tokio::runtime::Runtime::new().unwrap();

let cloned_cancellation_token = cancellation_token.clone();
rt.spawn(async move {
loop {
select! {
_ = cloned_cancellation_token.cancelled() => break,
_ = sleep(Duration::from_secs(10)) => {
let now = Instant::now();
let mut lock = state.sessions.write().await;
let to_delete: Vec<SessionId> = lock.iter().filter_map(
|(id, session)| (session.last_access < now - SESSION_TIMEOUT).then_some(*id)
).collect();

to_delete.into_iter().for_each(|id| { lock.remove(&id); });
}
}
}
});

rt.block_on(async move {
let listener = tokio::net::TcpListener::bind(addr)
.await
.expect("can't bind visualizer port");
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancellation_token.cancelled().await;
})
.await
.unwrap()
});
});
}

#[derive(Clone)]
struct AppState {
cancellation_token: CancellationToken,
grovedb: Weak<GroveDb>,
sessions: Arc<RwLock<HashMap<SessionId, Session>>>,
}

impl AppState {
fn verify_running(&self) -> Result<(), AppError> {
if self.grovedb.strong_count() == 0 {
self.cancellation_token.cancel();
Err(AppError::Closed)
} else {
Ok(())
}
}

async fn new_session(&self) -> Result<SessionId, AppError> {
let grovedb = self.grovedb.upgrade().ok_or(AppError::Closed)?;
let id = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
self.sessions
.write()
.await
.insert(id, Session::new(&grovedb)?);

Ok(id)
}

async fn drop_session(&self, id: SessionId) {
self.sessions.write().await.remove(&id);
}

async fn get_snapshot(&self, id: SessionId) -> Result<RwLockReadGuard<GroveDb>, AppError> {
self.verify_running()?;
let mut lock = self.sessions.write().await;
if let Some(session) = lock.get_mut(&id) {
session.last_access = Instant::now();
Ok(RwLockReadGuard::map(lock.downgrade(), |l| {
&l.get(&id).as_ref().expect("checked above").snapshot
}))
} else {
Err(AppError::NoSession)
}
}
}

struct Session {
last_access: Instant,
_tempdir: tempfile::TempDir,
snapshot: GroveDb,
}

impl Session {
fn new(grovedb: &GroveDb) -> Result<Self, AppError> {
let tempdir = tempdir().map_err(|e| AppError::Any(e.to_string()))?;
let path = tempdir.path().join("grovedbg_session");
grovedb
.create_checkpoint(&path)
.map_err(|e| AppError::Any(e.to_string()))?;
let snapshot = GroveDb::open(path).map_err(|e| AppError::Any(e.to_string()))?;
Ok(Session {
last_access: Instant::now(),
_tempdir: tempdir,
snapshot,
})
}
}

enum AppError {
Closed,
NoSession,
Any(String),
}

Expand All @@ -80,6 +191,9 @@ impl IntoResponse for AppError {
AppError::Closed => {
(StatusCode::SERVICE_UNAVAILABLE, "GroveDB is closed").into_response()
}
AppError::NoSession => {
(StatusCode::UNAUTHORIZED, "No session with this id").into_response()
}
AppError::Any(e) => (StatusCode::INTERNAL_SERVER_ERROR, e).into_response(),
}
}
Expand All @@ -91,16 +205,28 @@ impl<E: std::error::Error> From<E> for AppError {
}
}

async fn new_session(State(state): State<AppState>) -> Result<Json<NewSessionResponse>, AppError> {
Ok(Json(NewSessionResponse {
session_id: state.new_session().await?,
}))
}

async fn drop_session(
State(state): State<AppState>,
Json(DropSessionRequest { session_id }): Json<DropSessionRequest>,
) {
state.drop_session(session_id).await;
}

async fn fetch_node(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
Json(NodeFetchRequest { path, key }): Json<NodeFetchRequest>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: NodeFetchRequest { path, key },
}): Json<WithSession<NodeFetchRequest>>,
) -> Result<Json<Option<NodeUpdate>>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

// todo: GroveVersion::latest() to actual version
let merk = db
.open_non_transactional_merk_at_path(path.as_slice().into(), None, GroveVersion::latest())
.unwrap()?;
Expand All @@ -115,14 +241,14 @@ async fn fetch_node(
}

async fn fetch_root_node(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: (),
}): Json<WithSession<()>>,
) -> Result<Json<Option<NodeUpdate>>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

// todo: GroveVersion::latest() to actual version
let merk = db
.open_non_transactional_merk_at_path(SubtreePath::empty(), None, GroveVersion::latest())
.unwrap()?;
Expand All @@ -138,13 +264,13 @@ async fn fetch_root_node(
}

async fn prove_path_query(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
Json(json_path_query): Json<PathQuery>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: json_path_query,
}): Json<WithSession<PathQuery>>,
) -> Result<Json<grovedbg_types::Proof>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

let path_query = path_query_to_grovedb(json_path_query);

Expand All @@ -155,13 +281,13 @@ async fn prove_path_query(
}

async fn fetch_with_path_query(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
Json(json_path_query): Json<PathQuery>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: json_path_query,
}): Json<WithSession<PathQuery>>,
) -> Result<Json<Vec<grovedbg_types::NodeUpdate>>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

let path_query = path_query_to_grovedb(json_path_query);

Expand Down Expand Up @@ -487,7 +613,6 @@ fn node_to_update(
feature_type,
}: NodeDbg,
) -> Result<NodeUpdate, crate::Error> {
// todo: GroveVersion::latest() to actual version
let grovedb_element = crate::Element::deserialize(&value, GroveVersion::latest())?;

let element = element_to_grovedbg(grovedb_element);
Expand Down
17 changes: 17 additions & 0 deletions grovedbg-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ use serde_with::{base64::Base64, serde_as};
pub type Key = Vec<u8>;
pub type Path = Vec<PathSegment>;
pub type PathSegment = Vec<u8>;
pub type SessionId = u64;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WithSession<R> {
pub session_id: SessionId,
pub request: R,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NewSessionResponse {
pub session_id: SessionId,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropSessionRequest {
pub session_id: SessionId,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down

0 comments on commit fd2f134

Please sign in to comment.