diff --git a/Cargo.toml b/Cargo.toml index 40d63fe..914dedc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,6 @@ members = [ "hitbox-redis", "hitbox-stretto", "hitbox-tower", + "hitbox-tarantool", "examples", ] diff --git a/hitbox-backend/src/serializer.rs b/hitbox-backend/src/serializer.rs index f14016f..9dc7aa0 100644 --- a/hitbox-backend/src/serializer.rs +++ b/hitbox-backend/src/serializer.rs @@ -27,7 +27,7 @@ pub trait Serializer { } #[derive(Deserialize, Serialize)] -struct SerializableCachedValue { +pub struct SerializableCachedValue { data: U, expired: DateTime, } @@ -38,6 +38,15 @@ impl SerializableCachedValue { } } +impl From> for SerializableCachedValue { + fn from(value: CachedValue) -> SerializableCachedValue { + SerializableCachedValue { + data: value.data, + expired: value.expired, + } + } +} + #[derive(Default)] pub struct JsonSerializer> { _raw: PhantomData, diff --git a/hitbox-tarantool/CHANGELOG.md b/hitbox-tarantool/CHANGELOG.md new file mode 100644 index 0000000..01307c6 --- /dev/null +++ b/hitbox-tarantool/CHANGELOG.md @@ -0,0 +1,12 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## Unreleased + +### Added + +- Initial release diff --git a/hitbox-tarantool/Cargo.toml b/hitbox-tarantool/Cargo.toml new file mode 100644 index 0000000..b47068d --- /dev/null +++ b/hitbox-tarantool/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "hitbox-tarantool" +version = "0.1.0" +authors = [ + "Evgeniy ", + "Belousov Max ", + "Andrey Ermilov ", +] +license = "MIT" +edition = "2021" +description = "Hitbox tarantool backend." +readme = "README.md" +repository = "https://github.com/hit-box/hitbox/" +categories = ["caching", "asynchronous"] +keywords = ["cache", "async", "cache-backend", "hitbox", "tarantool"] + +[dependencies] +hitbox-backend = { path = "../hitbox-backend", version = "0.1.0" } +hitbox-core = { path = "../hitbox-core", version = "0.1.0" } +async-trait = "0.1" +serde = "1" +rusty_tarantool = "0.3.0" +typed-builder = "0.15" + +[dev-dependencies] +tokio = { version = "1", features = [ + "time", + "macros", + "test-util", + "rt-multi-thread", +] } +testcontainers = "0.14" +chrono = "0.4" +once_cell = "1" diff --git a/hitbox-tarantool/LICENSE b/hitbox-tarantool/LICENSE new file mode 100644 index 0000000..9a4a7a6 --- /dev/null +++ b/hitbox-tarantool/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 Makc + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/hitbox-tarantool/src/backend.rs b/hitbox-tarantool/src/backend.rs new file mode 100644 index 0000000..8ebfd18 --- /dev/null +++ b/hitbox-tarantool/src/backend.rs @@ -0,0 +1,147 @@ +use async_trait::async_trait; +use hitbox_backend::{ + serializer::SerializableCachedValue, BackendError, BackendResult, CacheBackend, DeleteStatus, +}; +use hitbox_core::{CacheKey, CacheableResponse, CachedValue}; +use rusty_tarantool::tarantool::{Client, ClientConfig, ExecWithParamaters}; +use serde::{Deserialize, Serialize}; +use std::io::Error; +use typed_builder::TypedBuilder; + +const TARANTOOL_INIT_LUA: &str = include_str!("init.lua"); + +/// Tarantool cache backend based on rusty_tarantool crate. +/// +/// # Examples +/// ``` +/// use hitbox_tarantool::Tarantool; +/// +/// #[tokio::main] +/// async fn main() { +/// let mut backend = Tarantool::builder().build(); +/// // backend.init().await.unwrap(); +/// } +/// ``` +#[derive(Clone, TypedBuilder)] +#[builder(build_method(vis="", name=__build))] +pub struct Tarantool { + #[builder(default = "hitbox".to_string())] + user: String, + #[builder(default = "hitbox".to_string())] + password: String, + #[builder(default = "127.0.0.1".to_string())] + host: String, + #[builder(default = "3301".to_string())] + port: String, +} + +pub struct TarantoolBackend { + client: Client, +} + +#[allow(non_camel_case_types)] +impl< + __user: ::typed_builder::Optional, + __password: ::typed_builder::Optional, + __host: ::typed_builder::Optional, + __port: ::typed_builder::Optional, + > TarantoolBuilder<(__user, __password, __host, __port)> +{ + pub fn build(self) -> TarantoolBackend { + let t = self.__build(); + let client = + ClientConfig::new(format!("{}:{}", t.host, t.port), t.user, t.password).build(); + TarantoolBackend { client } + } +} + +impl TarantoolBackend { + /// Init backend and configure tarantool instance + /// This function is idempotent + pub async fn init(&mut self) -> BackendResult<()> { + self.client + .eval(TARANTOOL_INIT_LUA, &("hitbox_cache",)) + .await + .map_err(|err| BackendError::InternalError(Box::new(err)))?; + + Ok(()) + } + + fn map_err(err: Error) -> BackendError { + BackendError::InternalError(Box::new(err)) + } +} + +#[doc(hidden)] +#[derive(Serialize, Deserialize)] +pub struct CacheEntry { + pub key: String, + pub ttl: Option, + pub value: SerializableCachedValue, +} + +#[async_trait] +impl CacheBackend for TarantoolBackend { + async fn get(&self, key: &CacheKey) -> BackendResult>> + where + T: CacheableResponse, + ::Cached: serde::de::DeserializeOwned, + { + self.client + .prepare_fn_call("hitbox.get") + .bind_ref(&(key.serialize())) + .map_err(TarantoolBackend::map_err)? + .execute() + .await + .map_err(TarantoolBackend::map_err)? + .decode_single::>>() + .map_err(TarantoolBackend::map_err) + .map(|v| v.map(|v| v.value.into_cached_value())) + } + + async fn delete(&self, key: &CacheKey) -> BackendResult { + let result: bool = self + .client + .prepare_fn_call("hitbox.delete") + .bind_ref(&(key.serialize())) + .map_err(TarantoolBackend::map_err)? + .execute() + .await + .map_err(TarantoolBackend::map_err)? + .decode_single() + .map_err(TarantoolBackend::map_err)?; + match result { + true => Ok(DeleteStatus::Deleted(1)), + false => Ok(DeleteStatus::Missing), + } + } + + async fn set( + &self, + key: &CacheKey, + value: &CachedValue, + ttl: Option, + ) -> BackendResult<()> + where + T: CacheableResponse + Send, + T::Cached: serde::Serialize + Send + Sync, + { + let entry: CacheEntry = CacheEntry { + key: key.serialize(), + ttl, + value: value.clone().into(), + }; + self.client + .prepare_fn_call("hitbox.set") + .bind_ref(&entry) + .map_err(TarantoolBackend::map_err)? + .execute() + .await + .map(|_| ()) + .map_err(TarantoolBackend::map_err) + } + + async fn start(&self) -> BackendResult<()> { + Ok(()) + } +} diff --git a/hitbox-tarantool/src/init.lua b/hitbox-tarantool/src/init.lua new file mode 100644 index 0000000..336a463 --- /dev/null +++ b/hitbox-tarantool/src/init.lua @@ -0,0 +1,73 @@ +local fiber = require("fiber") +local log = require("log") + +local SCAN_INTERVAL = 0.1 +local MAX_TUPLES_FOR_DELETE = 1000 + +box.cfg({}) + +local space_name = ... + +box.schema.space.create(space_name, { if_not_exists = true }) +box.space[space_name]:create_index("primary", { + type = "HASH", + parts = { { 1, "string" } }, + if_not_exists = true, +}) +box.space[space_name]:create_index("by_ttl", { + parts = { { 2, "integer" } }, + if_not_exists = true, +}) + +if not _G.__hitbox_cache_fiber then + _G.__hitbox_cache_fiber = fiber.create(function() + fiber.name("hitbox_cache_fiber") + while true do + box.ctl.wait_rw() + + local ok, err = pcall(function() + local for_del = box.space[space_name].index.by_ttl + :pairs({ math.floor(fiber.time()) }, { iterator = "LE" }) + :take(MAX_TUPLES_FOR_DELETE) + :totable() + + box.atomic(function() + for _, t in pairs(for_del) do + box.space[space_name]:delete(t[1]) + end + end) + + return true + end) + + if not ok then + log.error(err) + end + + fiber.testcancel() + fiber.sleep(SCAN_INTERVAL) + end + end) +end + +-- lua api for hitbox +_G.hitbox = { + ---Get cache entry by key + ---@param key string + ---@return table? + get = function(key) + return box.space[space_name]:get(key) + end, + ---Insert cache entry + ---@param entry table {key: string, ttl: number, value: any} + ---@return table saved entry + set = function(entry) + return box.space[space_name]:replace(entry) + end, + ---Delete cache entry + ---@param key string + ---@return boolean + delete = function(key) + return box.space[space_name]:delete(key) and true or false + end, +} diff --git a/hitbox-tarantool/src/lib.rs b/hitbox-tarantool/src/lib.rs new file mode 100644 index 0000000..4ef2e4d --- /dev/null +++ b/hitbox-tarantool/src/lib.rs @@ -0,0 +1,6 @@ +//! hitbox [Backend] implementation for Tarantool. +//! [Backend]: hitbox_backend::Backend +pub mod backend; + +#[doc(inline)] +pub use crate::backend::Tarantool; diff --git a/hitbox-tarantool/tests/integration_test.rs b/hitbox-tarantool/tests/integration_test.rs new file mode 100644 index 0000000..68a03ec --- /dev/null +++ b/hitbox-tarantool/tests/integration_test.rs @@ -0,0 +1,272 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use hitbox_backend::{CacheBackend, DeleteStatus}; +use hitbox_core::{CacheKey, CachePolicy, CacheableResponse, CachedValue, PredicateResult}; +use hitbox_tarantool::{backend::CacheEntry, backend::TarantoolBackend, Tarantool}; +use once_cell::sync::Lazy; +use rusty_tarantool::tarantool::{Client, ClientConfig, ExecWithParamaters}; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, str::FromStr, thread, time::Duration}; +use testcontainers::{clients, core::WaitFor, Container, Image}; + +static DOCKER: Lazy = Lazy::new(clients::Cli::default); + +impl Image for TarantoolImage { + type Args = (); + + fn name(&self) -> String { + "tarantool/tarantool".to_owned() + } + + fn tag(&self) -> String { + "latest".to_owned() + } + + fn ready_conditions(&self) -> Vec { + vec![WaitFor::Healthcheck] + } + + fn expose_ports(&self) -> Vec { + vec![3301] + } + + fn env_vars(&self) -> Box + '_> { + Box::new(self.env_vars.iter()) + } +} + +#[derive(Debug)] +struct TarantoolImage { + env_vars: HashMap, +} + +impl<'a> TarantoolImage { + async fn start() -> TarantoolContainer<'a> { + let container = DOCKER.run(TarantoolImage::default()); + let port = &container.ports().map_to_host_port_ipv4(3301).unwrap(); + let client = + ClientConfig::new(format!("{}:{}", "127.0.0.1", &port), "hitbox", "hitbox").build(); + let mut backend = Tarantool::builder().port(port.to_string()).build(); + backend.init().await.unwrap(); + TarantoolContainer { + _container: container, + client, + backend, + } + } +} + +struct TarantoolContainer<'a> { + _container: Container<'a, TarantoolImage>, + client: Client, + backend: TarantoolBackend, +} + +impl<'a> TarantoolContainer<'a> { + async fn eval(&self, cmd: &str, params: &T) -> R + where + T: Serialize, + R: Deserialize<'a>, + { + self.client + .eval(cmd, params) + .await + .unwrap() + .decode() + .unwrap() + } +} + +impl Default for TarantoolImage { + fn default() -> Self { + let mut env_vars = HashMap::new(); + env_vars.insert("TARANTOOL_USER_NAME".to_owned(), "hitbox".to_owned()); + env_vars.insert("TARANTOOL_USER_PASSWORD".to_owned(), "hitbox".to_owned()); + + Self { env_vars } + } +} + +#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] +struct Test { + a: i32, + b: String, +} + +#[async_trait] +impl CacheableResponse for Test { + type Cached = Self; + type Subject = Self; + + async fn cache_policy

(self, predicates: P) -> hitbox_core::ResponseCachePolicy + where + P: hitbox_core::Predicate + Send + Sync, + { + match predicates.check(self).await { + PredicateResult::Cacheable(cacheable) => match cacheable.into_cached().await { + CachePolicy::Cacheable(res) => { + CachePolicy::Cacheable(CachedValue::new(res, Utc::now())) + } + CachePolicy::NonCacheable(res) => CachePolicy::NonCacheable(res), + }, + PredicateResult::NonCacheable(res) => CachePolicy::NonCacheable(res), + } + } + async fn into_cached(self) -> CachePolicy { + CachePolicy::Cacheable(self) + } + async fn from_cached(cached: Self::Cached) -> Self { + cached + } +} + +impl Default for Test { + fn default() -> Self { + Self { + a: 42, + b: "nope".to_owned(), + } + } +} + +#[tokio::test] +async fn test_init() { + let t = TarantoolImage::start().await; + + let space_exists: (bool,) = t + .eval( + "return box.space[...] and true or false", + &("hitbox_cache",), + ) + .await; + assert!(space_exists.0); + + let fiber_exists: (bool,) = t + .eval( + "return _G[...] and true or false", + &("__hitbox_cache_fiber",), + ) + .await; + assert!(fiber_exists.0); +} + +#[tokio::test] +async fn test_set() { + let t = TarantoolImage::start().await; + let key = CacheKey::from_str("test_key", "1"); + let dt = "2012-12-12T12:12:12Z"; + let ttl = 42; + let value = CachedValue::new(Test::default(), DateTime::from_str(dt).unwrap()); + + t.backend + .set::(&key, &value, Some(ttl)) + .await + .unwrap(); + + let result = t + .client + .prepare_fn_call("box.space.hitbox_cache:get") + .bind_ref(&key.serialize()) + .unwrap() + .execute() + .await + .unwrap() + .decode_single::>() + .unwrap(); + + assert_eq!(&result.ttl.unwrap(), &ttl); + assert_eq!(&result.value.into_cached_value().data, &Test::default()); +} + +#[tokio::test] +async fn test_expire() { + let t = TarantoolImage::start().await; + let key = CacheKey::from_str("test_key", "1"); + let dt = "2012-12-12T12:12:12Z"; + let value = CachedValue::new(Test::default(), DateTime::from_str(dt).unwrap()); + + t.backend.set::(&key, &value, Some(0)).await.unwrap(); + + thread::sleep(Duration::from_secs(1)); + + let result = t + .client + .prepare_fn_call("box.space.hitbox_cache:get") + .bind_ref(&key.serialize()) + .unwrap() + .execute() + .await + .unwrap() + .decode_result_set::>() + .unwrap(); + assert!(result.is_empty()) +} + +#[tokio::test] +async fn test_delete() { + let t = TarantoolImage::start().await; + let key = CacheKey::from_str("test_key", "1"); + let dt: DateTime = DateTime::from_str("2012-12-12T12:12:12Z").unwrap(); + let value = Test::default(); + let cached_value = CachedValue::new(&value, dt); + let entry = CacheEntry { + key: key.serialize(), + ttl: Some(42), + value: cached_value.into(), + }; + + let status = t.backend.delete(&key).await.unwrap(); + assert_eq!(status, DeleteStatus::Missing); + + t.client + .prepare_fn_call("box.space.hitbox_cache:replace") + .bind_ref(&entry) + .unwrap() + .execute() + .await + .unwrap(); + + let status = t.backend.delete(&key).await.unwrap(); + assert_eq!(status, DeleteStatus::Deleted(1)); + + let result = t + .client + .prepare_fn_call("box.space.hitbox_cache:get") + .bind_ref(&key.serialize()) + .unwrap() + .execute() + .await + .unwrap() + .decode_result_set::>() + .unwrap(); + + assert!(result.is_empty()) +} + +#[tokio::test] +async fn test_get() { + let t = TarantoolImage::start().await; + let key = CacheKey::from_str("test_key", "1"); + let dt: DateTime = DateTime::from_str("2012-12-12T12:12:12Z").unwrap(); + + let value = Test::default(); + let cached_value = CachedValue::new(&value, dt); + let entry = CacheEntry { + key: key.serialize(), + ttl: Some(42), + value: cached_value.into(), + }; + + t.client + .prepare_fn_call("box.space.hitbox_cache:replace") + .bind_ref(&entry) + .unwrap() + .execute() + .await + .unwrap(); + + let data = t.backend.get::(&key).await.unwrap().unwrap(); + + assert_eq!(data.data, value); + assert_eq!(data.expired, dt); +}