From 64b21d14984a7e80ba61bbca44b85287a5b296be Mon Sep 17 00:00:00 2001 From: 007gzs <007gzs@gmail.com> Date: Mon, 28 Oct 2024 17:43:10 +0800 Subject: [PATCH] add blank line --- plugins/wasm-rust/src/cluster_wrapper.rs | 9 ++++ plugins/wasm-rust/src/event_stream.rs | 1 + plugins/wasm-rust/src/internal.rs | 1 + plugins/wasm-rust/src/plugin_wrapper.rs | 25 ++++++++++ plugins/wasm-rust/src/redis_wrapper.rs | 60 ++++++++++++++++++++++++ plugins/wasm-rust/src/request_wrapper.rs | 2 + 6 files changed, 98 insertions(+) diff --git a/plugins/wasm-rust/src/cluster_wrapper.rs b/plugins/wasm-rust/src/cluster_wrapper.rs index 2891293fb3..8d3e204458 100644 --- a/plugins/wasm-rust/src/cluster_wrapper.rs +++ b/plugins/wasm-rust/src/cluster_wrapper.rs @@ -4,10 +4,12 @@ pub trait Cluster { fn cluster_name(&self) -> String; fn host_name(&self) -> String; } + #[derive(Debug, Clone)] pub struct RouteCluster { host: String, } + impl RouteCluster { pub fn new(host: &str) -> Self { RouteCluster { @@ -15,6 +17,7 @@ impl RouteCluster { } } } + impl Cluster for RouteCluster { fn cluster_name(&self) -> String { if let Some(res) = get_property(vec!["cluster_name"]) { @@ -111,6 +114,7 @@ impl NacosCluster { } } } + impl Cluster for NacosCluster { fn cluster_name(&self) -> String { let group = if self.group.is_empty() { @@ -154,6 +158,7 @@ impl StaticIpCluster { } } } + impl Cluster for StaticIpCluster { fn cluster_name(&self) -> String { format!("outbound|{}||{}.static", self.port, self.service_name) @@ -184,6 +189,7 @@ impl DnsCluster { } } } + impl Cluster for DnsCluster { fn cluster_name(&self) -> String { format!("outbound|{}||{}.dns", self.port, self.service_name) @@ -212,6 +218,7 @@ impl ConsulCluster { } } } + impl Cluster for ConsulCluster { fn cluster_name(&self) -> String { format!( @@ -245,10 +252,12 @@ impl FQDNCluster { } } } + impl Cluster for FQDNCluster { fn cluster_name(&self) -> String { format!("outbound|{}||{}", self.port, self.fqdn) } + fn host_name(&self) -> String { if self.host.is_empty() { self.fqdn.clone() diff --git a/plugins/wasm-rust/src/event_stream.rs b/plugins/wasm-rust/src/event_stream.rs index 46f4a3ca0f..37b0783914 100644 --- a/plugins/wasm-rust/src/event_stream.rs +++ b/plugins/wasm-rust/src/event_stream.rs @@ -78,6 +78,7 @@ impl Iterator for EventStream { None } } + impl EventStream { pub fn new() -> Self { EventStream { diff --git a/plugins/wasm-rust/src/internal.rs b/plugins/wasm-rust/src/internal.rs index 5a55726960..5a562419d4 100644 --- a/plugins/wasm-rust/src/internal.rs +++ b/plugins/wasm-rust/src/internal.rs @@ -390,6 +390,7 @@ pub(crate) fn redis_init( ) -> Result<(), Status> { hostcalls::redis_init(upstream, username, password, timeout) } + pub(crate) fn dispatch_redis_call( upstream: &str, query: &[u8], diff --git a/plugins/wasm-rust/src/plugin_wrapper.rs b/plugins/wasm-rust/src/plugin_wrapper.rs index 633b69f55f..6f9460dbc6 100644 --- a/plugins/wasm-rust/src/plugin_wrapper.rs +++ b/plugins/wasm-rust/src/plugin_wrapper.rs @@ -30,6 +30,7 @@ use serde::de::DeserializeOwned; lazy_static! { static ref LOG: Log = Log::new("plugin_wrapper".to_string()); } + thread_local! { static HTTP_CALLBACK_DISPATCHER: HttpCallbackDispatcher = HttpCallbackDispatcher::new(); } @@ -49,7 +50,9 @@ where None => None, } } + fn rule_matcher(&self) -> &SharedRuleMatcher; + fn create_http_context_wrapper( &self, _context_id: u32, @@ -63,20 +66,24 @@ pub type HttpCallbackFn = dyn FnOnce(u16, &MultiMap, Option>>, } + impl Default for HttpCallbackDispatcher { fn default() -> Self { Self::new() } } + impl HttpCallbackDispatcher { pub fn new() -> Self { HttpCallbackDispatcher { call_fns: RefCell::new(HashMap::new()), } } + pub fn set(&self, token_id: u32, arg: Box) { self.call_fns.borrow_mut().insert(token_id, arg); } + pub fn pop(&self, token_id: u32) -> Option> { self.call_fns.borrow_mut().remove(&token_id) } @@ -91,31 +98,39 @@ where _self_weak: Weak>>>, ) { } + fn log(&self) -> &Log { &LOG } + fn on_config(&mut self, _config: Rc) {} + fn on_http_request_complete_headers( &mut self, _headers: &MultiMap, ) -> HeaderAction { HeaderAction::Continue } + fn on_http_response_complete_headers( &mut self, _headers: &MultiMap, ) -> HeaderAction { HeaderAction::Continue } + fn cache_request_body(&self) -> bool { false } + fn cache_response_body(&self) -> bool { false } + fn on_http_request_complete_body(&mut self, _req_body: &Bytes) -> DataAction { DataAction::Continue } + fn on_http_response_complete_body(&mut self, _res_body: &Bytes) -> DataAction { DataAction::Continue } @@ -123,9 +138,11 @@ where fn replace_http_request_body(&mut self, body: &[u8]) { self.set_http_request_body(0, i32::MAX as usize, body) } + fn replace_http_response_body(&mut self, body: &[u8]) { self.set_http_response_body(0, i32::MAX as usize, body) } + #[allow(clippy::too_many_arguments)] fn http_call( &mut self, @@ -187,6 +204,7 @@ pub struct PluginHttpWrapper { rule_matcher: SharedRuleMatcher, http_content: Rc>>>, } + impl PluginHttpWrapper where PluginConfig: Default + DeserializeOwned + Clone + 'static, @@ -207,10 +225,12 @@ where http_content: rc_content, } } + fn get_http_call_fn(&mut self, token_id: u32) -> Option> { HTTP_CALLBACK_DISPATCHER.with(|dispatcher| dispatcher.pop(token_id)) } } + impl Context for PluginHttpWrapper where PluginConfig: Default + DeserializeOwned + Clone + 'static, @@ -272,21 +292,25 @@ where .borrow_mut() .on_grpc_call_response(token_id, status_code, response_size) } + fn on_grpc_stream_initial_metadata(&mut self, token_id: u32, num_elements: u32) { self.http_content .borrow_mut() .on_grpc_stream_initial_metadata(token_id, num_elements) } + fn on_grpc_stream_message(&mut self, token_id: u32, message_size: usize) { self.http_content .borrow_mut() .on_grpc_stream_message(token_id, message_size) } + fn on_grpc_stream_trailing_metadata(&mut self, token_id: u32, num_elements: u32) { self.http_content .borrow_mut() .on_grpc_stream_trailing_metadata(token_id, num_elements) } + fn on_grpc_stream_close(&mut self, token_id: u32, status_code: u32) { self.http_content .borrow_mut() @@ -297,6 +321,7 @@ where self.http_content.borrow_mut().on_done() } } + impl HttpContext for PluginHttpWrapper where PluginConfig: Default + DeserializeOwned + Clone + 'static, diff --git a/plugins/wasm-rust/src/redis_wrapper.rs b/plugins/wasm-rust/src/redis_wrapper.rs index 320c4bf8ca..ae25a6cd24 100644 --- a/plugins/wasm-rust/src/redis_wrapper.rs +++ b/plugins/wasm-rust/src/redis_wrapper.rs @@ -25,6 +25,7 @@ pub struct RedisClientBuilder { password: Option, timeout: Duration, } + impl RedisClientBuilder { pub fn new(cluster: &dyn Cluster, timeout: Duration) -> Self { RedisClientBuilder { @@ -34,14 +35,17 @@ impl RedisClientBuilder { timeout, } } + pub fn username>(mut self, username: Option) -> Self { self.username = username.map(|u| u.as_ref().to_string()); self } + pub fn password>(mut self, password: Option) -> Self { self.password = password.map(|p| p.as_ref().to_string()); self } + pub fn build(self) -> RedisClient { RedisClient { upstream: self.upstream, @@ -67,10 +71,12 @@ impl RedisClientConfig { timeout, } } + pub fn username>(&mut self, username: Option) -> &Self { self.username = username.map(|u| u.as_ref().to_string()); self } + pub fn password>(&mut self, password: Option) -> &Self { self.password = password.map(|p| p.as_ref().to_string()); self @@ -93,6 +99,7 @@ impl RedisClient { timeout: config.timeout, } } + pub fn init(&self) -> Result<(), Status> { internal::redis_init( &self.upstream, @@ -101,12 +108,15 @@ impl RedisClient { self.timeout, ) } + fn call(&self, query: &[u8], call_fn: Box) -> Result { internal::dispatch_redis_call(&self.upstream, query, gen_callback(call_fn)) } + pub fn command(&self, cmd: &Cmd, call_fn: Box) -> Result { self.call(&cmd.get_packed_command(), call_fn) } + pub fn eval( &self, script: &str, @@ -132,11 +142,13 @@ impl RedisClient { cmd.arg(key); self.command(&cmd, call_fn) } + pub fn exists(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("exists"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn expire( &self, key: &str, @@ -147,6 +159,7 @@ impl RedisClient { cmd.arg(key).arg(ttl); self.command(&cmd, call_fn) } + pub fn persist(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("persist"); cmd.arg(key); @@ -159,6 +172,7 @@ impl RedisClient { cmd.arg(key); self.command(&cmd, call_fn) } + pub fn set( &self, key: &str, @@ -169,6 +183,7 @@ impl RedisClient { cmd.arg(key).arg(value); self.command(&cmd, call_fn) } + pub fn setex( &self, key: &str, @@ -180,6 +195,7 @@ impl RedisClient { cmd.arg(key).arg(ttl).arg(value); self.command(&cmd, call_fn) } + pub fn mget(&self, keys: Vec<&str>, call_fn: Box) -> Result { let mut cmd = redis::cmd("mget"); for key in keys { @@ -187,6 +203,7 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn mset( &self, kv_map: HashMap<&str, T>, @@ -198,16 +215,19 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn incr(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("incr"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn decr(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("decr"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn incrby( &self, key: &str, @@ -218,6 +238,7 @@ impl RedisClient { cmd.arg(key).arg(delta); self.command(&cmd, call_fn) } + pub fn decrby( &self, key: &str, @@ -235,6 +256,7 @@ impl RedisClient { cmd.arg(key); self.command(&cmd, call_fn) } + pub fn rpush( &self, key: &str, @@ -248,11 +270,13 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn rpop(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("rpop"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn lpush( &self, key: &str, @@ -266,11 +290,13 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn lpop(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("lpop"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn lindex( &self, key: &str, @@ -281,6 +307,7 @@ impl RedisClient { cmd.arg(key).arg(index); self.command(&cmd, call_fn) } + pub fn lrange( &self, key: &str, @@ -292,6 +319,7 @@ impl RedisClient { cmd.arg(key).arg(start).arg(stop); self.command(&cmd, call_fn) } + pub fn lrem( &self, key: &str, @@ -303,6 +331,7 @@ impl RedisClient { cmd.arg(key).arg(count).arg(value); self.command(&cmd, call_fn) } + pub fn linsert_before( &self, key: &str, @@ -314,6 +343,7 @@ impl RedisClient { cmd.arg(key).arg("before").arg(pivot).arg(value); self.command(&cmd, call_fn) } + pub fn linsert_after( &self, key: &str, @@ -338,6 +368,7 @@ impl RedisClient { cmd.arg(key).arg(field); self.command(&cmd, call_fn) } + pub fn hdel( &self, key: &str, @@ -351,11 +382,13 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn hlen(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("hlen"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn hget( &self, key: &str, @@ -366,6 +399,7 @@ impl RedisClient { cmd.arg(key).arg(field); self.command(&cmd, call_fn) } + pub fn hset( &self, key: &str, @@ -377,6 +411,7 @@ impl RedisClient { cmd.arg(key).arg(field).arg(value); self.command(&cmd, call_fn) } + pub fn hmget( &self, key: &str, @@ -390,6 +425,7 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn hmset( &self, key: &str, @@ -403,21 +439,25 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn hkeys(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("hkeys"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn hvals(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("hvals"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn hgetall(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("hgetall"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn hincrby( &self, key: &str, @@ -429,6 +469,7 @@ impl RedisClient { cmd.arg(key).arg(field).arg(delta); self.command(&cmd, call_fn) } + pub fn hincrbyfloat( &self, key: &str, @@ -447,6 +488,7 @@ impl RedisClient { cmd.arg(key); self.command(&cmd, call_fn) } + pub fn sadd( &self, key: &str, @@ -460,6 +502,7 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn srem( &self, key: &str, @@ -473,6 +516,7 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn sismember( &self, key: &str, @@ -483,11 +527,13 @@ impl RedisClient { cmd.arg(key).arg(value); self.command(&cmd, call_fn) } + pub fn smembers(&self, key: &str, call_fn: Box) -> Result { let mut cmd = redis::cmd("smembers"); cmd.arg(key); self.command(&cmd, call_fn) } + pub fn sdiff( &self, key1: &str, @@ -498,6 +544,7 @@ impl RedisClient { cmd.arg(key1).arg(key2); self.command(&cmd, call_fn) } + pub fn sdiffstore( &self, destination: &str, @@ -509,6 +556,7 @@ impl RedisClient { cmd.arg(destination).arg(key1).arg(key2); self.command(&cmd, call_fn) } + pub fn sinter( &self, key1: &str, @@ -519,6 +567,7 @@ impl RedisClient { cmd.arg(key1).arg(key2); self.command(&cmd, call_fn) } + pub fn sinterstore( &self, destination: &str, @@ -530,6 +579,7 @@ impl RedisClient { cmd.arg(destination).arg(key1).arg(key2); self.command(&cmd, call_fn) } + pub fn sunion( &self, key1: &str, @@ -540,6 +590,7 @@ impl RedisClient { cmd.arg(key1).arg(key2); self.command(&cmd, call_fn) } + pub fn sunion_store( &self, destination: &str, @@ -558,6 +609,7 @@ impl RedisClient { cmd.arg(key); self.command(&cmd, call_fn) } + pub fn zadd( &self, key: &str, @@ -571,6 +623,7 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn zcount( &self, key: &str, @@ -582,6 +635,7 @@ impl RedisClient { cmd.arg(key).arg(min).arg(max); self.command(&cmd, call_fn) } + pub fn zincrby( &self, key: &str, @@ -593,6 +647,7 @@ impl RedisClient { cmd.arg(key).arg(delta).arg(member); self.command(&cmd, call_fn) } + pub fn zscore( &self, key: &str, @@ -603,6 +658,7 @@ impl RedisClient { cmd.arg(key).arg(member); self.command(&cmd, call_fn) } + pub fn zrank( &self, key: &str, @@ -613,6 +669,7 @@ impl RedisClient { cmd.arg(key).arg(member); self.command(&cmd, call_fn) } + pub fn zrev_rank( &self, key: &str, @@ -623,6 +680,7 @@ impl RedisClient { cmd.arg(key).arg(member); self.command(&cmd, call_fn) } + pub fn zrem( &self, key: &str, @@ -636,6 +694,7 @@ impl RedisClient { } self.command(&cmd, call_fn) } + pub fn zrange( &self, key: &str, @@ -647,6 +706,7 @@ impl RedisClient { cmd.arg(key).arg(start).arg(stop); self.command(&cmd, call_fn) } + pub fn zrevrange( &self, key: &str, diff --git a/plugins/wasm-rust/src/request_wrapper.rs b/plugins/wasm-rust/src/request_wrapper.rs index c9a997456c..f37fd8f498 100644 --- a/plugins/wasm-rust/src/request_wrapper.rs +++ b/plugins/wasm-rust/src/request_wrapper.rs @@ -14,6 +14,7 @@ fn get_request_head(head: &str, log_flag: &str) -> String { String::new() } } + pub fn get_request_scheme() -> String { get_request_head(":scheme", "head") } @@ -57,6 +58,7 @@ pub fn is_binary_response_body() -> bool { } false } + pub fn has_request_body() -> bool { let content_type = internal::get_http_request_header("content-type"); let content_length_str = internal::get_http_request_header("content-length");