diff --git a/examples/Client/StateManagement/Program.cs b/examples/Client/StateManagement/Program.cs index 24e37d004..e9ef36979 100644 --- a/examples/Client/StateManagement/Program.cs +++ b/examples/Client/StateManagement/Program.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,7 +24,8 @@ class Program new StateStoreExample(), new StateStoreTransactionsExample(), new StateStoreETagsExample(), - new BulkStateExample() + new BulkStateExample(), + new StateStoreBinaryExample() }; static async Task Main(string[] args) diff --git a/examples/Client/StateManagement/StateStoreBinaryExample.cs b/examples/Client/StateManagement/StateStoreBinaryExample.cs new file mode 100644 index 000000000..edf23704e --- /dev/null +++ b/examples/Client/StateManagement/StateStoreBinaryExample.cs @@ -0,0 +1,47 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Dapr.Client; +using System.Threading.Tasks; +using System.Threading; +using Google.Protobuf; + +namespace Samples.Client +{ + public class StateStoreBinaryExample : Example + { + + private static readonly string stateKeyName = "binarydata"; + private static readonly string storeName = "statestore"; + + public override string DisplayName => "Using the State Store with binary data"; + + public override async Task RunAsync(CancellationToken cancellationToken) + { + using var client = new DaprClientBuilder().Build(); + + var state = "Test Binary Data"; + // convert variable in to byte array + var stateBytes = Encoding.UTF8.GetBytes(state); + await client.SaveByteStateAsync(storeName, stateKeyName, stateBytes.AsMemory(), cancellationToken: cancellationToken); + Console.WriteLine("Saved State!"); + + var responseBytes = await client.GetByteStateAsync(storeName, stateKeyName, cancellationToken: cancellationToken); + var savedState = Encoding.UTF8.GetString(ByteString.CopyFrom(responseBytes.Span).ToByteArray()); + + if (savedState == null) + { + Console.WriteLine("State not found in store"); + } + else + { + Console.WriteLine($"Got State: {savedState}"); + } + + await client.DeleteStateAsync(storeName, stateKeyName, cancellationToken: cancellationToken); + Console.WriteLine("Deleted State!"); + } + + + } +} diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 43c640a69..6be31a648 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -850,6 +850,80 @@ public abstract Task SaveStateAsync( IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default); + + /// + /// Saves the provided associated with the provided to the Dapr state + /// store + /// + /// The name of the state store. + /// The state key. + /// The binary data that will be stored in the state store. + /// Options for performing save state operation. + /// A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used. + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed. + public abstract Task SaveByteStateAsync( + string storeName, + string key, + ReadOnlyMemory binaryValue, + StateOptions stateOptions = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default); + + /// + ///Saves the provided associated with the provided using the + /// to the Dapr state. State store implementation will allow the update only if the attached ETag matches with the latest ETag in the state store. + /// + /// The name of the state store. + /// The state key. + /// The binary data that will be stored in the state store. + /// An ETag. + /// Options for performing save state operation. + /// A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used. + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed. + public abstract Task TrySaveByteStateAsync( + string storeName, + string key, + ReadOnlyMemory binaryValue, + string etag, + StateOptions stateOptions = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default); + + + /// + /// Gets the current binary value associated with the from the Dapr state store. + /// + /// The name of state store to read from. + /// The state key. + /// The consistency mode . + /// A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used. + /// A that can be used to cancel the operation. + /// A that will return the value when the operation has completed. + public abstract Task> GetByteStateAsync( + string storeName, + string key, + ConsistencyMode? consistencyMode = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default); + + /// + /// Gets the current binary value associated with the from the Dapr state store and an ETag. + /// + /// The name of the state store. + /// The state key. + /// The consistency mode . + /// A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used. + /// A that can be used to cancel the operation. + /// A that will return the value when the operation has completed. This wraps the read value and an ETag. + public abstract Task<(ReadOnlyMemory, string etag)> GetByteStateAndETagAsync( + string storeName, + string key, + ConsistencyMode? consistencyMode = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default); + /// /// Tries to save the state associated with the provided using the /// to the Dapr state. State store implementation will allow the update only if the attached ETag matches with the latest ETag in the state store. diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index bd0bd1d01..40df4767c 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -74,6 +74,7 @@ internal DaprClientGrpc( } #region Publish Apis + /// public override Task PublishEventAsync( string pubsubName, @@ -86,7 +87,8 @@ public override Task PublishEventAsync( ArgumentVerifier.ThrowIfNull(data, nameof(data)); var content = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions); - return MakePublishRequest(pubsubName, topicName, content, null, data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken); + return MakePublishRequest(pubsubName, topicName, content, null, + data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken); } public override Task PublishEventAsync( @@ -102,7 +104,8 @@ public override Task PublishEventAsync( ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata)); var content = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions); - return MakePublishRequest(pubsubName, topicName, content, metadata, data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken); + return MakePublishRequest(pubsubName, topicName, content, metadata, + data is CloudEvent ? Constants.ContentTypeCloudEvent : null, cancellationToken); } /// @@ -138,7 +141,8 @@ public override Task PublishByteEventAsync( { ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName)); - return MakePublishRequest(pubsubName, topicName, ByteString.CopyFrom(data.Span), metadata, dataContentType, cancellationToken); + return MakePublishRequest(pubsubName, topicName, ByteString.CopyFrom(data.Span), metadata, dataContentType, + cancellationToken); } private async Task MakePublishRequest( @@ -149,11 +153,7 @@ private async Task MakePublishRequest( string dataContentType, CancellationToken cancellationToken) { - var envelope = new Autogenerated.PublishEventRequest() - { - PubsubName = pubsubName, - Topic = topicName, - }; + var envelope = new Autogenerated.PublishEventRequest() { PubsubName = pubsubName, Topic = topicName, }; if (content != null) { @@ -177,7 +177,8 @@ private async Task MakePublishRequest( } catch (RpcException ex) { - throw new DaprException("Publish operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Publish operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -194,7 +195,7 @@ public override Task> BulkPublishEventAsync( ArgumentVerifier.ThrowIfNull(events, nameof(events)); return MakeBulkPublishRequest(pubsubName, topicName, events, metadata, cancellationToken); } - + private async Task> MakeBulkPublishRequest( string pubsubName, string topicName, @@ -202,12 +203,8 @@ private async Task> MakeBulkPublishRequest( Dictionary metadata, CancellationToken cancellationToken) { - var envelope = new Autogenerated.BulkPublishRequest() - { - PubsubName = pubsubName, - Topic = topicName, - }; - + var envelope = new Autogenerated.BulkPublishRequest() { PubsubName = pubsubName, Topic = topicName, }; + Dictionary> entryMap = new Dictionary>(); for (int counter = 0; counter < events.Count; counter++) @@ -216,14 +213,17 @@ private async Task> MakeBulkPublishRequest( { EntryId = counter.ToString(), Event = TypeConverters.ToJsonByteString(events[counter], this.jsonSerializerOptions), - ContentType = events[counter] is CloudEvent ? Constants.ContentTypeCloudEvent : Constants.ContentTypeApplicationJson, - Metadata = {}, + ContentType = + events[counter] is CloudEvent + ? Constants.ContentTypeCloudEvent + : Constants.ContentTypeApplicationJson, + Metadata = { }, }; envelope.Entries.Add(entry); entryMap.Add(counter.ToString(), new BulkPublishEntry( entry.EntryId, events[counter], entry.ContentType, entry.Metadata)); } - + if (metadata != null) { foreach (var kvp in metadata) @@ -231,22 +231,23 @@ private async Task> MakeBulkPublishRequest( envelope.Metadata.Add(kvp.Key, kvp.Value); } } - + var options = CreateCallOptions(headers: null, cancellationToken); try { var response = await client.BulkPublishEventAlpha1Async(envelope, options); - List> failedEntries = new List>(); - + List> failedEntries = + new List>(); + foreach (var entry in response.FailedEntries) { BulkPublishResponseFailedEntry domainEntry = new BulkPublishResponseFailedEntry( entryMap[entry.EntryId], entry.Error); failedEntries.Add(domainEntry); } - + var bulkPublishResponse = new BulkPublishResponse(failedEntries); return bulkPublishResponse; @@ -257,6 +258,7 @@ private async Task> MakeBulkPublishRequest( "failure. See InnerException for details.", ex); } } + #endregion #region InvokeBinding Apis @@ -296,14 +298,18 @@ public override async Task InvokeBindingAsync( } catch (JsonException ex) { - throw new DaprException("Binding operation failed: the response payload could not be deserialized. See InnerException for details.", ex); + throw new DaprException( + "Binding operation failed: the response payload could not be deserialized. See InnerException for details.", + ex); } } - public override async Task InvokeBindingAsync(BindingRequest request, CancellationToken cancellationToken = default) + public override async Task InvokeBindingAsync(BindingRequest request, + CancellationToken cancellationToken = default) { var bytes = ByteString.CopyFrom(request.Data.Span); - var response = await this.MakeInvokeBindingRequestAsync(request.BindingName, request.Operation, bytes, request.Metadata, cancellationToken); + var response = await this.MakeInvokeBindingRequestAsync(request.BindingName, request.Operation, bytes, + request.Metadata, cancellationToken); return new BindingResponse(request, response.Data.Memory, response.Metadata); } @@ -314,11 +320,7 @@ public override async Task InvokeBindingAsync(BindingRequest re IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { - var envelope = new Autogenerated.InvokeBindingRequest() - { - Name = name, - Operation = operation - }; + var envelope = new Autogenerated.InvokeBindingRequest() { Name = name, Operation = operation }; if (data != null) { @@ -340,9 +342,11 @@ public override async Task InvokeBindingAsync(BindingRequest re } catch (RpcException ex) { - throw new DaprException("Binding operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Binding operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } + #endregion #region InvokeMethod Apis @@ -385,7 +389,7 @@ public override HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMeth var path = $"/v1.0/invoke/{appId}/method/{methodName.TrimStart('/')}"; var requestUri = new Uri(this.httpEndpoint, path).AddQueryParameters(queryStringParameters); var request = new HttpRequestMessage(httpMethod, requestUri); - + request.Options.Set(new HttpRequestOptionsKey(AppIdKey), appId); request.Options.Set(new HttpRequestOptionsKey(MethodNameKey), methodName); @@ -410,7 +414,8 @@ public override HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMeth /// The data that will be JSON serialized and provided as the request body. /// A collection of key/value pairs to populate the query string from. /// An for use with SendInvokeMethodRequestAsync. - public override HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, string methodName, + public override HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, + string methodName, IReadOnlyCollection> queryStringParameters, TRequest data) { ArgumentVerifier.ThrowIfNull(httpMethod, nameof(httpMethod)); @@ -422,7 +427,8 @@ public override HttpRequestMessage CreateInvokeMethodRequest(HttpMetho return request; } - public override async Task InvokeMethodWithResponseAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) + public override async Task InvokeMethodWithResponseAsync(HttpRequestMessage request, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(request, nameof(request)); @@ -477,7 +483,8 @@ public override HttpClient CreateInvokableHttpClient(string? appId = null) => DaprClient.CreateInvokeHttpClient(appId, this.httpEndpoint?.AbsoluteUri, this.apiTokenHeader?.Value); #nullable disable - public async override Task InvokeMethodAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) + public async override Task InvokeMethodAsync(HttpRequestMessage request, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(request, nameof(request)); @@ -501,7 +508,8 @@ public async override Task InvokeMethodAsync(HttpRequestMessage request, Cancell } } - public async override Task InvokeMethodAsync(HttpRequestMessage request, CancellationToken cancellationToken = default) + public async override Task InvokeMethodAsync(HttpRequestMessage request, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNull(request, nameof(request)); @@ -554,18 +562,15 @@ public async override Task InvokeMethodAsync(HttpRequestMe } } - public override async Task InvokeMethodGrpcAsync(string appId, string methodName, CancellationToken cancellationToken = default) + public override async Task InvokeMethodGrpcAsync(string appId, string methodName, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId)); ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName)); var envelope = new Autogenerated.InvokeServiceRequest() { - Id = appId, - Message = new Autogenerated.InvokeRequest() - { - Method = methodName, - }, + Id = appId, Message = new Autogenerated.InvokeRequest() { Method = methodName, }, }; var options = CreateCallOptions(headers: null, cancellationToken); @@ -580,7 +585,8 @@ public override async Task InvokeMethodGrpcAsync(string appId, string methodName } } - public override async Task InvokeMethodGrpcAsync(string appId, string methodName, TRequest data, CancellationToken cancellationToken = default) + public override async Task InvokeMethodGrpcAsync(string appId, string methodName, TRequest data, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId)); ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName)); @@ -590,9 +596,7 @@ public override async Task InvokeMethodGrpcAsync(string appId, string Id = appId, Message = new Autogenerated.InvokeRequest() { - Method = methodName, - ContentType = Constants.ContentTypeApplicationGrpc, - Data = Any.Pack(data), + Method = methodName, ContentType = Constants.ContentTypeApplicationGrpc, Data = Any.Pack(data), }, }; @@ -602,24 +606,22 @@ public override async Task InvokeMethodGrpcAsync(string appId, string { _ = await this.Client.InvokeServiceAsync(envelope, options); } + catch (RpcException ex) { throw new InvocationException(appId, methodName, ex); } } - public override async Task InvokeMethodGrpcAsync(string appId, string methodName, CancellationToken cancellationToken = default) + public override async Task InvokeMethodGrpcAsync(string appId, string methodName, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId)); ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName)); var envelope = new Autogenerated.InvokeServiceRequest() { - Id = appId, - Message = new Autogenerated.InvokeRequest() - { - Method = methodName, - }, + Id = appId, Message = new Autogenerated.InvokeRequest() { Method = methodName, }, }; var options = CreateCallOptions(headers: null, cancellationToken); @@ -635,7 +637,8 @@ public override async Task InvokeMethodGrpcAsync(string ap } } - public override async Task InvokeMethodGrpcAsync(string appId, string methodName, TRequest data, CancellationToken cancellationToken = default) + public override async Task InvokeMethodGrpcAsync(string appId, string methodName, + TRequest data, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId)); ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName)); @@ -645,9 +648,7 @@ public override async Task InvokeMethodGrpcAsync Id = appId, Message = new Autogenerated.InvokeRequest() { - Method = methodName, - ContentType = Constants.ContentTypeApplicationGrpc, - Data = Any.Pack(data), + Method = methodName, ContentType = Constants.ContentTypeApplicationGrpc, Data = Any.Pack(data), }, }; @@ -669,7 +670,9 @@ public override async Task InvokeMethodGrpcAsync #region State Apis /// - public override async Task> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) + public override async Task> GetBulkStateAsync(string storeName, + IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) { var rawBulkState = await GetBulkStateRawAsync(storeName, keys, parallelism, metadata, cancellationToken); @@ -681,12 +684,12 @@ public override async Task> GetBulkStateAsync(strin return bulkResponse; } - + /// public override async Task>> GetBulkStateAsync( string storeName, - IReadOnlyList keys, - int? parallelism, + IReadOnlyList keys, + int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { @@ -708,8 +711,8 @@ public override async Task>> GetBulkStateAsy /// private async Task> GetBulkStateRawAsync( string storeName, - IReadOnlyList keys, - int? parallelism, + IReadOnlyList keys, + int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { @@ -719,8 +722,7 @@ public override async Task>> GetBulkStateAsy var envelope = new Autogenerated.GetBulkStateRequest() { - StoreName = storeName, - Parallelism = parallelism ?? default + StoreName = storeName, Parallelism = parallelism ?? default }; if (metadata != null) @@ -755,7 +757,7 @@ public override async Task>> GetBulkStateAsy return bulkResponse; } - + /// public override async Task GetStateAsync( string storeName, @@ -767,11 +769,7 @@ public override async Task GetStateAsync( ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key)); - var envelope = new Autogenerated.GetStateRequest() - { - StoreName = storeName, - Key = key, - }; + var envelope = new Autogenerated.GetStateRequest() { StoreName = storeName, Key = key, }; if (metadata != null) { @@ -795,7 +793,8 @@ public override async Task GetStateAsync( } catch (RpcException ex) { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } try @@ -804,12 +803,15 @@ public override async Task GetStateAsync( } catch (JsonException ex) { - throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the state payload could not be deserialized. See InnerException for details.", + ex); } } /// - public override async Task SaveBulkStateAsync(string storeName, IReadOnlyList> items, CancellationToken cancellationToken = default) + public override async Task SaveBulkStateAsync(string storeName, IReadOnlyList> items, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); @@ -818,17 +820,11 @@ public override async Task SaveBulkStateAsync(string storeName, IReadOnl throw new ArgumentException("items do not contain any elements"); } - var envelope = new Autogenerated.SaveStateRequest() - { - StoreName = storeName, - }; + var envelope = new Autogenerated.SaveStateRequest() { StoreName = storeName, }; foreach (var item in items) { - var stateItem = new Autogenerated.StateItem() - { - Key = item.Key, - }; + var stateItem = new Autogenerated.StateItem() { Key = item.Key, }; if (item.ETag != null) { @@ -862,25 +858,209 @@ public override async Task SaveBulkStateAsync(string storeName, IReadOnl } catch (RpcException ex) { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } + } + /// + public override async Task SaveByteStateAsync( + string storeName, + string key, + ReadOnlyMemory value, + StateOptions stateOptions = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); + ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key)); + _ = await this.MakeSaveByteStateCallAsync( + storeName, + key, + ByteString.CopyFrom(value.Span), + etag: null, + stateOptions, + metadata, + cancellationToken); } - /// - public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList items, CancellationToken cancellationToken = default) + /// + public override async Task TrySaveByteStateAsync( + string storeName, + string key, + ReadOnlyMemory value, + string etag, + StateOptions stateOptions = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) { - var envelope = new Autogenerated.DeleteBulkStateRequest() + ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); + ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key)); + ArgumentVerifier.ThrowIfNull(etag, nameof(etag)); + return await this.MakeSaveByteStateCallAsync(storeName, key, ByteString.CopyFrom(value.Span), etag, + stateOptions, metadata, cancellationToken); + } + + // Method MakeSaveStateCallAsync to save binary value + private async Task MakeSaveByteStateCallAsync( + string storeName, + string key, + ByteString value, + string etag = default, + StateOptions stateOptions = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + var envelope = new Autogenerated.SaveStateRequest() { StoreName = storeName, }; + + + var stateItem = new Autogenerated.StateItem() { Key = key, }; + + if (metadata != null) { - StoreName = storeName, - }; + foreach (var kvp in metadata) + { + stateItem.Metadata.Add(kvp.Key, kvp.Value); + } + } - foreach (var item in items) + if (etag != null) + { + stateItem.Etag = new Autogenerated.Etag() { Value = etag }; + } + + if (stateOptions != null) + { + stateItem.Options = ToAutoGeneratedStateOptions(stateOptions); + } + + if (value != null) + { + + stateItem.Value = value; + } + + envelope.States.Add(stateItem); + + var options = CreateCallOptions(headers: null, cancellationToken); + try + { + await client.SaveStateAsync(envelope, options); + return true; + } + catch (RpcException rpc) when (etag != null && rpc.StatusCode == StatusCode.Aborted) + { + // This kind of failure indicates an ETag mismatch. Aborted doesn't seem like + // the right status code at first, but check the docs, it fits this use-case. + // + // When an ETag is used we surface this though the Try... pattern + return false; + } + catch (RpcException ex) + { + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + /// + public override async Task> GetByteStateAsync( + string storeName, + string key, + ConsistencyMode? consistencyMode = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); + ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key)); + var envelope = new Autogenerated.GetStateRequest() { StoreName = storeName, Key = key, }; + if (consistencyMode != null) + { + envelope.Consistency = GetStateConsistencyForConsistencyMode(consistencyMode.Value); + } + + if (metadata != null) { - var stateItem = new Autogenerated.StateItem() + foreach (var kvp in metadata) { - Key = item.Key, - }; + envelope.Metadata.Add(kvp.Key, kvp.Value); + } + } + + var options = CreateCallOptions(headers: null, cancellationToken); + try + { + var response = await client.GetStateAsync(envelope, options); + return response.Data.ToByteArray().AsMemory(); + } + catch (RpcException ex) + { + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + /// + public override async Task<(ReadOnlyMemory, string etag)> GetByteStateAndETagAsync( + string storeName, + string key, + ConsistencyMode? consistencyMode = default, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); + ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key)); + + var envelope = new Autogenerated.GetStateRequest() { StoreName = storeName, Key = key }; + + if (metadata != null) + { + foreach (var kvp in metadata) + { + envelope.Metadata.Add(kvp.Key, kvp.Value); + } + } + + if (consistencyMode != null) + { + envelope.Consistency = GetStateConsistencyForConsistencyMode(consistencyMode.Value); + } + + var options = CreateCallOptions(headers: null, cancellationToken); + Autogenerated.GetStateResponse response; + + try + { + response = await client.GetStateAsync(envelope, options); + } + catch (RpcException ex) + { + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", + ex); + } + + try + { + return (response.Data.ToByteArray().AsMemory(), response.Etag); + } + catch (JsonException ex) + { + throw new DaprException( + "State operation failed: the state payload could not be deserialized. See InnerException for details.", + ex); + } + } + + /// + public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList items, + CancellationToken cancellationToken = default) + { + var envelope = new Autogenerated.DeleteBulkStateRequest() { StoreName = storeName, }; + + foreach (var item in items) + { + var stateItem = new Autogenerated.StateItem() { Key = item.Key, }; if (item.ETag != null) { @@ -909,7 +1089,8 @@ public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList< } catch (RpcException ex) { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -925,11 +1106,7 @@ public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList< ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key)); - var envelope = new Autogenerated.GetStateRequest() - { - StoreName = storeName, - Key = key - }; + var envelope = new Autogenerated.GetStateRequest() { StoreName = storeName, Key = key }; if (metadata != null) { @@ -953,16 +1130,20 @@ public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList< } catch (RpcException ex) { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } try { - return (TypeConverters.FromJsonByteString(response.Data, this.JsonSerializerOptions), response.Etag); + return (TypeConverters.FromJsonByteString(response.Data, this.JsonSerializerOptions), + response.Etag); } catch (JsonException ex) { - throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the state payload could not be deserialized. See InnerException for details.", + ex); } } @@ -1004,7 +1185,8 @@ public override async Task TrySaveStateAsync( // rely on bubbling up the error if any from Dapr runtime ArgumentVerifier.ThrowIfNull(etag, nameof(etag)); - return await this.MakeSaveStateCallAsync(storeName, key, value, etag, stateOptions, metadata, cancellationToken); + return await this.MakeSaveStateCallAsync(storeName, key, value, etag, stateOptions, metadata, + cancellationToken); } private async Task MakeSaveStateCallAsync( @@ -1016,16 +1198,10 @@ private async Task MakeSaveStateCallAsync( IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { - var envelope = new Autogenerated.SaveStateRequest() - { - StoreName = storeName, - }; + var envelope = new Autogenerated.SaveStateRequest() { StoreName = storeName, }; - var stateItem = new Autogenerated.StateItem() - { - Key = key, - }; + var stateItem = new Autogenerated.StateItem() { Key = key, }; if (metadata != null) { @@ -1068,7 +1244,8 @@ private async Task MakeSaveStateCallAsync( } catch (RpcException ex) { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -1100,17 +1277,13 @@ private async Task MakeExecuteStateTransactionCallAsync( IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { - var envelope = new Autogenerated.ExecuteStateTransactionRequest() - { - StoreName = storeName, - }; + var envelope = new Autogenerated.ExecuteStateTransactionRequest() { StoreName = storeName, }; foreach (var state in states) { var stateOperation = new Autogenerated.TransactionalStateOperation { - OperationType = state.OperationType.ToString().ToLower(), - Request = ToAutogeneratedStateItem(state) + OperationType = state.OperationType.ToString().ToLower(), Request = ToAutogeneratedStateItem(state) }; envelope.Operations.Add(stateOperation); @@ -1133,16 +1306,14 @@ private async Task MakeExecuteStateTransactionCallAsync( } catch (RpcException ex) { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } private Autogenerated.StateItem ToAutogeneratedStateItem(StateTransactionRequest state) { - var stateOperation = new Autogenerated.StateItem - { - Key = state.Key - }; + var stateOperation = new Autogenerated.StateItem { Key = state.Key }; if (state.Value != null) { @@ -1170,7 +1341,6 @@ private Autogenerated.StateItem ToAutogeneratedStateItem(StateTransactionRequest return stateOperation; } - /// public override async Task DeleteStateAsync( string storeName, @@ -1217,11 +1387,7 @@ private async Task MakeDeleteStateCallAsync( IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { - var deleteStateEnvelope = new Autogenerated.DeleteStateRequest() - { - StoreName = storeName, - Key = key, - }; + var deleteStateEnvelope = new Autogenerated.DeleteStateRequest() { StoreName = storeName, Key = key, }; if (metadata != null) { @@ -1258,7 +1424,8 @@ private async Task MakeDeleteStateCallAsync( } catch (RpcException ex) { - throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -1269,11 +1436,7 @@ public async override Task> QueryStateAsync( IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { - var queryRequest = new Autogenerated.QueryStateRequest() - { - StoreName = storeName, - Query = jsonQuery - }; + var queryRequest = new Autogenerated.QueryStateRequest() { StoreName = storeName, Query = jsonQuery }; if (metadata != null) { @@ -1298,30 +1461,40 @@ public async override Task> QueryStateAsync( failedKeys.Add(item.Key); continue; } - items.Add(new StateQueryItem(item.Key, TypeConverters.FromJsonByteString(item.Data, this.JsonSerializerOptions), item.Etag, item.Error)); + + items.Add(new StateQueryItem(item.Key, + TypeConverters.FromJsonByteString(item.Data, this.JsonSerializerOptions), item.Etag, + item.Error)); } var results = new StateQueryResponse(items, queryResponse.Token, queryResponse.Metadata); if (failedKeys.Count > 0) { // We encountered some bad keys so we throw instead of returning to alert the user. - throw new StateQueryException($"Encountered an error while processing state query results.", results, failedKeys); + throw new StateQueryException($"Encountered an error while processing state query results.", + results, failedKeys); } return results; } catch (RpcException ex) { - throw new DaprException("Query state operation failed: the Dapr endpointed indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Query state operation failed: the Dapr endpointed indicated a failure. See InnerException for details.", + ex); } catch (JsonException ex) { - throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex); + throw new DaprException( + "State operation failed: the state payload could not be deserialized. See InnerException for details.", + ex); } } + #endregion #region Secret Apis + /// public async override Task> GetSecretAsync( string storeName, @@ -1332,11 +1505,7 @@ public async override Task> GetSecretAsync( ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key)); - var envelope = new Autogenerated.GetSecretRequest() - { - StoreName = storeName, - Key = key - }; + var envelope = new Autogenerated.GetSecretRequest() { StoreName = storeName, Key = key }; if (metadata != null) { @@ -1355,7 +1524,8 @@ public async override Task> GetSecretAsync( } catch (RpcException ex) { - throw new DaprException("Secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } return response.Data.ToDictionary(kv => kv.Key, kv => kv.Value); @@ -1367,10 +1537,7 @@ public async override Task>> GetBu IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default) { - var envelope = new Autogenerated.GetBulkSecretRequest() - { - StoreName = storeName - }; + var envelope = new Autogenerated.GetBulkSecretRequest() { StoreName = storeName }; if (metadata != null) { @@ -1389,14 +1556,18 @@ public async override Task>> GetBu } catch (RpcException ex) { - throw new DaprException("Bulk secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Bulk secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", + ex); } return response.Data.ToDictionary(r => r.Key, r => r.Value.Secrets.ToDictionary(s => s.Key, s => s.Value)); } + #endregion #region Configuration API + /// public async override Task GetConfiguration( string storeName, @@ -1406,10 +1577,7 @@ public async override Task GetConfiguration( { ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); - var request = new Autogenerated.GetConfigurationRequest() - { - StoreName = storeName - }; + var request = new Autogenerated.GetConfigurationRequest() { StoreName = storeName }; if (keys != null && keys.Count > 0) { @@ -1432,10 +1600,13 @@ public async override Task GetConfiguration( } catch (RpcException ex) { - throw new DaprException("GetConfiguration operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "GetConfiguration operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", + ex); } - var responseItems = response.Items.ToDictionary(item => item.Key, item => new ConfigurationItem(item.Value.Value, item.Value.Version, item.Value.Metadata)); + var responseItems = response.Items.ToDictionary(item => item.Key, + item => new ConfigurationItem(item.Value.Value, item.Value.Version, item.Value.Metadata)); return new GetConfigurationResponse(responseItems); } @@ -1468,7 +1639,8 @@ public override Task SubscribeConfiguration( } var options = CreateCallOptions(headers: null, cancellationToken: cancellationToken); - return Task.FromResult(new SubscribeConfigurationResponse(new DaprSubscribeConfigurationSource(client.SubscribeConfiguration(request, options)))); + return Task.FromResult(new SubscribeConfigurationResponse( + new DaprSubscribeConfigurationSource(client.SubscribeConfiguration(request, options)))); } public override async Task UnsubscribeConfiguration( @@ -1479,11 +1651,8 @@ public override async Task UnsubscribeConfigur ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); ArgumentVerifier.ThrowIfNullOrEmpty(id, nameof(id)); - Autogenerated.UnsubscribeConfigurationRequest request = new Autogenerated.UnsubscribeConfigurationRequest() - { - StoreName = storeName, - Id = id - }; + Autogenerated.UnsubscribeConfigurationRequest request = + new Autogenerated.UnsubscribeConfigurationRequest() { StoreName = storeName, Id = id }; var options = CreateCallOptions(headers: null, cancellationToken); var resp = await client.UnsubscribeConfigurationAsync(request, options); @@ -1495,32 +1664,37 @@ public override async Task UnsubscribeConfigur #region Cryptography /// - [Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public override async Task> EncryptAsync(string vaultResourceName, + [Obsolete( + "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + public override async Task> EncryptAsync(string vaultResourceName, ReadOnlyMemory plaintextBytes, string keyName, EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default) { if (MemoryMarshal.TryGetArray(plaintextBytes, out var plaintextSegment) && plaintextSegment.Array != null) { - var encryptionResult = await EncryptAsync(vaultResourceName, new MemoryStream(plaintextSegment.Array), keyName, encryptionOptions, + var encryptionResult = await EncryptAsync(vaultResourceName, new MemoryStream(plaintextSegment.Array), + keyName, encryptionOptions, cancellationToken); - + var bufferedResult = new ArrayBufferWriter(); await foreach (var item in encryptionResult.WithCancellation(cancellationToken)) { bufferedResult.Write(item.Span); } - - return bufferedResult.WrittenMemory; + + return bufferedResult.WrittenMemory; } - throw new ArgumentException("The input instance doesn't have a valid underlying data store.", nameof(plaintextBytes)); + throw new ArgumentException("The input instance doesn't have a valid underlying data store.", + nameof(plaintextBytes)); } /// - [Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public override async Task>> EncryptAsync(string vaultResourceName, Stream plaintextStream, + [Obsolete( + "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + public override async Task>> EncryptAsync(string vaultResourceName, + Stream plaintextStream, string keyName, EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName)); @@ -1528,7 +1702,9 @@ public override async Task>> EncryptAsync( ArgumentVerifier.ThrowIfNull(plaintextStream, nameof(plaintextStream)); ArgumentVerifier.ThrowIfNull(encryptionOptions, nameof(encryptionOptions)); - var shouldOmitDecryptionKeyName = string.IsNullOrWhiteSpace(encryptionOptions.DecryptionKeyName); //Whitespace isn't likely a valid key name either + var shouldOmitDecryptionKeyName = + string.IsNullOrWhiteSpace(encryptionOptions + .DecryptionKeyName); //Whitespace isn't likely a valid key name either var encryptRequestOptions = new Autogenerated.EncryptRequestOptions { @@ -1541,7 +1717,8 @@ public override async Task>> EncryptAsync( if (!shouldOmitDecryptionKeyName) { - ArgumentVerifier.ThrowIfNullOrEmpty(encryptionOptions.DecryptionKeyName, nameof(encryptionOptions.DecryptionKeyName)); + ArgumentVerifier.ThrowIfNullOrEmpty(encryptionOptions.DecryptionKeyName, + nameof(encryptionOptions.DecryptionKeyName)); encryptRequestOptions.DecryptionKeyName = encryptRequestOptions.DecryptionKeyName; } @@ -1569,7 +1746,7 @@ private async Task SendPlaintextStreamAsync(Stream plaintextStream, { //Start with passing the metadata about the encryption request itself in the first message await duplexStream.RequestStream.WriteAsync( - new Autogenerated.EncryptRequest {Options = encryptRequestOptions}, cancellationToken); + new Autogenerated.EncryptRequest { Options = encryptRequestOptions }, cancellationToken); //Send the plaintext bytes in blocks in subsequent messages await using (var bufferedStream = new BufferedStream(plaintextStream, streamingBlockSizeInBytes)) @@ -1579,7 +1756,8 @@ await duplexStream.RequestStream.WriteAsync( ulong sequenceNumber = 0; while ((bytesRead = - await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes), cancellationToken)) != + await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes), + cancellationToken)) != 0) { await duplexStream.RequestStream.WriteAsync( @@ -1603,7 +1781,9 @@ await duplexStream.RequestStream.WriteAsync( /// /// Retrieves the encrypted bytes from the encryption operation on the sidecar and returns as an enumerable stream. /// - private async IAsyncEnumerable> RetrieveEncryptedStreamAsync(AsyncDuplexStreamingCall duplexStream, [EnumeratorCancellation] CancellationToken cancellationToken) + private async IAsyncEnumerable> RetrieveEncryptedStreamAsync( + AsyncDuplexStreamingCall duplexStream, + [EnumeratorCancellation] CancellationToken cancellationToken) { await foreach (var encryptResponse in duplexStream.ResponseStream.ReadAllAsync(cancellationToken) .ConfigureAwait(false)) @@ -1611,10 +1791,12 @@ private async IAsyncEnumerable> RetrieveEncryptedStreamAsyn yield return encryptResponse.Payload.Data.Memory; } } - + /// - [Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] - public override async Task>> DecryptAsync(string vaultResourceName, Stream ciphertextStream, string keyName, + [Obsolete( + "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + public override async Task>> DecryptAsync(string vaultResourceName, + Stream ciphertextStream, string keyName, DecryptionOptions decryptionOptions, CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName)); @@ -1624,8 +1806,7 @@ public override async Task>> DecryptAsync( var decryptRequestOptions = new Autogenerated.DecryptRequestOptions { - ComponentName = vaultResourceName, - KeyName = keyName + ComponentName = vaultResourceName, KeyName = keyName }; var options = CreateCallOptions(headers: null, cancellationToken); @@ -1644,12 +1825,13 @@ public override async Task>> DecryptAsync( } /// - [Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + [Obsolete( + "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] public override Task>> DecryptAsync(string vaultResourceName, Stream ciphertextStream, string keyName, CancellationToken cancellationToken = default) => DecryptAsync(vaultResourceName, ciphertextStream, keyName, new DecryptionOptions(), cancellationToken); - + /// /// Sends the ciphertext bytes in chunks to the sidecar to be decrypted. /// @@ -1662,30 +1844,32 @@ private async Task SendCiphertextStreamAsync(Stream ciphertextStream, //Start with passing the metadata about the decryption request itself in the first message await duplexStream.RequestStream.WriteAsync( new Autogenerated.DecryptRequest { Options = decryptRequestOptions }, cancellationToken); - + //Send the ciphertext bytes in blocks in subsequent messages await using (var bufferedStream = new BufferedStream(ciphertextStream, streamingBlockSizeInBytes)) { var buffer = new byte[streamingBlockSizeInBytes]; int bytesRead; ulong sequenceNumber = 0; - - while ((bytesRead = await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes), cancellationToken)) != 0) + + while ((bytesRead = + await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes), + cancellationToken)) != 0) { - await duplexStream.RequestStream.WriteAsync(new Autogenerated.DecryptRequest - { - Payload = new Autogenerated.StreamPayload + await duplexStream.RequestStream.WriteAsync( + new Autogenerated.DecryptRequest { - Data = ByteString.CopyFrom(buffer, 0, bytesRead), - Seq = sequenceNumber - } - }, cancellationToken); - + Payload = new Autogenerated.StreamPayload + { + Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber + } + }, cancellationToken); + //Increment the sequence number sequenceNumber++; } } - + //Send the completion message await duplexStream.RequestStream.CompleteAsync(); } @@ -1703,9 +1887,10 @@ private async IAsyncEnumerable> RetrieveDecryptedStreamAsyn yield return decryptResponse.Payload.Data.Memory; } } - + /// - [Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + [Obsolete( + "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] public override async Task> DecryptAsync(string vaultResourceName, ReadOnlyMemory ciphertextBytes, string keyName, DecryptionOptions decryptionOptions, CancellationToken cancellationToken = default) @@ -1713,8 +1898,8 @@ public override async Task> DecryptAsync(string vaultResour if (MemoryMarshal.TryGetArray(ciphertextBytes, out var ciphertextSegment) && ciphertextSegment.Array != null) { var decryptionResult = await DecryptAsync(vaultResourceName, new MemoryStream(ciphertextSegment.Array), - keyName, decryptionOptions, cancellationToken); - + keyName, decryptionOptions, cancellationToken); + var bufferedResult = new ArrayBufferWriter(); await foreach (var item in decryptionResult.WithCancellation(cancellationToken)) { @@ -1724,16 +1909,18 @@ public override async Task> DecryptAsync(string vaultResour return bufferedResult.WrittenMemory; } - throw new ArgumentException("The input instance doesn't have a valid underlying data store", nameof(ciphertextBytes)); + throw new ArgumentException("The input instance doesn't have a valid underlying data store", + nameof(ciphertextBytes)); } /// - [Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + [Obsolete( + "The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] public override async Task> DecryptAsync(string vaultResourceName, ReadOnlyMemory ciphertextBytes, string keyName, CancellationToken cancellationToken = default) => await DecryptAsync(vaultResourceName, ciphertextBytes, keyName, new DecryptionOptions(), cancellationToken); - + #region Subtle Crypto Implementation ///// @@ -1984,6 +2171,7 @@ await DecryptAsync(vaultResourceName, ciphertextBytes, keyName, #endregion #region Distributed Lock API + /// [Obsolete] public async override Task Lock( @@ -2004,27 +2192,23 @@ public async override Task Lock( var request = new Autogenerated.TryLockRequest() { - StoreName = storeName, - ResourceId = resourceId, - LockOwner = lockOwner, - ExpiryInSeconds = expiryInSeconds + StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner, ExpiryInSeconds = expiryInSeconds }; try { var options = CreateCallOptions(headers: null, cancellationToken); + var response = await client.TryLockAlpha1Async(request, options); return new TryLockResponse() { - StoreName = storeName, - ResourceId = resourceId, - LockOwner = lockOwner, - Success = response.Success + StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner, Success = response.Success }; } catch (RpcException ex) { - throw new DaprException("Lock operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Lock operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } } @@ -2042,9 +2226,7 @@ public async override Task Unlock( var request = new Autogenerated.UnlockRequest() { - StoreName = storeName, - ResourceId = resourceId, - LockOwner = lockOwner + StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner }; var options = CreateCallOptions(headers: null, cancellationToken); @@ -2055,7 +2237,8 @@ public async override Task Unlock( } catch (RpcException ex) { - throw new DaprException("Lock operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Lock operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); } return new UnlockResponse(GetUnLockStatus(response.Status)); @@ -2078,7 +2261,8 @@ public override async Task CheckHealthAsync(CancellationToken cancellation try { - using var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + using var response = + await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); return response.IsSuccessStatusCode; } catch (HttpRequestException) @@ -2100,7 +2284,8 @@ public override async Task CheckOutboundHealthAsync(CancellationToken canc try { - using var response = await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + using var response = + await this.httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken); return response.IsSuccessStatusCode; } catch (HttpRequestException) @@ -2119,6 +2304,7 @@ public override async Task WaitForSidecarAsync(CancellationToken cancellationTok { break; } + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); } } @@ -2147,20 +2333,19 @@ public override async Task GetMetadataAsync(CancellationToken canc } catch (RpcException ex) { - throw new DaprException("Get metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Get metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", + ex); } } /// - public override async Task SetMetadataAsync(string attributeName, string attributeValue, CancellationToken cancellationToken = default) + public override async Task SetMetadataAsync(string attributeName, string attributeValue, + CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(attributeName, nameof(attributeName)); - var envelope = new Autogenerated.SetMetadataRequest() - { - Key = attributeName, - Value = attributeValue - }; + var envelope = new Autogenerated.SetMetadataRequest() { Key = attributeName, Value = attributeValue }; var options = CreateCallOptions(headers: null, cancellationToken); @@ -2170,9 +2355,12 @@ public override async Task SetMetadataAsync(string attributeName, string attribu } catch (RpcException ex) { - throw new DaprException("Set metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + throw new DaprException( + "Set metadata operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", + ex); } } + #endregion protected override void Dispose(bool disposing) @@ -2209,7 +2397,8 @@ private CallOptions CreateCallOptions(Metadata headers, CancellationToken cancel /// /// /// - private async Task MakeGrpcCallHandleError(Func> callFunc, CancellationToken cancellationToken = default) + private async Task MakeGrpcCallHandleError( + Func> callFunc, CancellationToken cancellationToken = default) { var callOptions = CreateCallOptions(headers: null, cancellationToken); return await callFunc.Invoke(callOptions); @@ -2232,7 +2421,8 @@ private Autogenerated.StateOptions ToAutoGeneratedStateOptions(StateOptions stat return stateRequestOptions; } - private static Autogenerated.StateOptions.Types.StateConsistency GetStateConsistencyForConsistencyMode(ConsistencyMode consistencyMode) + private static Autogenerated.StateOptions.Types.StateConsistency GetStateConsistencyForConsistencyMode( + ConsistencyMode consistencyMode) { return consistencyMode switch { @@ -2242,7 +2432,8 @@ private static Autogenerated.StateOptions.Types.StateConsistency GetStateConsist }; } - private static Autogenerated.StateOptions.Types.StateConcurrency GetStateConcurrencyForConcurrencyMode(ConcurrencyMode concurrencyMode) + private static Autogenerated.StateOptions.Types.StateConcurrency GetStateConcurrencyForConcurrencyMode( + ConcurrencyMode concurrencyMode) { return concurrencyMode switch { diff --git a/src/Dapr.Client/Extensions/EnumExtensions.cs b/src/Dapr.Client/Extensions/EnumExtensions.cs new file mode 100644 index 000000000..df9c9ad33 --- /dev/null +++ b/src/Dapr.Client/Extensions/EnumExtensions.cs @@ -0,0 +1,41 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +#nullable enable +using System; +using System.Reflection; +using System.Runtime.Serialization; + +namespace Dapr.Client +{ + internal static class EnumExtensions + { + /// + /// Reads the value of an enum out of the attached attribute. + /// + /// The enum. + /// The value of the enum to pull the value for. + /// + public static string GetValueFromEnumMember(this T value) where T : Enum + { + ArgumentNullException.ThrowIfNull(value, nameof(value)); + + var memberInfo = typeof(T).GetMember(value.ToString(), BindingFlags.Static | BindingFlags.Public | BindingFlags.DeclaredOnly); + if (memberInfo.Length <= 0) + return value.ToString(); + + var attributes = memberInfo[0].GetCustomAttributes(typeof(EnumMemberAttribute), false); + return (attributes.Length > 0 ? ((EnumMemberAttribute)attributes[0]).Value : value.ToString()) ?? value.ToString(); + } + } +} diff --git a/src/Dapr.Common/DaprGenericClientBuilder.cs b/src/Dapr.Common/DaprGenericClientBuilder.cs index 60a9827a2..7a7abf025 100644 --- a/src/Dapr.Common/DaprGenericClientBuilder.cs +++ b/src/Dapr.Common/DaprGenericClientBuilder.cs @@ -199,8 +199,8 @@ public DaprGenericClientBuilder UseTimeout(TimeSpan timeout) // Set correct switch to make secure gRPC service calls. This switch must be set before creating the GrpcChannel. AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); } - - var httpEndpoint = new Uri(this.HttpEndpoint); + + var httpEndpoint = new Uri(this.HttpEndpoint); if (httpEndpoint.Scheme != "http" && httpEndpoint.Scheme != "https") { throw new InvalidOperationException("The HTTP endpoint must use http or https."); diff --git a/src/Dapr.Jobs/DaprJobsGrpcClient.cs b/src/Dapr.Jobs/DaprJobsGrpcClient.cs index 1f035220e..b548290df 100644 --- a/src/Dapr.Jobs/DaprJobsGrpcClient.cs +++ b/src/Dapr.Jobs/DaprJobsGrpcClient.cs @@ -47,7 +47,7 @@ internal sealed class DaprJobsGrpcClient : DaprJobsClient /// Property exposed for testing purposes. /// internal Autogenerated.Dapr.DaprClient Client { get; } - + internal DaprJobsGrpcClient( Autogenerated.Dapr.DaprClient innerClient, HttpClient httpClient, diff --git a/src/Dapr.Jobs/Extensions/DaprJobsServiceCollectionExtensions.cs b/src/Dapr.Jobs/Extensions/DaprJobsServiceCollectionExtensions.cs index e3680fd83..03540aae1 100644 --- a/src/Dapr.Jobs/Extensions/DaprJobsServiceCollectionExtensions.cs +++ b/src/Dapr.Jobs/Extensions/DaprJobsServiceCollectionExtensions.cs @@ -30,6 +30,7 @@ public static class DaprJobsServiceCollectionExtensions /// The lifetime of the registered services. /// public static IServiceCollection AddDaprJobsClient(this IServiceCollection serviceCollection, Action? configure = null, ServiceLifetime lifetime = ServiceLifetime.Singleton) + { ArgumentNullException.ThrowIfNull(serviceCollection, nameof(serviceCollection)); @@ -62,7 +63,7 @@ public static IServiceCollection AddDaprJobsClient(this IServiceCollection servi serviceCollection.TryAddSingleton(registration); break; } - + return serviceCollection; } } diff --git a/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto b/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto index 0ab371e6d..d1d8658f3 100644 --- a/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto +++ b/src/Dapr.Protos/Protos/dapr/proto/runtime/v1/dapr.proto @@ -1225,6 +1225,7 @@ message Job { // Systemd timer style cron accepts 6 fields: // seconds | minutes | hours | day of month | month | day of week // 0-59 | 0-59 | 0-23 | 1-31 | 1-12/jan-dec | 0-6/sun-sat + // // "0 30 * * * *" - every hour on the half hour // "0 15 3 * * *" - every day at 03:15 @@ -1344,4 +1345,4 @@ message ConversationResponse { // An array of results. repeated ConversationResult outputs = 2; -} \ No newline at end of file +} diff --git a/test/Dapr.Client.Test/Extensions/EnumExtensionTest.cs b/test/Dapr.Client.Test/Extensions/EnumExtensionTest.cs new file mode 100644 index 000000000..83c4354f9 --- /dev/null +++ b/test/Dapr.Client.Test/Extensions/EnumExtensionTest.cs @@ -0,0 +1,38 @@ +using System.Runtime.Serialization; +using Xunit; + +namespace Dapr.Client.Test.Extensions +{ + public class EnumExtensionTest + { + [Fact] + public void GetValueFromEnumMember_RedResolvesAsExpected() + { + var value = TestEnum.Red.GetValueFromEnumMember(); + Assert.Equal("red", value); + } + + [Fact] + public void GetValueFromEnumMember_YellowResolvesAsExpected() + { + var value = TestEnum.Yellow.GetValueFromEnumMember(); + Assert.Equal("YELLOW", value); + } + + [Fact] + public void GetValueFromEnumMember_BlueResolvesAsExpected() + { + var value = TestEnum.Blue.GetValueFromEnumMember(); + Assert.Equal("Blue", value); + } + } + + public enum TestEnum + { + [EnumMember(Value = "red")] + Red, + [EnumMember(Value = "YELLOW")] + Yellow, + Blue + } +} diff --git a/test/Dapr.Client.Test/StateApiTest.cs b/test/Dapr.Client.Test/StateApiTest.cs index f6ecb5d80..12fd0e3de 100644 --- a/test/Dapr.Client.Test/StateApiTest.cs +++ b/test/Dapr.Client.Test/StateApiTest.cs @@ -11,24 +11,26 @@ // limitations under the License. // ------------------------------------------------------------------------ + +using System; +using System.Collections.Generic; +using System.Net; +using System.Text.Json; +using System.Threading.Tasks; +using Autogenerated = Dapr.Client.Autogen.Grpc.v1; +using FluentAssertions; +using Google.Protobuf; +using Grpc.Core; +using Moq; +using StateConsistency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConsistency; +using StateConcurrency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConcurrency; +using Xunit; +using System.Threading; +using System.Net.Http; +using System.Text; + namespace Dapr.Client.Test { - using System; - using System.Collections.Generic; - using System.Net; - using System.Text.Json; - using System.Threading.Tasks; - using Autogenerated = Dapr.Client.Autogen.Grpc.v1; - using FluentAssertions; - using Google.Protobuf; - using Grpc.Core; - using Moq; - using StateConsistency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConsistency; - using StateConcurrency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConcurrency; - using Xunit; - using System.Threading; - using System.Net.Http; - public class StateApiTest { [Fact] @@ -36,10 +38,7 @@ public async Task GetStateAsync_CanReadState() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync("testStore", "test")); request.Dismiss(); @@ -58,14 +57,11 @@ public async Task GetBulkStateAsync_CanReadState() { await using var client = TestClient.CreateForDaprClient(); - var key = "test"; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetBulkStateAsync("testStore", new List() { key }, null); - }); + const string key = "test"; + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync("testStore", new List() { key }, null)); // Create Response & Respond - var data = "value"; + const string data = "value"; var envelope = MakeGetBulkStateResponse(key, data); var state = await request.CompleteWithMessageAsync(envelope); @@ -78,11 +74,8 @@ public async Task GetBulkStateAsync_CanReadDeserializedState() { await using var client = TestClient.CreateForDaprClient(); - var key = "test"; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetBulkStateAsync("testStore", new List() {key}, null); - }); + const string key = "test"; + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync("testStore", new List() {key}, null)); // Create Response & Respond const string size = "small"; @@ -102,11 +95,8 @@ public async Task GetBulkStateAsync_WrapsRpcException() { await using var client = TestClient.CreateForDaprClient(); - var key = "test"; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetBulkStateAsync("testStore", new List() { key }, null); - }); + const string key = "test"; + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync("testStore", new List() { key }, null)); // Create Response & Respond var ex = await Assert.ThrowsAsync(async () => @@ -121,15 +111,12 @@ public async Task GetBulkStateAsync_ValidateRequest() { await using var client = TestClient.CreateForDaprClient(); - var key = "test"; + const string key = "test"; var metadata = new Dictionary { { "partitionKey", "mypartition" } }; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetBulkStateAsync("testStore", new List() { key }, null, metadata: metadata); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync("testStore", new List() { key }, null, metadata: metadata)); request.Dismiss(); @@ -144,10 +131,7 @@ public async Task GetStateAndEtagAsync_CanReadState() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAndETagAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAndETagAsync("testStore", "test")); // Create Response & Respond var data = new Widget() { Size = "small", Color = "yellow", }; @@ -165,10 +149,7 @@ public async Task GetStateAndETagAsync_WrapsRpcException() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAndETagAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAndETagAsync("testStore", "test")); // Create Response & Respond var ex = await Assert.ThrowsAsync(async () => @@ -183,10 +164,7 @@ public async Task GetStateAndETagAsync_WrapsJsonException() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAndETagAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAndETagAsync("testStore", "test")); // Create Response & Respond var envelope = new Autogenerated.GetStateResponse() @@ -206,10 +184,7 @@ public async Task GetStateAsync_CanReadEmptyState_ReturnsDefault() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAsync("testStore", "test", ConsistencyMode.Eventual); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync("testStore", "test", ConsistencyMode.Eventual)); // Create Response & Respond var envelope = MakeGetStateResponse(null); @@ -226,10 +201,7 @@ public async Task GetStateAsync_ValidateRequest(ConsistencyMode consistencyMode, { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAsync("testStore", "test", consistencyMode); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync("testStore", "test", consistencyMode)); // Get Request & Validate var envelope = await request.GetRequestEnvelopeAsync(); @@ -253,10 +225,7 @@ public async Task GetStateAndEtagAsync_ValidateRequest() { { "partitionKey", "mypartition" } }; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAsync("testStore", "test", metadata: metadata); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync("testStore", "test", metadata: metadata)); // Get Request & Validate var envelope = await request.GetRequestEnvelopeAsync(); @@ -276,10 +245,7 @@ public async Task GetStateAsync_WrapsRpcException() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync("testStore", "test")); // Create Response & Respond var ex = await Assert.ThrowsAsync(async () => @@ -294,10 +260,7 @@ public async Task GetStateAsync_WrapsJsonException() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync("testStore", "test")); // Create Response & Respond var stateResponse = new Autogenerated.GetStateResponse() @@ -467,10 +430,10 @@ public async Task ExecuteStateTransactionAsync_CanSaveState() }; var state1 = new StateTransactionRequest("stateKey1", JsonSerializer.SerializeToUtf8Bytes(stateValue1), StateOperationType.Upsert, "testEtag", metadata1, options1); - var stateValue2 = 100; + const int stateValue2 = 100; var state2 = new StateTransactionRequest("stateKey2", JsonSerializer.SerializeToUtf8Bytes(stateValue2), StateOperationType.Delete); - var stateValue3 = "teststring"; + const string stateValue3 = "teststring"; var state3 = new StateTransactionRequest("stateKey3", JsonSerializer.SerializeToUtf8Bytes(stateValue3), StateOperationType.Upsert); var states = new List @@ -619,10 +582,7 @@ public async Task GetStateEntryAsync_CanReadState() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateEntryAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync("testStore", "test")); // Create Response & Respond var data = new Widget() { Size = "small", Color = "yellow", }; @@ -639,10 +599,7 @@ public async Task GetStateEntryAsync_CanReadEmptyState_ReturnsDefault() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateEntryAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync("testStore", "test")); // Create Response & Respond var envelope = MakeGetStateResponse(null); @@ -657,10 +614,7 @@ public async Task GetStateEntryAsync_CanSaveState() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateEntryAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync("testStore", "test")); // Create Response & Respond var data = new Widget() { Size = "small", Color = "yellow", }; @@ -699,10 +653,7 @@ public async Task GetStateEntryAsync_CanDeleteState() { await using var client = TestClient.CreateForDaprClient(); - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.GetStateEntryAsync("testStore", "test"); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync("testStore", "test")); // Create Response & Respond var data = new Widget() { Size = "small", Color = "yellow", }; @@ -805,10 +756,7 @@ public async Task TrySaveStateAsync_ValidateOptions( { "key1", "value1" }, { "key2", "value2" } }; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.TrySaveStateAsync("testStore", "test", widget, "Test_Etag", stateOptions, metadata); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.TrySaveStateAsync("testStore", "test", widget, "Test_Etag", stateOptions, metadata)); request.Dismiss(); @@ -1021,10 +969,7 @@ public async Task TryDeleteStateAsync_ValidateOptions( Consistency = consistencyMode }; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.TryDeleteStateAsync("testStore", "test", "Test_Etag", stateOptions); - }); + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.TryDeleteStateAsync("testStore", "test", "Test_Etag", stateOptions)); request.Dismiss(); @@ -1042,8 +987,8 @@ public async Task DeleteBulkStateAsync_ValidateRequest() { await using var client = TestClient.CreateForDaprClient(); - var key = "test"; - var etag = "etag"; + const string key = "test"; + const string etag = "etag"; var metadata = new Dictionary { { "partitionKey", "mypartition" } @@ -1069,11 +1014,8 @@ public async Task QueryStateAsync_ValidateResult() { await using var client = TestClient.CreateForDaprClient(); - var queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}"; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.QueryStateAsync("testStore", queryJson, new Dictionary()); - }); + const string queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}"; + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.QueryStateAsync("testStore", queryJson, new Dictionary())); // Validate request. var envelope = await request.GetRequestEnvelopeAsync(); @@ -1099,11 +1041,8 @@ public async Task QueryStateAsync_EncountersError_ValidatePartialResult() { await using var client = TestClient.CreateForDaprClient(); - var queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}"; - var request = await client.CaptureGrpcRequestAsync(async daprClient => - { - return await daprClient.QueryStateAsync("testStore", queryJson, new Dictionary()); - }); + const string queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}"; + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.QueryStateAsync("testStore", queryJson, new Dictionary())); // Validate request. var envelope = await request.GetRequestEnvelopeAsync(); @@ -1175,14 +1114,371 @@ private Autogenerated.GetBulkStateResponse MakeGetBulkStateResponse(string ke private Autogenerated.QueryStateItem MakeQueryStateItem(string key, T data, string etag = default, string error = default) { - var wireItem = new Autogenerated.QueryStateItem(); - wireItem.Key = key; - wireItem.Data = ByteString.CopyFromUtf8(JsonSerializer.Serialize(data)); - wireItem.Etag = etag ?? string.Empty; - wireItem.Error = error ?? string.Empty; + var wireItem = new Autogenerated.QueryStateItem + { + Key = key, Data = ByteString.CopyFromUtf8(JsonSerializer.Serialize(data)), Etag = etag ?? string.Empty, + Error = error ?? string.Empty + }; return wireItem; } + [Theory] + [InlineData(ConsistencyMode.Eventual, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyFirstWrite)] + [InlineData(ConsistencyMode.Eventual, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyLastWrite)] + [InlineData(ConsistencyMode.Strong, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyFirstWrite)] + [InlineData(ConsistencyMode.Strong, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyLastWrite)] + public async Task SaveByteStateAsync_ValidateOptions( + ConsistencyMode consistencyMode, + ConcurrencyMode concurrencyMode, + StateConsistency expectedConsistency, + StateConcurrency expectedConcurrency) + { + await using var client = TestClient.CreateForDaprClient(); + + const string data = "Test binary data"; + var stateBytes = Encoding.UTF8.GetBytes(data); + var stateOptions = new StateOptions + { + Concurrency = concurrencyMode, + Consistency = consistencyMode + }; + + var metadata = new Dictionary + { + { "key1", "value1" }, + { "key2", "value2" } + }; + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + await daprClient.SaveByteStateAsync("testStore", "test", stateBytes.AsMemory(), stateOptions, metadata); + }); + + request.Dismiss(); + + // Get Request and validate + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.StoreName.Should().Be("testStore"); + envelope.States.Count.Should().Be(1); + var state = envelope.States[0]; + state.Key.Should().Be("test"); + state.Metadata.Count.Should().Be(2); + state.Metadata.Keys.Contains("key1").Should().BeTrue(); + state.Metadata.Keys.Contains("key2").Should().BeTrue(); + state.Metadata["key1"].Should().Be("value1"); + state.Metadata["key2"].Should().Be("value2"); + state.Options.Concurrency.Should().Be(expectedConcurrency); + state.Options.Consistency.Should().Be(expectedConsistency); + + var stateBinaryData = state.Value.ToStringUtf8(); + stateBinaryData.Should().Be(data); + } + + [Fact] + public async Task SaveByteStateAsync_CanSaveState() + { + await using var client = TestClient.CreateForDaprClient(); + + const string data = "Test binary data"; + var stateBytes = Encoding.UTF8.GetBytes(data); + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + await daprClient.SaveByteStateAsync("testStore", "test", stateBytes.AsMemory()); + }); + + request.Dismiss(); + + // Get Request and validate + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.StoreName.Should().Be("testStore"); + envelope.States.Count.Should().Be(1); + var state = envelope.States[0]; + state.Key.Should().Be("test"); + + var stateBinaryData = state.Value.ToStringUtf8(); + stateBinaryData.Should().Be(data); + } + + [Fact] + public async Task SaveByteStateAsync_CanClearState() + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + await daprClient.SaveByteStateAsync("testStore", "test", null); + }); + + request.Dismiss(); + + // Get Request and validate + var envelope = await request.GetRequestEnvelopeAsync(); + + envelope.StoreName.Should().Be("testStore"); + envelope.States.Count.Should().Be(1); + var state = envelope.States[0]; + state.Key.Should().Be("test"); + state.Value.Should().Equal(ByteString.Empty); + } + + [Fact] + public async Task SaveByteStateAsync_WithCancelledToken() + { + await using var client = TestClient.CreateForDaprClient(); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + await Assert.ThrowsAsync(async () => + { + await client.InnerClient.SaveByteStateAsync("testStore", "test", null, cancellationToken: cts.Token); + }); + } + + [Theory] + [InlineData(ConsistencyMode.Eventual, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyFirstWrite)] + [InlineData(ConsistencyMode.Eventual, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyLastWrite)] + [InlineData(ConsistencyMode.Strong, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyFirstWrite)] + [InlineData(ConsistencyMode.Strong, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyLastWrite)] + public async Task TrySaveByteStateAsync_ValidateOptions( + ConsistencyMode consistencyMode, + ConcurrencyMode concurrencyMode, + StateConsistency expectedConsistency, + StateConcurrency expectedConcurrency) + { + await using var client = TestClient.CreateForDaprClient(); + const string data = "Test binary data"; + var stateBytes = Encoding.UTF8.GetBytes(data); + var stateOptions = new StateOptions + { + Concurrency = concurrencyMode, + Consistency = consistencyMode + }; + + var metadata = new Dictionary + { + { "key1", "value1" }, + { "key2", "value2" } + }; + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.TrySaveByteStateAsync("testStore", "test", stateBytes.AsMemory(), "Test_Etag", stateOptions, metadata)); + + request.Dismiss(); + + // Get Request and validate + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.StoreName.Should().Be("testStore"); + envelope.States.Count.Should().Be(1); + var state = envelope.States[0]; + state.Etag.Value.Should().Be("Test_Etag"); + state.Metadata.Count.Should().Be(2); + state.Metadata.Keys.Contains("key1").Should().BeTrue(); + state.Metadata.Keys.Contains("key2").Should().BeTrue(); + state.Metadata["key1"].Should().Be("value1"); + state.Metadata["key2"].Should().Be("value2"); + state.Options.Concurrency.Should().Be(expectedConcurrency); + state.Options.Consistency.Should().Be(expectedConsistency); + + var stateBinaryData = state.Value.ToStringUtf8(); + stateBinaryData.Should().Be(data); + } + + [Fact] + public async Task TrySaveByteStateAsync_ValidateNonETagErrorThrowsException() + { + var client = new MockClient(); + + var response = client.CallStateApi() + .Build(); + const string data = "Test binary data"; + var stateBytes = Encoding.UTF8.GetBytes(data); + var rpcException = new RpcException(new Status(StatusCode.Internal, "Network Error")); + + // Setup the mock client to throw an Rpc Exception with the expected details info + client.Mock + .Setup(m => m.SaveStateAsync(It.IsAny(), It.IsAny())) + .Throws(rpcException); + + var ex = await Assert.ThrowsAsync(async () => + { + await client.DaprClient.TrySaveByteStateAsync("test", "test", stateBytes.AsMemory(), "someETag"); + }); + Assert.Same(rpcException, ex.InnerException); + } + + [Fact] + public async Task TrySaveByteStateAsync_ValidateETagRelatedExceptionReturnsFalse() + { + var client = new MockClient(); + + var response = client.CallStateApi() + .Build(); + const string data = "Test binary data"; + var stateBytes = Encoding.UTF8.GetBytes(data); + var rpcException = new RpcException(new Status(StatusCode.Aborted, $"failed saving state in state store testStore")); + // Setup the mock client to throw an Rpc Exception with the expected details info + client.Mock + .Setup(m => m.SaveStateAsync(It.IsAny(), It.IsAny())) + .Throws(rpcException); + + var operationResult = await client.DaprClient.TrySaveByteStateAsync("testStore", "test", stateBytes.AsMemory(), "invalidETag"); + Assert.False(operationResult); + } + + [Fact] + public async Task TrySaveByteStateAsync_NullEtagThrowsArgumentException() + { + var client = new MockClient(); + const string data = "Test binary data"; + var stateBytes = Encoding.UTF8.GetBytes(data); + var response = client.CallStateApi() + .Build(); + + await FluentActions.Awaiting(async () => await client.DaprClient.TrySaveByteStateAsync("test", "test", stateBytes.AsMemory(), null)) + .Should().ThrowAsync(); + } + + [Fact] + public async Task TrySaveByteStateAsync_EmptyEtagDoesNotThrow() + { + var client = new MockClient(); + const string data = "Test binary data"; + var stateBytes = Encoding.UTF8.GetBytes(data); + var response = client.CallStateApi() + .Build(); + + // Setup the mock client to return success + client.Mock + .Setup(m => m.SaveStateAsync(It.IsAny(), It.IsAny())) + .Returns(response); + + var result = await client.DaprClient.TrySaveByteStateAsync("test", "test", stateBytes.AsMemory(), ""); + Assert.True(result); + } + [Fact] + public async Task GetByteStateAsync_CanReadEmptyState_ReturnsDefault() + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAsync("testStore", "test", ConsistencyMode.Eventual)); + + // Create Response & Respond to request + var envelope = MakeGetByteStateResponse(null); + var state = await request.CompleteWithMessageAsync(envelope); + + // Get response and validate + state.ToArray().Should().BeNullOrEmpty(); + } + + [Theory] + [InlineData(ConsistencyMode.Eventual, StateConsistency.ConsistencyEventual)] + [InlineData(ConsistencyMode.Strong, StateConsistency.ConsistencyStrong)] + public async Task GetByteStateAsync_ValidateRequest(ConsistencyMode consistencyMode, StateConsistency expectedConsistencyMode) + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAsync("testStore", "test", consistencyMode)); + + // Get Request & Validate + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.StoreName.Should().Be("testStore"); + envelope.Key.Should().Be("test"); + envelope.Consistency.Should().Be(expectedConsistencyMode); + var binaryData = Encoding.ASCII.GetBytes("test data"); + // Create Response & Respond + var state = await request.CompleteWithMessageAsync(MakeGetByteStateResponse(binaryData.AsMemory())); + var stateStr = ByteString.CopyFrom(state.Span).ToByteArray(); + // Get response and validate + stateStr.Should().BeEquivalentTo(binaryData); + } + + [Fact] + public async Task GetByteStateAndEtagAsync_ValidateRequest() + { + await using var client = TestClient.CreateForDaprClient(); + + var metadata = new Dictionary + { + { "partitionKey", "mypartition" } + }; + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAndETagAsync("testStore", "test", metadata: metadata)); + + // Get Request & Validate + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.StoreName.Should().Be("testStore"); + envelope.Key.Should().Be("test"); + envelope.Metadata.Should().BeEquivalentTo(metadata); + var binaryData = Encoding.ASCII.GetBytes("test data"); + // Create Response & Respond + var (state, etag) = await request.CompleteWithMessageAsync((MakeGetByteStateResponse(binaryData.AsMemory()))); + var stateStr = ByteString.CopyFrom(state.Span).ToByteArray(); + // Get response and validate + stateStr.Should().BeEquivalentTo(binaryData); + } + [Fact] + public async Task GetByteStateAsync_WrapsRpcException() + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAsync("testStore", "test")); + + // Create Response & Respond + var ex = await Assert.ThrowsAsync(async () => + { + await request.CompleteAsync(new HttpResponseMessage(HttpStatusCode.NotAcceptable)); + }); + Assert.IsType(ex.InnerException); + } + + [Fact] + public async Task GetByteStateAndEtagAsync_CanReadState() + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAndETagAsync("testStore", "test")); + + // Create Response & Respond + var binaryData = Encoding.ASCII.GetBytes("test data"); + var envelope = MakeGetByteStateResponse(binaryData.AsMemory(), "Test_Etag"); + var (state, etag) = await request.CompleteWithMessageAsync(envelope); + var stateStr = ByteString.CopyFrom(state.Span).ToByteArray(); + // Get response and validate + stateStr.Should().BeEquivalentTo(binaryData); + etag.Should().Be("Test_Etag"); + } + + [Fact] + public async Task GetByteStateAndETagAsync_WrapsRpcException() + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAndETagAsync("testStore", "test")); + + // Create Response & Respond + var ex = await Assert.ThrowsAsync(async () => + { + await request.CompleteAsync(new HttpResponseMessage(HttpStatusCode.NotAcceptable)); + }); + Assert.IsType(ex.InnerException); + } + + private Autogenerated.GetStateResponse MakeGetByteStateResponse(ReadOnlyMemory state, string etag = null) + { + + var response = new Autogenerated.GetStateResponse(); + + // convert to byte string if state is not null + if (!state.Span.IsEmpty) + { + response.Data = ByteString.CopyFrom(state.Span); + } + + if (etag != null) + { + response.Etag = etag; + } + + return response; + } private class Widget { public string Size { get; set; } diff --git a/test/Dapr.Jobs.Test/Extensions/DaprJobsServiceCollectionExtensionsTests.cs b/test/Dapr.Jobs.Test/Extensions/DaprJobsServiceCollectionExtensionsTests.cs index 3b2c5f990..28a8a0681 100644 --- a/test/Dapr.Jobs.Test/Extensions/DaprJobsServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Jobs.Test/Extensions/DaprJobsServiceCollectionExtensionsTests.cs @@ -25,6 +25,7 @@ namespace Dapr.Jobs.Test.Extensions; public class DaprJobsServiceCollectionExtensionsTest { [Fact] + public void AddDaprJobsClient_FromIConfiguration() { const string apiToken = "abc123"; @@ -50,6 +51,7 @@ public void AddDaprJobsClient_RegistersDaprClientOnlyOnce() { var services = new ServiceCollection(); + var clientBuilder = new Action((sp, builder) => { builder.UseDaprApiToken("abc");