From e86e9158e176fb54af1621553fb77b489c81229d Mon Sep 17 00:00:00 2001 From: Leon Matthes Date: Tue, 10 Dec 2024 08:29:17 +0100 Subject: [PATCH 1/5] Return Result when creating DaprHttpServer Especially when running custom docker setups, the container that contains the sidecar may not be running exactly when the Rust code starts running. To fix this, before we needed to sleep(2s) to avoid a panic in the Rust program. With this patch, this can be handled on the user side (e.g. the connection can be retried multiple times with timeouts in-between). Signed-off-by: Leon Matthes --- dapr/src/server/http.rs | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/dapr/src/server/http.rs b/dapr/src/server/http.rs index f0e0063f..802d6530 100644 --- a/dapr/src/server/http.rs +++ b/dapr/src/server/http.rs @@ -84,6 +84,12 @@ pub struct DaprHttpServer { impl DaprHttpServer { /// Creates a new instance of the Dapr HTTP server with default options. + /// + /// # Panics + /// + /// This function panics if the Dapr Sidecar cannot be reached! + /// For a non-panicking version that allows you to handle any errors yourself, see: + /// [DaprHttpServer::try_new_with_dapr_port] pub async fn new() -> Self { let dapr_port: u16 = std::env::var("DAPR_GRPC_PORT") .unwrap_or("3501".into()) @@ -92,19 +98,38 @@ impl DaprHttpServer { Self::with_dapr_port(dapr_port).await } + /// Creates a new instance of the Dapr HTTP server that connects to the Dapr sidecar on the + /// given dapr_port. + /// + /// # Panics + /// + /// This function panics if the Dapr Sidecar cannot be reached! + /// For a non-panicking version that allows you to handle any errors yourself, see: + /// [DaprHttpServer::try_new_with_dapr_port] pub async fn with_dapr_port(dapr_port: u16) -> Self { - let dapr_addr = format!("https://127.0.0.1:{}", dapr_port); - - let cc = match TonicClient::connect(dapr_addr).await { + match Self::try_new_with_dapr_port(dapr_port).await { Ok(c) => c, Err(err) => panic!("failed to connect to dapr: {}", err), - }; + } + } + + /// Creates a new instance of the Dapr HTTP server that connects to the Dapr sidecar on the + /// given dapr_port. + /// + /// In contrast to the other functions that create a DaprHttpServer, this function does + /// not panic, but instead returns a Result. + pub async fn try_new_with_dapr_port( + dapr_port: u16, + ) -> Result> { + let dapr_addr = format!("https://127.0.0.1:{}", dapr_port); + + let cc = TonicClient::connect(dapr_addr).await?; let rt = ActorRuntime::new(cc); - DaprHttpServer { + Ok(DaprHttpServer { actor_runtime: Arc::new(rt), shutdown_signal: None, - } + }) } pub fn with_graceful_shutdown(self, signal: F) -> Self @@ -138,7 +163,7 @@ impl DaprHttpServer { .unwrap_or(8080); let address = format!("127.0.0.1:{}", port.unwrap_or(default_port)); - let listener = TcpListener::bind(address).await.unwrap(); + let listener = TcpListener::bind(address).await?; let server = axum::serve(listener, app.into_make_service()); From a5296a05a663b239a33982c7d24eb642e9db127b Mon Sep 17 00:00:00 2001 From: Mike Nguyen Date: Mon, 16 Dec 2024 12:07:12 +0000 Subject: [PATCH 2/5] release: v0.16.0-rc.4 Signed-off-by: Mike Nguyen --- Cargo.toml | 2 +- README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 54ab49e9..bfba581e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ tonic = "0.12.1" tonic-build = "0.12.1" [workspace.package] -version = "0.16.0-rc.3" +version = "0.16.0-rc.4" authors = [ "Mike Nguyen ", "The Dapr Authors " diff --git a/README.md b/README.md index bc417834..22101a48 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ Add the following to your `Cargo.toml` file: ```toml [dependencies] -dapr = "0.16.0-rc.3" +dapr = "0.16.0-rc.4" ``` Here's a basic example to create a client: From 3be7ef4527380c6df458fc7d658f6c54754b5482 Mon Sep 17 00:00:00 2001 From: Mike Nguyen Date: Mon, 16 Dec 2024 12:15:21 +0000 Subject: [PATCH 3/5] chore(deps): remove unused crates Signed-off-by: Mike Nguyen --- Cargo.toml | 2 -- dapr-macros/Cargo.toml | 3 --- dapr/Cargo.toml | 2 -- 3 files changed, 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bfba581e..9d0e1ee0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,6 @@ resolver = "2" [workspace.dependencies] async-trait = "0.1" prost = "0.13.1" -prost-build = "0.13.1" prost-types = "0.13.1" serde = "1.0" @@ -21,7 +20,6 @@ serde_json = "1.0" tokio = "1.39" tokio-stream = "0.1" -tokio-test = "0.4" tokio-util = "0.7" tonic = "0.12.1" diff --git a/dapr-macros/Cargo.toml b/dapr-macros/Cargo.toml index 748e0760..18211764 100644 --- a/dapr-macros/Cargo.toml +++ b/dapr-macros/Cargo.toml @@ -12,9 +12,6 @@ rust-version.workspace = true proc-macro = true [dependencies] -async-trait = { workspace = true } -axum = "0.7" -log = "0.4" proc-macro2 = "1.0" quote = "1.0" syn = { version = "2.0", features = ["full"] } diff --git a/dapr/Cargo.toml b/dapr/Cargo.toml index 47be9a99..c12dd562 100644 --- a/dapr/Cargo.toml +++ b/dapr/Cargo.toml @@ -13,7 +13,6 @@ rust-version.workspace = true [dependencies] async-trait = { workspace = true } axum = "0.7" -bytes = "1.7" chrono = "0.4" futures = "0.3" log = "0.4" @@ -33,5 +32,4 @@ dapr = { path = "./" } dapr-macros = { path = "../dapr-macros" } tokio = { workspace = true, features = ["full"] } uuid = { version = "1.10", features = ["v4"] } -tokio-test = { workspace = true } tokio-stream = { workspace = true } \ No newline at end of file From 8b97f4a48ff032945f22d083a48468aa921c0eb3 Mon Sep 17 00:00:00 2001 From: Mike Nguyen Date: Mon, 16 Dec 2024 15:28:26 +0000 Subject: [PATCH 4/5] doc: missing expression closure Signed-off-by: Mike Nguyen --- daprdocs/content/en/rust-sdk-docs/rust-client/_index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daprdocs/content/en/rust-sdk-docs/rust-client/_index.md b/daprdocs/content/en/rust-sdk-docs/rust-client/_index.md index 2d34e518..1dfe3a85 100644 --- a/daprdocs/content/en/rust-sdk-docs/rust-client/_index.md +++ b/daprdocs/content/en/rust-sdk-docs/rust-client/_index.md @@ -34,7 +34,7 @@ dapr = "0.16.0" You can either reference `dapr::Client` or bind the full path to a new name as follows: ```rust -use dapr::Client as DaprClient +use dapr::Client as DaprClient; ``` ## Instantiating the Dapr client @@ -43,7 +43,7 @@ use dapr::Client as DaprClient let addr = "https://127.0.0.1".to_string(); let mut client = dapr::Client::::connect(addr, - port).await?; +port).await?; ``` ## Building blocks From 658dcc4abdc88669d48a0ed01b35ba656655e58a Mon Sep 17 00:00:00 2001 From: Mike Nguyen Date: Mon, 16 Dec 2024 15:34:43 +0000 Subject: [PATCH 5/5] refactor: lint issues and correctness improvements Signed-off-by: Mike Nguyen --- .github/workflows/dapr-bot/Cargo.toml | 2 +- dapr/src/appcallback.rs | 19 ++++++++-------- dapr/src/client.rs | 24 ++++++++++----------- examples/Cargo.toml | 1 - examples/src/actors/client.rs | 2 +- examples/src/bindings/input.rs | 2 +- examples/src/bindings/output.rs | 4 ++-- examples/src/client/client.rs | 2 +- examples/src/configuration/main.rs | 6 +++--- examples/src/conversation/main.rs | 3 +-- examples/src/invoke/grpc-proxying/client.rs | 4 ++-- examples/src/invoke/grpc/client.rs | 4 ++-- examples/src/pubsub/publisher.rs | 4 ++-- examples/src/query_state/query1.rs | 2 +- examples/src/query_state/query2.rs | 2 +- 15 files changed, 39 insertions(+), 42 deletions(-) diff --git a/.github/workflows/dapr-bot/Cargo.toml b/.github/workflows/dapr-bot/Cargo.toml index 34893476..288686e6 100644 --- a/.github/workflows/dapr-bot/Cargo.toml +++ b/.github/workflows/dapr-bot/Cargo.toml @@ -11,6 +11,6 @@ rust-version = "1.70.0" [dependencies] exitcode = "1.1.2" -octocrab = "0.34.1" +octocrab = "0.42.1" serde_json = "1.0.114" tokio = { version = "1.36.0", features = ["full"] } diff --git a/dapr/src/appcallback.rs b/dapr/src/appcallback.rs index 87675ee5..e3164c6c 100644 --- a/dapr/src/appcallback.rs +++ b/dapr/src/appcallback.rs @@ -1,37 +1,36 @@ -use crate::dapr; use crate::dapr::proto::runtime::v1::app_callback_server::AppCallback; use crate::dapr::proto::{common, runtime}; use std::collections::HashMap; use tonic::{Code, Request, Response, Status}; /// InvokeRequest is the message to invoke a method with the data. -pub type InvokeRequest = dapr::proto::common::v1::InvokeRequest; +pub type InvokeRequest = common::v1::InvokeRequest; /// InvokeResponse is the response message inclduing data and its content type /// from app callback. -pub type InvokeResponse = dapr::proto::common::v1::InvokeResponse; +pub type InvokeResponse = common::v1::InvokeResponse; /// ListTopicSubscriptionsResponse is the message including the list of the subscribing topics. -pub type ListTopicSubscriptionsResponse = dapr::proto::runtime::v1::ListTopicSubscriptionsResponse; +pub type ListTopicSubscriptionsResponse = runtime::v1::ListTopicSubscriptionsResponse; /// TopicSubscription represents a topic and it's metadata (session id etc.) -pub type TopicSubscription = dapr::proto::runtime::v1::TopicSubscription; +pub type TopicSubscription = runtime::v1::TopicSubscription; /// TopicEventRequest message is compatiable with CloudEvent spec v1.0. -pub type TopicEventRequest = dapr::proto::runtime::v1::TopicEventRequest; +pub type TopicEventRequest = runtime::v1::TopicEventRequest; /// TopicEventResponse is response from app on published message -pub type TopicEventResponse = dapr::proto::runtime::v1::TopicEventResponse; +pub type TopicEventResponse = runtime::v1::TopicEventResponse; /// ListInputBindingsResponse is the message including the list of input bindings. -pub type ListInputBindingsResponse = dapr::proto::runtime::v1::ListInputBindingsResponse; +pub type ListInputBindingsResponse = runtime::v1::ListInputBindingsResponse; /// BindingEventRequest represents input bindings event. -pub type BindingEventRequest = dapr::proto::runtime::v1::BindingEventRequest; +pub type BindingEventRequest = runtime::v1::BindingEventRequest; /// BindingEventResponse includes operations to save state or /// send data to output bindings optionally. -pub type BindingEventResponse = dapr::proto::runtime::v1::BindingEventResponse; +pub type BindingEventResponse = runtime::v1::BindingEventResponse; impl ListTopicSubscriptionsResponse { /// Create `ListTopicSubscriptionsResponse` with a topic. diff --git a/dapr/src/client.rs b/dapr/src/client.rs index a042ccbc..b6461602 100644 --- a/dapr/src/client.rs +++ b/dapr/src/client.rs @@ -656,6 +656,13 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient { Ok(dapr_v1::dapr_client::DaprClient::connect(addr).await?) } + async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> { + self.publish_event(Request::new(request)) + .await? + .into_inner(); + Ok(()) + } + async fn invoke_service( &mut self, request: InvokeServiceRequest, @@ -676,13 +683,6 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient { .into_inner()) } - async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> { - self.publish_event(Request::new(request)) - .await? - .into_inner(); - Ok(()) - } - async fn get_secret(&mut self, request: GetSecretRequest) -> Result { Ok(self.get_secret(Request::new(request)).await?.into_inner()) } @@ -701,6 +701,11 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient { Ok(self.get_state(Request::new(request)).await?.into_inner()) } + async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error> { + self.save_state(Request::new(request)).await?.into_inner(); + Ok(()) + } + async fn query_state_alpha1( &mut self, request: QueryStateRequest, @@ -711,11 +716,6 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient { .into_inner()) } - async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error> { - self.save_state(Request::new(request)).await?.into_inner(); - Ok(()) - } - async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error> { self.delete_state(Request::new(request)).await?.into_inner(); Ok(()) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ccdbd8a1..f2ebc7e3 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -10,7 +10,6 @@ rust-version.workspace = true [dependencies] async-trait = { workspace = true } -base64 = "0.22" dapr = { path = "../dapr" } dapr-macros = { path = "../dapr-macros" } env_logger = "0.11" diff --git a/examples/src/actors/client.rs b/examples/src/actors/client.rs index 11eec37e..fd5ff196 100644 --- a/examples/src/actors/client.rs +++ b/examples/src/actors/client.rs @@ -14,7 +14,7 @@ pub struct MyRequest { async fn main() -> Result<(), Box> { // TODO: Handle this issue in the sdk // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(2, 0)); + tokio::time::sleep(std::time::Duration::new(2, 0)).await; // Define the Dapr address let addr = "https://127.0.0.1".to_string(); diff --git a/examples/src/bindings/input.rs b/examples/src/bindings/input.rs index f305fd61..39aed46c 100644 --- a/examples/src/bindings/input.rs +++ b/examples/src/bindings/input.rs @@ -61,7 +61,7 @@ impl AppCallback for AppCallbackService { let name = &r.name; let data = &r.data; - let message = String::from_utf8_lossy(&data); + let message = String::from_utf8_lossy(data); println!("Binding Name: {}", &name); println!("Message: {}", &message); diff --git a/examples/src/bindings/output.rs b/examples/src/bindings/output.rs index 608a45cb..b26235e5 100644 --- a/examples/src/bindings/output.rs +++ b/examples/src/bindings/output.rs @@ -1,10 +1,10 @@ -use std::{collections::HashMap, thread, time::Duration}; +use std::{collections::HashMap, time::Duration}; #[tokio::main] async fn main() -> Result<(), Box> { // TODO: Handle this issue in the sdk // Introduce delay so that dapr grpc port is assigned before app tries to connect - thread::sleep(Duration::from_secs(2)); + tokio::time::sleep(Duration::from_secs(2)).await; // Get the Dapr port and create a connection let addr = "https://127.0.0.1".to_string(); diff --git a/examples/src/client/client.rs b/examples/src/client/client.rs index d07c54c6..e6000be1 100644 --- a/examples/src/client/client.rs +++ b/examples/src/client/client.rs @@ -2,7 +2,7 @@ async fn main() -> Result<(), Box> { // TODO: Handle this issue in the sdk // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(2, 0)); + tokio::time::sleep(std::time::Duration::new(2, 0)).await; // Set the Dapr address let addr = "https://127.0.0.1".to_string(); diff --git a/examples/src/configuration/main.rs b/examples/src/configuration/main.rs index 0413b5a2..0f202dc6 100644 --- a/examples/src/configuration/main.rs +++ b/examples/src/configuration/main.rs @@ -7,7 +7,7 @@ type DaprClient = dapr::Client; async fn main() -> Result<(), Box> { // TODO: Handle this issue in the sdk // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(2, 0)); + tokio::time::sleep(std::time::Duration::new(2, 0)).await; // Set the Dapr address let addr = "https://127.0.0.1".to_string(); @@ -19,14 +19,14 @@ async fn main() -> Result<(), Box> { // get key-value pair in the state store let response = client - .get_configuration(CONFIGSTORE_NAME, vec![(&key)], None) + .get_configuration(CONFIGSTORE_NAME, vec![&key], None) .await?; let val = response.items.get("hello").unwrap(); println!("Configuration value: {val:?}"); // Subscribe for configuration changes let mut stream = client - .subscribe_configuration(CONFIGSTORE_NAME, vec![(&key)], None) + .subscribe_configuration(CONFIGSTORE_NAME, vec![&key], None) .await?; let mut subscription_id = String::new(); diff --git a/examples/src/conversation/main.rs b/examples/src/conversation/main.rs index 6ca4a890..3caf4b75 100644 --- a/examples/src/conversation/main.rs +++ b/examples/src/conversation/main.rs @@ -1,5 +1,4 @@ use dapr::client::{ConversationInputBuilder, ConversationRequestBuilder}; -use std::thread; use std::time::Duration; type DaprClient = dapr::Client; @@ -7,7 +6,7 @@ type DaprClient = dapr::Client; #[tokio::main] async fn main() -> Result<(), Box> { // Sleep to allow for the server to become available - thread::sleep(Duration::from_secs(5)); + tokio::time::sleep(Duration::from_secs(5)).await; // Set the Dapr address let address = "https://127.0.0.1".to_string(); diff --git a/examples/src/invoke/grpc-proxying/client.rs b/examples/src/invoke/grpc-proxying/client.rs index 3cdd49a5..35c66e50 100644 --- a/examples/src/invoke/grpc-proxying/client.rs +++ b/examples/src/invoke/grpc-proxying/client.rs @@ -1,4 +1,4 @@ -use std::{thread, time::Duration}; +use std::time::Duration; use hello_world::{greeter_client::GreeterClient, HelloRequest}; @@ -11,7 +11,7 @@ pub mod hello_world { #[tokio::main] async fn main() -> Result<(), Box> { // Sleep to allow for the server to become available - thread::sleep(Duration::from_secs(5)); + tokio::time::sleep(Duration::from_secs(5)).await; // Get the Dapr port and create a connection let port: u16 = std::env::var("DAPR_GRPC_PORT").unwrap().parse().unwrap(); diff --git a/examples/src/invoke/grpc/client.rs b/examples/src/invoke/grpc/client.rs index bda11d8b..c05ef29f 100644 --- a/examples/src/invoke/grpc/client.rs +++ b/examples/src/invoke/grpc/client.rs @@ -1,5 +1,5 @@ use crate::hello_world::HelloReply; -use std::{thread, time::Duration}; +use std::time::Duration; use prost::Message; @@ -12,7 +12,7 @@ type DaprClient = dapr::Client; #[tokio::main] async fn main() -> Result<(), Box> { // Sleep to allow for the server to become available - thread::sleep(Duration::from_secs(5)); + tokio::time::sleep(Duration::from_secs(5)).await; // Set the Dapr address let address = "https://127.0.0.1".to_string(); diff --git a/examples/src/pubsub/publisher.rs b/examples/src/pubsub/publisher.rs index 71086a15..1d645163 100644 --- a/examples/src/pubsub/publisher.rs +++ b/examples/src/pubsub/publisher.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, thread, time::Duration}; +use std::{collections::HashMap, time::Duration}; use tokio::time; @@ -21,7 +21,7 @@ struct Refund { async fn main() -> Result<(), Box> { // TODO: Handle this issue in the sdk // Introduce delay so that dapr grpc port is assigned before app tries to connect - thread::sleep(Duration::from_secs(2)); + tokio::time::sleep(Duration::from_secs(2)).await; // Set address for Dapr connection let addr = "https://127.0.0.1".to_string(); diff --git a/examples/src/query_state/query1.rs b/examples/src/query_state/query1.rs index dc6859f8..bc682371 100644 --- a/examples/src/query_state/query1.rs +++ b/examples/src/query_state/query1.rs @@ -3,7 +3,7 @@ use serde_json::json; #[tokio::main] async fn main() -> Result<(), Box> { // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(5, 0)); + tokio::time::sleep(std::time::Duration::new(5, 0)).await; // Set the Dapr address and create a connection let addr = "https://127.0.0.1".to_string(); diff --git a/examples/src/query_state/query2.rs b/examples/src/query_state/query2.rs index e8e0c7cb..35c6d104 100644 --- a/examples/src/query_state/query2.rs +++ b/examples/src/query_state/query2.rs @@ -3,7 +3,7 @@ use serde_json::json; #[tokio::main] async fn main() -> Result<(), Box> { // Introduce delay so that dapr grpc port is assigned before app tries to connect - std::thread::sleep(std::time::Duration::new(5, 0)); + tokio::time::sleep(std::time::Duration::new(5, 0)).await; // Set the Dapr address and create a connection let addr = "https://127.0.0.1".to_string();