diff --git a/core/Cargo.lock b/core/Cargo.lock index eacc8122a37..b15064c988a 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -268,6 +268,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compat" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-executor" version = "1.13.1" @@ -348,6 +361,16 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "async-sleep" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c327a532ed3acb8ad885b50bb2ea5fc7c132a396dd990cf855d2825fbdc16c6c" +dependencies = [ + "futures-util", + "tokio", +] + [[package]] name = "async-std" version = "1.13.0" @@ -1031,6 +1054,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" +[[package]] +name = "base64" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" + [[package]] name = "base64" version = "0.13.1" @@ -1280,6 +1309,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "bufsize" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7864afba28009cd99a4d973c3de89cc766b800cdf1bd909966d454906f3bce5d" +dependencies = [ + "bytes", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1331,6 +1369,9 @@ name = "bytes" version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +dependencies = [ + "serde", +] [[package]] name = "bytes-utils" @@ -1819,6 +1860,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "const-cstr" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3d0b5ff30645a68f35ece8cea4556ca14ef8a1651455f789a099a0513532a6" + [[package]] name = "const-oid" version = "0.9.6" @@ -3080,6 +3127,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "ghost" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0e085ded9f1267c32176b40921b9754c474f7dd96f7e808d4a982e48aa1e854" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "gimli" version = "0.31.0" @@ -4677,12 +4735,14 @@ dependencies = [ "reqsign", "reqwest 0.12.7", "rocksdb", + "rust-nebula", "serde", "serde_json", "sha1", "sha2", "size", "sled", + "snowflaked", "sqlx", "suppaftp", "surrealdb", @@ -4963,6 +5023,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "ordered-float" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" +dependencies = [ + "num-traits", + "serde", +] + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -5006,6 +5076,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "panic-message" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d" + [[package]] name = "parking" version = "2.2.1" @@ -6336,6 +6412,35 @@ dependencies = [ "trim-in-place", ] +[[package]] +name = "rust-nebula" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11a94ea754ca8b05b71ae911b7035180861cebb607c711acd766d14c04f87ae9" +dependencies = [ + "anyhow", + "async-compat", + "async-sleep", + "async-trait", + "base64 0.11.0", + "bb8", + "bufsize", + "bytes", + "const-cstr", + "futures", + "futures-util", + "ghost", + "num-derive", + "num-traits", + "ordered-float", + "panic-message", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "rust-stemmers" version = "1.2.0" @@ -6956,6 +7061,15 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "snowflaked" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "398d462c4c454399be452039b24b0aa0ecb4c7a57f6ae615f5d25de2b032f850" +dependencies = [ + "loom", +] + [[package]] name = "socket2" version = "0.5.7" diff --git a/core/Cargo.toml b/core/Cargo.toml index 6b4e5d592d9..323d6087c92 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -166,6 +166,7 @@ services-moka = ["dep:moka"] services-mongodb = ["dep:mongodb"] services-monoiofs = ["dep:monoio", "dep:flume"] services-mysql = ["dep:sqlx", "sqlx?/mysql"] +services-nebula-graph = ["dep:rust-nebula", "dep:bb8", "dep:snowflaked"] services-obs = [ "dep:reqsign", "reqsign?/services-huaweicloud", @@ -204,7 +205,6 @@ services-vercel-blob = [] services-webdav = [] services-webhdfs = [] services-yandex-disk = [] -services-nebula-graph = [] [lib] bench = false @@ -346,6 +346,9 @@ compio = { version = "0.11.0", optional = true, features = [ ] } # for services-s3 crc32c = { version = "0.6.6", optional = true } +# for services-nebula-graph +rust-nebula = { version = "^0.0.2", optional = true, features = ["graph"] } +snowflaked = { version = "1", optional = true, features = ["sync"] } # for services-monoiofs flume = { version = "0.11", optional = true } monoio = { version = "0.2.4", optional = true, features = [ diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 753838b3131..0437dff4a75 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -145,6 +145,9 @@ pub use monoiofs::*; mod mysql; pub use self::mysql::*; +mod nebula_graph; +pub use nebula_graph::*; + mod obs; pub use obs::*; diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs index 6ae3ebb8ea8..d03dd5bf2b1 100644 --- a/core/src/services/nebula_graph/backend.rs +++ b/core/src/services/nebula_graph/backend.rs @@ -17,7 +17,31 @@ use std::fmt::Debug; +#[cfg(feature = "tests")] +use std::time::Duration; + +use base64::engine::general_purpose::STANDARD as BASE64; +use base64::engine::Engine as _; +use bb8::{PooledConnection, RunError}; +use rust_nebula::{ + graph::GraphQuery, HostAddress, SingleConnSessionConf, SingleConnSessionManager, +}; +use snowflaked::sync::Generator; +use tokio::sync::OnceCell; + +use crate::raw::adapters::kv; +use crate::raw::*; use crate::services::NebulaGraphConfig; +use crate::*; + +static GENERATOR: Generator = Generator::new(0); + +impl Configurator for NebulaGraphConfig { + type Builder = NebulaGraphBuilder; + fn into_builder(self) -> Self::Builder { + NebulaGraphBuilder { config: self } + } +} #[doc = include_str!("docs.md")] #[derive(Default)] @@ -110,3 +134,263 @@ impl NebulaGraphBuilder { self } } + +impl Builder for NebulaGraphBuilder { + const SCHEME: Scheme = Scheme::NebulaGraph; + type Config = NebulaGraphConfig; + + fn build(self) -> Result { + let host = match self.config.host.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "host is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let port = match self.config.port { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "port is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let username = match self.config.username.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "username is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let password = match self.config.password.clone() { + Some(v) => v, + None => "".to_string(), + }; + let space = match self.config.space.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "space is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let tag = match self.config.tag.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "tag is empty") + .with_context("service", Scheme::NebulaGraph)) + } + }; + let key_field = match self.config.key_field.clone() { + Some(v) => v, + None => "key".to_string(), + }; + let value_field = match self.config.value_field.clone() { + Some(v) => v, + None => "value".to_string(), + }; + let root = normalize_root( + self.config + .root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + let mut session_config = SingleConnSessionConf::new( + vec![HostAddress::new(&host, port)], + username, + password, + Some(space), + ); + // NebulaGraph use fbthrift for communication. fbthrift's max_buffer_size is default 4 KB, + // which is too small to store something. + // So we could set max_buffer_size to 10 MB so that NebulaGraph can store files with filesize < 1 MB at least. + session_config.set_buf_size(1024 * 1024); + session_config.set_max_buf_size(64 * 1024 * 1024); + session_config.set_max_parse_response_bytes_count(254); + + Ok(NebulaGraphBackend::new(Adapter { + session_pool: OnceCell::new(), + session_config, + + tag, + key_field, + value_field, + }) + .with_root(root.as_str())) + } +} + +/// Backend for NebulaGraph service +pub type NebulaGraphBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + session_pool: OnceCell>, + session_config: SingleConnSessionConf, + + tag: String, + key_field: String, + value_field: String, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Adapter") + .field("session_config", &self.session_config) + .field("tag", &self.tag) + .field("key_field", &self.key_field) + .field("value_field", &self.value_field) + .finish() + } +} + +impl Adapter { + async fn get_session(&self) -> Result> { + let session_pool = self + .session_pool + .get_or_try_init(|| async { + bb8::Pool::builder() + .max_size(64) + .build(SingleConnSessionManager::new(self.session_config.clone())) + .await + }) + .await + .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary())?; + + session_pool.get().await.map_err(|err| match err { + RunError::User(err) => { + Error::new(ErrorKind::Unexpected, format!("{}", err)).set_temporary() + } + RunError::TimedOut => { + Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + } + }) + } +} + +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::NebulaGraph, + &self.session_config.space.clone().unwrap(), + Capability { + read: true, + write: true, + write_total_max_size: Some(1024 * 1024), + write_can_empty: true, + delete: true, + list: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result> { + let path = path.replace("'", "\\'").replace('"', "\\\""); + let query = format!( + "LOOKUP ON {} WHERE {}.{} == '{}' YIELD properties(vertex).{} AS {};", + self.tag, self.tag, self.key_field, path, self.value_field, self.value_field + ); + let mut sess = self.get_session().await?; + let result = sess + .query(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + if result.is_empty() { + Ok(None) + } else { + let row = result + .get_row_values_by_index(0) + .map_err(parse_nebulagraph_dataset_error)?; + let value = row + .get_value_by_col_name(&self.value_field) + .map_err(parse_nebulagraph_dataset_error)?; + let base64_str = value.as_string().map_err(parse_nebulagraph_dataset_error)?; + let value_str = BASE64.decode(base64_str).map_err(|err| { + Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph") + .set_source(err) + })?; + let buf = Buffer::from(value_str); + Ok(Some(buf)) + } + } + + async fn set(&self, path: &str, value: Buffer) -> Result<()> { + #[cfg(feature = "tests")] + let path_copy = path; + + self.delete(path).await?; + let path = path.replace("'", "\\'").replace('"', "\\\""); + let file = value.to_vec(); + let file = BASE64.encode(&file); + let snowflake_id: u64 = GENERATOR.generate(); + let query = format!( + "INSERT VERTEX {} VALUES {}:('{}', '{}');", + self.tag, snowflake_id, path, file + ); + let mut sess = self.get_session().await?; + sess.execute(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + + // To pass tests, we should confirm NebulaGraph has inserted data successfully + #[cfg(feature = "tests")] + loop { + let v = self.get(path_copy).await.unwrap(); + if v.is_none() { + std::thread::sleep(Duration::from_millis(1000)); + } else { + break; + } + } + Ok(()) + } + + async fn delete(&self, path: &str) -> Result<()> { + let path = path.replace("'", "\\'").replace('"', "\\\""); + let query = format!( + "LOOKUP ON {} WHERE {}.{} == '{}' YIELD id(vertex) AS id | DELETE VERTEX $-.id;", + self.tag, self.tag, self.key_field, path + ); + let mut sess = self.get_session().await?; + sess.execute(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + Ok(()) + } + + async fn scan(&self, path: &str) -> Result> { + let path = path.replace("'", "\\'").replace('"', "\\\""); + let query = format!( + "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};", + self.tag, self.tag, self.key_field, path, self.key_field, self.key_field + ); + + let mut sess = self.get_session().await?; + let result = sess + .query(&query) + .await + .map_err(parse_nebulagraph_session_error)?; + let mut res_vec = vec![]; + for row_i in 0..result.get_row_size() { + let row = result + .get_row_values_by_index(row_i) + .map_err(parse_nebulagraph_dataset_error)?; + let value = row + .get_value_by_col_name(&self.key_field) + .map_err(parse_nebulagraph_dataset_error)?; + let sub_path = value.as_string().map_err(parse_nebulagraph_dataset_error)?; + + res_vec.push(sub_path); + } + Ok(res_vec) + } +} + +fn parse_nebulagraph_session_error(err: rust_nebula::SingleConnSessionError) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err) +} + +fn parse_nebulagraph_dataset_error(err: rust_nebula::DataSetError) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err) +} diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs index 4691eda6b24..95dfa1c17a1 100644 --- a/core/src/types/operator/builder.rs +++ b/core/src/types/operator/builder.rs @@ -292,6 +292,8 @@ impl Operator { Scheme::HdfsNative => Self::from_iter::(iter)?.finish(), #[cfg(feature = "services-lakefs")] Scheme::Lakefs => Self::from_iter::(iter)?.finish(), + #[cfg(feature = "services-nebula-graph")] + Scheme::NebulaGraph => Self::from_iter::(iter)?.finish(), v => { return Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 4dd8acc69fb..c0da5219b82 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -165,6 +165,8 @@ pub enum Scheme { Surrealdb, /// [lakefs](crate::services::Lakefs): LakeFS Services Lakefs, + /// [NebulaGraph](crate::services::NebulaGraph): NebulaGraph Services + NebulaGraph, /// Custom that allow users to implement services outside of OpenDAL. /// /// # NOTE @@ -315,6 +317,8 @@ impl Scheme { Scheme::Surrealdb, #[cfg(feature = "services-lakefs")] Scheme::Lakefs, + #[cfg(feature = "services-nebula-graph")] + Scheme::NebulaGraph, ]) } } @@ -406,6 +410,7 @@ impl FromStr for Scheme { "hdfs_native" => Ok(Scheme::HdfsNative), "surrealdb" => Ok(Scheme::Surrealdb), "lakefs" => Ok(Scheme::Lakefs), + "nebula_graph" => Ok(Scheme::NebulaGraph), _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))), } } @@ -480,6 +485,7 @@ impl From for &'static str { Scheme::HdfsNative => "hdfs_native", Scheme::Surrealdb => "surrealdb", Scheme::Lakefs => "lakefs", + Scheme::NebulaGraph => "nebula_graph", Scheme::Custom(v) => v, } }