Skip to content

Commit

Permalink
set/blocking_set
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Apr 16, 2024
1 parent 616baba commit a4df302
Show file tree
Hide file tree
Showing 23 changed files with 61 additions and 59 deletions.
4 changes: 2 additions & 2 deletions core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
}

/// Set a key into service.
async fn set(&self, path: &str, value: &[u8]) -> Result<()>;
async fn set(&self, path: &str, value: Buffer) -> Result<()>;

/// The blocking version of set.
fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
let _ = (path, value);

Err(Error::new(
Expand Down
5 changes: 2 additions & 3 deletions core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;
use std::vec::IntoIter;

use async_trait::async_trait;
use bytes::Buf;

use super::Adapter;
use crate::raw::oio::{HierarchyLister, QueueBuf};
Expand Down Expand Up @@ -254,7 +253,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {

async fn close(&mut self) -> Result<()> {
let buf = self.buffer.collect();
self.kv.set(&self.path, buf.chunk()).await
self.kv.set(&self.path, buf).await
}

async fn abort(&mut self) -> Result<()> {
Expand All @@ -272,7 +271,7 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {

fn close(&mut self) -> Result<()> {
let buf = self.buffer.collect();
self.kv.blocking_set(&self.path, buf.chunk())?;
self.kv.blocking_set(&self.path, buf)?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions core/src/services/atomicserver/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl Adapter {
async fn atomic_post_object_request(
&self,
path: &str,
value: &[u8],
value: Buffer,
) -> Result<Request<Buffer>> {
let path = normalize_path(path);
let path = path.as_str();
Expand Down Expand Up @@ -424,7 +424,7 @@ impl kv::Adapter for Adapter {
Ok(Some(bytes_file))
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let req = self.atomic_get_object_request(path)?;
let res = self.client.send(req).await?;
let bytes = res.into_body();
Expand Down
6 changes: 2 additions & 4 deletions core/src/services/cacache/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,15 @@ impl kv::Adapter for Adapter {
Ok(Some(Buffer::from(result)))
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
cacache::write(&self.datadir, path, value)
.await
.map_err(parse_error)?;

Ok(())
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
cacache::write_sync(&self.datadir, path, value).map_err(parse_error)?;

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/services/cloudflare_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let url = format!("{}/values/{}", self.url_prefix, path);
let req = Request::put(&url);
let multipart = Multipart::new();
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/d1/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let table = &self.table;
let key_field = &self.key_field;
let value_field = &self.value_field;
Expand All @@ -334,7 +334,7 @@ impl kv::Adapter for Adapter {
DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
);

let params = vec![path.into(), value.into()];
let params = vec![path.into(), value.as_ref().into()];
let req = self.create_d1_query_request(&query, params)?;

let resp = self.client.send(req).await?;
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/etcd/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
async fn set(&self, key: &str, value: Buffer) -> Result<()> {
let mut client = self.conn().await?;
let _ = client
.put(key, value, None)
.put(key, value.to_vec(), None)
.await
.map_err(format_etcd_error)?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/foundationdb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let transaction = self.db.create_trx().expect("Unable to create transaction");

transaction.set(path.as_bytes(), value);
transaction.set(path.as_bytes(), value.as_ref());

match transaction.commit().await {
Ok(_) => Ok(()),
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/gridfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let bucket = self.get_bucket().await?;
// delete old file if exists
let filter = doc! { "filename": path };
Expand All @@ -268,7 +268,7 @@ impl kv::Adapter for Adapter {
// set new file
let mut upload_stream = bucket.open_upload_stream(path, None);
upload_stream
.write_all(value)
.write_all(value.as_ref())
.await
.map_err(new_std_io_error)?;
upload_stream.close().await.map_err(new_std_io_error)?;
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/libsql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES (?, ?)",
self.table, self.key_field, self.value_field
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/memcached/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ impl kv::Adapter for Adapter {
Ok(result.map(Buffer::from))
}

async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
async fn set(&self, key: &str, value: Buffer) -> Result<()> {
let mut conn = self.conn().await?;

conn.set(
&percent_encode_path(key),
value,
value.as_ref(),
// Set expiration to 0 if ttl not set.
self.default_ttl
.map(|v| v.as_secs() as u32)
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/mongodb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let collection = self.get_collection().await?;
let filter = doc! { self.key_field.as_str(): path };
let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } };
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/mysql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let query = format!(
"INSERT INTO `{}` (`{}`, `{}`)
VALUES (:path, :value)
Expand All @@ -278,7 +278,7 @@ impl kv::Adapter for Adapter {
statement,
params! {
"path" => path,
"value" => value,
"value" => value.as_ref(),
},
)
.await
Expand Down
11 changes: 6 additions & 5 deletions core/src/services/persy/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,21 @@ impl kv::Adapter for Adapter {
Ok(None)
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let cloned_path = path.to_string();
let cloned_value = value.to_vec();
let cloned_self = self.clone();

task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
.await
.map_err(new_task_join_error)
.and_then(|inner_result| inner_result)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
let mut tx = self.persy.begin().map_err(parse_error)?;
let id = tx.insert(&self.segment, value).map_err(parse_error)?;
let id = tx
.insert(&self.segment, value.as_ref())
.map_err(parse_error)?;

tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
.map_err(parse_error)?;
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/postgresql/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl kv::Adapter for Adapter {
Ok(Some(Buffer::from(value)))
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let table = &self.table;
let key_field = &self.key_field;
let value_field = &self.value_field;
Expand All @@ -323,7 +323,7 @@ impl kv::Adapter for Adapter {
.await
.map_err(parse_postgre_error)?;
let _ = connection
.query(&statement, &[&path, &value])
.query(&statement, &[&path, &value.as_ref()])
.await
.map_err(parse_postgre_error)?;
Ok(())
Expand Down
11 changes: 6 additions & 5 deletions core/src/services/redb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,17 @@ impl kv::Adapter for Adapter {
Ok(result.map(Buffer::from))
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let cloned_self = self.clone();
let cloned_path = path.to_string();
let cloned_value = value.to_vec();

task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
.await
.map_err(new_task_join_error)
.and_then(|inner_result| inner_result)
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
Expand All @@ -183,7 +182,9 @@ impl kv::Adapter for Adapter {
.open_table(table_define)
.map_err(parse_table_error)?;

table.insert(path, value).map_err(parse_storage_error)?;
table
.insert(path, value.as_ref())
.map_err(parse_storage_error)?;
}

write_txn.commit().map_err(parse_commit_error)?;
Expand Down
3 changes: 2 additions & 1 deletion core/src/services/redis/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,9 @@ impl kv::Adapter for Adapter {
Ok(result.map(Buffer::from))
}

async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
async fn set(&self, key: &str, value: Buffer) -> Result<()> {
let conn = self.conn().await?;
let value = value.as_ref();
match self.default_ttl {
Some(ttl) => match conn {
RedisConnection::Normal(mut conn) => conn
Expand Down
7 changes: 3 additions & 4 deletions core/src/services/rocksdb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,16 @@ impl kv::Adapter for Adapter {
Ok(result.map(Buffer::from))
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let cloned_self = self.clone();
let cloned_path = path.to_string();
let cloned_value = value.to_vec();

task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
.await
.map_err(new_task_join_error)?
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
self.db.put(path, value).map_err(parse_rocksdb_error)
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/services/sled/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,19 @@ impl kv::Adapter for Adapter {
.map(|v| Buffer::from(v.to_vec())))
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let cloned_self = self.clone();
let cloned_path = path.to_string();
let cloned_value = value.to_vec();

task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), value))
.await
.map_err(new_task_join_error)?
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
self.tree.insert(path, value).map_err(parse_error)?;

fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
self.tree
.insert(path, value.to_vec())
.map_err(parse_error)?;
Ok(())
}

Expand Down
11 changes: 4 additions & 7 deletions core/src/services/sqlite/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,18 +294,15 @@ impl kv::Adapter for Adapter {
}
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let this = self.clone();
let path = path.to_string();
// FIXME: can we avoid this copy?
let value = value.to_vec();

task::spawn_blocking(move || this.blocking_set(&path, &value))
task::spawn_blocking(move || this.blocking_set(&path, value))
.await
.map_err(new_task_join_error)?
}

fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
fn blocking_set(&self, path: &str, value: Buffer) -> Result<()> {
let conn = self.pool.get().map_err(parse_r2d2_error)?;

let query = format!(
Expand All @@ -314,7 +311,7 @@ impl kv::Adapter for Adapter {
);
let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?;
statement
.execute(params![path, value])
.execute(params![path, value.as_ref()])
.map_err(parse_rusqlite_error)?;
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/services/surrealdb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl kv::Adapter for Adapter {
Ok(value.map(Buffer::from))
}

async fn set(&self, path: &str, value: &[u8]) -> crate::Result<()> {
async fn set(&self, path: &str, value: Buffer) -> crate::Result<()> {
let query = format!(
"INSERT INTO {} ({}, {}) \
VALUES ($path, $value) \
Expand All @@ -366,7 +366,7 @@ impl kv::Adapter for Adapter {
.await?
.query(query)
.bind(("path", path))
.bind(("value", value))
.bind(("value", value.as_ref()))
.await
.map_err(parse_surrealdb_error)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/tikv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl kv::Adapter for Adapter {
Ok(result.map(Buffer::from))
}

async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
self.get_connection()
.await?
.put(path.to_owned(), value.to_vec())
Expand Down
Loading

0 comments on commit a4df302

Please sign in to comment.