Skip to content

Commit

Permalink
add blank line
Browse files Browse the repository at this point in the history
  • Loading branch information
007gzs committed Oct 28, 2024
1 parent c9dddff commit 64b21d1
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 0 deletions.
9 changes: 9 additions & 0 deletions plugins/wasm-rust/src/cluster_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ pub trait Cluster {
fn cluster_name(&self) -> String;
fn host_name(&self) -> String;
}

#[derive(Debug, Clone)]
pub struct RouteCluster {
host: String,
}

impl RouteCluster {
pub fn new(host: &str) -> Self {
RouteCluster {
host: host.to_string(),
}
}
}

impl Cluster for RouteCluster {
fn cluster_name(&self) -> String {
if let Some(res) = get_property(vec!["cluster_name"]) {
Expand Down Expand Up @@ -111,6 +114,7 @@ impl NacosCluster {
}
}
}

impl Cluster for NacosCluster {
fn cluster_name(&self) -> String {
let group = if self.group.is_empty() {
Expand Down Expand Up @@ -154,6 +158,7 @@ impl StaticIpCluster {
}
}
}

impl Cluster for StaticIpCluster {
fn cluster_name(&self) -> String {
format!("outbound|{}||{}.static", self.port, self.service_name)
Expand Down Expand Up @@ -184,6 +189,7 @@ impl DnsCluster {
}
}
}

impl Cluster for DnsCluster {
fn cluster_name(&self) -> String {
format!("outbound|{}||{}.dns", self.port, self.service_name)
Expand Down Expand Up @@ -212,6 +218,7 @@ impl ConsulCluster {
}
}
}

impl Cluster for ConsulCluster {
fn cluster_name(&self) -> String {
format!(
Expand Down Expand Up @@ -245,10 +252,12 @@ impl FQDNCluster {
}
}
}

impl Cluster for FQDNCluster {
fn cluster_name(&self) -> String {
format!("outbound|{}||{}", self.port, self.fqdn)
}

fn host_name(&self) -> String {
if self.host.is_empty() {
self.fqdn.clone()
Expand Down
1 change: 1 addition & 0 deletions plugins/wasm-rust/src/event_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl Iterator for EventStream {
None
}
}

impl EventStream {
pub fn new() -> Self {
EventStream {
Expand Down
1 change: 1 addition & 0 deletions plugins/wasm-rust/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ pub(crate) fn redis_init(
) -> Result<(), Status> {
hostcalls::redis_init(upstream, username, password, timeout)
}

pub(crate) fn dispatch_redis_call(
upstream: &str,
query: &[u8],
Expand Down
25 changes: 25 additions & 0 deletions plugins/wasm-rust/src/plugin_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use serde::de::DeserializeOwned;
lazy_static! {
static ref LOG: Log = Log::new("plugin_wrapper".to_string());
}

thread_local! {
static HTTP_CALLBACK_DISPATCHER: HttpCallbackDispatcher = HttpCallbackDispatcher::new();
}
Expand All @@ -49,7 +50,9 @@ where
None => None,
}
}

fn rule_matcher(&self) -> &SharedRuleMatcher<PluginConfig>;

fn create_http_context_wrapper(
&self,
_context_id: u32,
Expand All @@ -63,20 +66,24 @@ pub type HttpCallbackFn = dyn FnOnce(u16, &MultiMap<String, String>, Option<Vec<
pub struct HttpCallbackDispatcher {
call_fns: RefCell<HashMap<u32, Box<HttpCallbackFn>>>,
}

impl Default for HttpCallbackDispatcher {
fn default() -> Self {
Self::new()
}
}

impl HttpCallbackDispatcher {
pub fn new() -> Self {
HttpCallbackDispatcher {
call_fns: RefCell::new(HashMap::new()),
}
}

pub fn set(&self, token_id: u32, arg: Box<HttpCallbackFn>) {
self.call_fns.borrow_mut().insert(token_id, arg);
}

pub fn pop(&self, token_id: u32) -> Option<Box<HttpCallbackFn>> {
self.call_fns.borrow_mut().remove(&token_id)
}
Expand All @@ -91,41 +98,51 @@ where
_self_weak: Weak<RefCell<Box<dyn HttpContextWrapper<PluginConfig>>>>,
) {
}

fn log(&self) -> &Log {
&LOG
}

fn on_config(&mut self, _config: Rc<PluginConfig>) {}

fn on_http_request_complete_headers(
&mut self,
_headers: &MultiMap<String, String>,
) -> HeaderAction {
HeaderAction::Continue
}

fn on_http_response_complete_headers(
&mut self,
_headers: &MultiMap<String, String>,
) -> HeaderAction {
HeaderAction::Continue
}

fn cache_request_body(&self) -> bool {
false
}

fn cache_response_body(&self) -> bool {
false
}

fn on_http_request_complete_body(&mut self, _req_body: &Bytes) -> DataAction {
DataAction::Continue
}

fn on_http_response_complete_body(&mut self, _res_body: &Bytes) -> DataAction {
DataAction::Continue
}

fn replace_http_request_body(&mut self, body: &[u8]) {
self.set_http_request_body(0, i32::MAX as usize, body)
}

fn replace_http_response_body(&mut self, body: &[u8]) {
self.set_http_response_body(0, i32::MAX as usize, body)
}

#[allow(clippy::too_many_arguments)]
fn http_call(
&mut self,
Expand Down Expand Up @@ -187,6 +204,7 @@ pub struct PluginHttpWrapper<PluginConfig> {
rule_matcher: SharedRuleMatcher<PluginConfig>,
http_content: Rc<RefCell<Box<dyn HttpContextWrapper<PluginConfig>>>>,
}

impl<PluginConfig> PluginHttpWrapper<PluginConfig>
where
PluginConfig: Default + DeserializeOwned + Clone + 'static,
Expand All @@ -207,10 +225,12 @@ where
http_content: rc_content,
}
}

fn get_http_call_fn(&mut self, token_id: u32) -> Option<Box<HttpCallbackFn>> {
HTTP_CALLBACK_DISPATCHER.with(|dispatcher| dispatcher.pop(token_id))
}
}

impl<PluginConfig> Context for PluginHttpWrapper<PluginConfig>
where
PluginConfig: Default + DeserializeOwned + Clone + 'static,
Expand Down Expand Up @@ -272,21 +292,25 @@ where
.borrow_mut()
.on_grpc_call_response(token_id, status_code, response_size)
}

fn on_grpc_stream_initial_metadata(&mut self, token_id: u32, num_elements: u32) {
self.http_content
.borrow_mut()
.on_grpc_stream_initial_metadata(token_id, num_elements)
}

fn on_grpc_stream_message(&mut self, token_id: u32, message_size: usize) {
self.http_content
.borrow_mut()
.on_grpc_stream_message(token_id, message_size)
}

fn on_grpc_stream_trailing_metadata(&mut self, token_id: u32, num_elements: u32) {
self.http_content
.borrow_mut()
.on_grpc_stream_trailing_metadata(token_id, num_elements)
}

fn on_grpc_stream_close(&mut self, token_id: u32, status_code: u32) {
self.http_content
.borrow_mut()
Expand All @@ -297,6 +321,7 @@ where
self.http_content.borrow_mut().on_done()
}
}

impl<PluginConfig> HttpContext for PluginHttpWrapper<PluginConfig>
where
PluginConfig: Default + DeserializeOwned + Clone + 'static,
Expand Down
Loading

0 comments on commit 64b21d1

Please sign in to comment.