Skip to content

Commit

Permalink
Added distributed lock
Browse files Browse the repository at this point in the history
Signed-off-by: Zachary Edgell <[email protected]>
  • Loading branch information
zedgell committed Mar 27, 2024
1 parent ac71bb7 commit a1843fd
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/validate-examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "secrets-bulk" ]
[ "actors", "client", "configuration", "crypto", "distributed-lock" "invoke/grpc", "invoke/grpc-proxying", "pubsub", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ path = "examples/configuration/main.rs"
name = "crypto"
path = "examples/crypto/main.rs"

[[example]]
name = "distributed-lock"
path = "examples/distributed-lock/main.rs"

[[example]]
name = "invoke-grpc-client"
path = "examples/invoke/grpc/client.rs"
Expand Down
31 changes: 31 additions & 0 deletions examples/distributed-lock/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Distributed Lock

This is a simple example that demonstrates Dapr's Distributed Lock capabilities.

> **Note:** Make sure to use latest version of proto bindings.
## Running

To run this example:

1. Run the multi-app run template:

<!-- STEP
name: Run multi-app
output_match_mode: substring
match_order: none
expected_stdout_lines:
- '== APP - distributed-lock-example == Successfully locked my-data'
- '== APP - distributed-lock-example == Successfully unlocked my-data'
background: true
sleep: 30
timeout_seconds: 90
-->

```bash
dapr run -f .
```

<!-- END_STEP -->

2. Stop with `ctrl + c`
12 changes: 12 additions & 0 deletions examples/distributed-lock/components/local-storage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: lockstore
spec:
type: lock.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
14 changes: 14 additions & 0 deletions examples/distributed-lock/components/statestore.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
10 changes: 10 additions & 0 deletions examples/distributed-lock/dapr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: 1
common:
daprdLogDestination: console
apps:
- appID: distributed-lock-example
appDirPath: ./
daprGRPCPort: 35002
logLevel: debug
command: [ "cargo", "run", "--example", "distributed-lock" ]
resourcesPath: ./components
43 changes: 43 additions & 0 deletions examples/distributed-lock/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
sleep(std::time::Duration::new(2, 0)).await;
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);

let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;

let files = vec![("my-data", b"some-data".to_vec())];

client.save_state("statestore", files).await.unwrap();

let result = client
.lock(dapr::client::TryLockRequest {
store_name: "lockstore".to_string(),
resource_id: "my-data".to_string(),
lock_owner: "some-random-id".to_string(),
expiry_in_seconds: 60,
})
.await
.unwrap();

assert!(result.success);

println!("Successfully locked my-data");

let result = client
.unlock(dapr::client::UnlockRequest {
store_name: "lockstore".to_string(),
resource_id: "my-data".to_string(),
lock_owner: "some-random-id".to_string(),
})
.await
.unwrap();

assert_eq!(0, result.status);

println!("Successfully unlocked my-data");

Ok(())
}
52 changes: 52 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,24 @@ impl<T: DaprInterface> Client<T> {
.collect();
self.0.decrypt(requested_items).await
}

/// Distributed lock request call
///
/// # Arguments
///
/// * `request` - Request to be made, TryLockRequest
pub async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error> {
self.0.lock(request).await
}

/// Distributed lock request call
///
/// # Arguments
///
/// * `request` - Request to be made, TryLockRequest
pub async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error> {
self.0.unlock(request).await
}
}

#[async_trait]
Expand Down Expand Up @@ -501,6 +519,10 @@ pub trait DaprInterface: Sized {
-> Result<Vec<StreamPayload>, Status>;

async fn decrypt(&mut self, payload: Vec<DecryptRequest>) -> Result<Vec<u8>, Status>;

async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error>;

async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error>;
}

#[async_trait]
Expand Down Expand Up @@ -661,6 +683,24 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
}
Ok(data)
}

/// Distributed lock request call
///
/// # Arguments
///
/// * `request` - Request to be made, TryLockRequest
async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error> {
Ok(self.try_lock_alpha1(request).await?.into_inner())
}

/// Distributed unlock request call
///
/// # Arguments
///
/// * `request` - Request to be made, UnlockRequest
async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error> {
Ok(self.unlock_alpha1(request).await?.into_inner())
}
}

/// A request from invoking a service
Expand Down Expand Up @@ -752,6 +792,18 @@ pub type EncryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::EncryptR
/// Decryption request options
pub type DecryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::DecryptRequestOptions;

/// Lock response
pub type TryLockResponse = crate::dapr::dapr::proto::runtime::v1::TryLockResponse;

/// Lock request
pub type TryLockRequest = crate::dapr::dapr::proto::runtime::v1::TryLockRequest;

/// Unlock request
pub type UnlockRequest = crate::dapr::dapr::proto::runtime::v1::UnlockRequest;

/// Unlock response
pub type UnlockResponse = crate::dapr::dapr::proto::runtime::v1::UnlockResponse;

type StreamPayload = crate::dapr::dapr::proto::common::v1::StreamPayload;
impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
where
Expand Down

0 comments on commit a1843fd

Please sign in to comment.