Skip to content

Commit

Permalink
Merge pull request #6 from deeplay-io/feature/add-keep-alive-overload…
Browse files Browse the repository at this point in the history
…-for-etcd-grpc-proxy

Добавлены новые методы LeaseKeepAliveV2 и WatchV2, которые не падают при использовании GRPC-proxy.
  • Loading branch information
AlexZelarge authored Jan 17, 2024
2 parents 52764b6 + 6d2c497 commit a73ae9d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 14 deletions.
28 changes: 27 additions & 1 deletion dotnet-etcd/leaseClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,32 @@ public async Task LeaseKeepAlive(LeaseKeepAliveRequest request, Action<LeaseKeep
}
}).ConfigureAwait(false);

/// <summary>
/// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client
/// to the server and streaming keep alive responses from the server to the client.
/// </summary>
/// <param name="request">The request to send to the server.</param>
/// <param name="method"></param>
/// <param name="cancellationToken"></param>
/// <param name="headers">The initial metadata to send with the call. This parameter is optional.</param>
public async Task LeaseKeepAliveV2(LeaseKeepAliveRequest request, Action<LeaseKeepAliveResponse> method,
CancellationToken cancellationToken, Metadata headers = null) => await CallEtcdAsync(async connection =>
{
using (AsyncDuplexStreamingCall<LeaseKeepAliveRequest, LeaseKeepAliveResponse> leaser =
connection._leaseClient
.LeaseKeepAlive(headers, cancellationToken: cancellationToken))
{
await leaser.RequestStream.WriteAsync(request).ConfigureAwait(false);
await leaser.ResponseStream
.MoveNext(
cancellationToken)
.ConfigureAwait(false);
LeaseKeepAliveResponse update = leaser.ResponseStream.Current;
method(update);
await leaser.RequestStream.CompleteAsync().ConfigureAwait(false);
}
}).ConfigureAwait(false);

/// <summary>
/// LeaseKeepAlive keeps the lease alive by streaming keep alive requests from the client
/// to the server and streaming keep alive responses from the server to the client.
Expand Down Expand Up @@ -273,4 +299,4 @@ public async Task<LeaseTimeToLiveResponse> LeaseTimeToLiveAsync(LeaseTimeToLiveR
CancellationToken cancellationToken = default) => await CallEtcdAsync(async (connection) => await connection._leaseClient
.LeaseTimeToLiveAsync(request, headers, deadline, cancellationToken)).ConfigureAwait(false);
}
}
}
57 changes: 44 additions & 13 deletions dotnet-etcd/watchClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,38 @@ public async Task Watch(WatchRequest request, Action<WatchResponse> method, Grpc

/// <summary>
/// Watches a key according to the specified watch request and
/// passes the watch response to the methods provided.
/// passes the watch response to the method provided.
/// </summary>
/// <param name="request">Watch Request containing key to be watched</param>
/// <param name="method">Method to which watch response should be passed on</param>
/// <param name="headers">The initial metadata to send with the call. This parameter is optional.</param>
/// <param name="deadline">An optional deadline for the call. The call will be cancelled if deadline is hit.</param>
/// <param name="cancellationToken">An optional token for canceling the call.</param>
public async Task WatchV2(WatchRequest request, Action<WatchResponse> method, Grpc.Core.Metadata headers = null,
DateTime? deadline = null,
CancellationToken cancellationToken = default) => await CallEtcdAsync(async (connection) =>
{
using (AsyncDuplexStreamingCall<WatchRequest, WatchResponse> watcher =
connection._watchClient.Watch(headers, deadline, cancellationToken))
{
Task watcherTask = Task.Run(async () =>
{
while (await watcher.ResponseStream.MoveNext(cancellationToken).ConfigureAwait(false))
{
WatchResponse update = watcher.ResponseStream.Current;
method(update);
}
}, cancellationToken);
await watcher.RequestStream.WriteAsync(request).ConfigureAwait(false);
await watcherTask.ConfigureAwait(false);
await watcher.RequestStream.CompleteAsync().ConfigureAwait(false);
}
}).ConfigureAwait(false);

/// <summary>
/// Watches a key according to the specified watch request and
/// passes the watch response to the methods provided.
/// </summary>
/// <param name="request">Watch Request containing key to be watched</param>
/// <param name="methods">Methods to which watch response should be passed on</param>
Expand Down Expand Up @@ -115,7 +146,7 @@ public async Task Watch(WatchRequest request, Action<WatchResponse>[] methods,

/// <summary>
/// Watches a key according to the specified watch request and
/// passes the minimal watch event data to the method provided.
/// passes the minimal watch event data to the method provided.
/// </summary>
/// <param name="request">Watch Request containing key to be watched</param>
/// <param name="method">Method to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -155,7 +186,7 @@ public async Task Watch(WatchRequest request, Action<WatchEvent[]> method, Grpc.

/// <summary>
/// Watches a key according to the specified watch request and
/// passes the minimal watch event data to the methods provided.
/// passes the minimal watch event data to the methods provided.
/// </summary>
/// <param name="request">Watch Request containing key to be watched</param>
/// <param name="methods">Methods to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -233,7 +264,7 @@ public async Task Watch(WatchRequest[] requests, Action<WatchResponse> method,

/// <summary>
/// Watches a key according to the specified watch requests and
/// passes the watch response to the methods provided.
/// passes the watch response to the methods provided.
/// </summary>
/// <param name="requests">Watch Requests containing keys to be watched</param>
/// <param name="methods">Methods to which watch response should be passed on</param>
Expand Down Expand Up @@ -271,7 +302,7 @@ public async Task Watch(WatchRequest[] requests, Action<WatchResponse>[] methods

/// <summary>
/// Watches a key according to the specified watch request and
/// passes the minimal watch event data to the method provided.
/// passes the minimal watch event data to the method provided.
/// </summary>
/// <param name="requests">Watch Requests containing keys to be watched</param>
/// <param name="method">Method to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -315,7 +346,7 @@ public async Task Watch(WatchRequest[] requests, Action<WatchEvent[]> method, Gr

/// <summary>
/// Watches a key according to the specified watch requests and
/// passes the minimal watch event data to the methods provided.
/// passes the minimal watch event data to the methods provided.
/// </summary>
/// <param name="requests">Watch Request containing keys to be watched</param>
/// <param name="methods">Methods to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -605,7 +636,7 @@ public async Task WatchRange(WatchRequest request, Action<WatchResponse> method,

/// <summary>
/// Watches a key range according to the specified watch request and
/// passes the watch response to the methods provided.
/// passes the watch response to the methods provided.
/// </summary>
/// <param name="request">Watch Request containing key to be watched</param>
/// <param name="methods">Methods to which watch response should be passed on</param>
Expand Down Expand Up @@ -639,7 +670,7 @@ public async Task WatchRange(WatchRequest request, Action<WatchResponse>[] metho

/// <summary>
/// Watches a key range according to the specified watch request and
/// passes the minimal watch event data to the method provided.
/// passes the minimal watch event data to the method provided.
/// </summary>
/// <param name="request">Watch Request containing key to be watched</param>
/// <param name="method">Method to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -679,7 +710,7 @@ public async Task WatchRange(WatchRequest request, Action<WatchEvent[]> method,

/// <summary>
/// Watches a key range according to the specified watch request and
/// passes the minimal watch event data to the methods provided.
/// passes the minimal watch event data to the methods provided.
/// </summary>
/// <param name="request">Watch Request containing key to be watched</param>
/// <param name="methods">Methods to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -757,7 +788,7 @@ public async Task WatchRange(WatchRequest[] requests, Action<WatchResponse> meth

/// <summary>
/// Watches a key range according to the specified watch requests and
/// passes the watch response to the methods provided.
/// passes the watch response to the methods provided.
/// </summary>
/// <param name="requests">Watch Requests containing keys to be watched</param>
/// <param name="methods">Methods to which watch response should be passed on</param>
Expand Down Expand Up @@ -795,7 +826,7 @@ public async Task WatchRange(WatchRequest[] requests, Action<WatchResponse>[] me

/// <summary>
/// Watches a key range according to the specified watch request and
/// passes the minimal watch event data to the method provided.
/// passes the minimal watch event data to the method provided.
/// </summary>
/// <param name="requests">Watch Requests containing keys to be watched</param>
/// <param name="method">Method to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -839,7 +870,7 @@ public async Task WatchRange(WatchRequest[] requests, Action<WatchEvent[]> metho

/// <summary>
/// Watches a key range according to the specified watch requests and
/// passes the minimal watch event data to the methods provided.
/// passes the minimal watch event data to the methods provided.
/// </summary>
/// <param name="requests">Watch Request containing keys to be watched</param>
/// <param name="methods">Methods to which minimal watch events data should be passed on</param>
Expand Down Expand Up @@ -1102,4 +1133,4 @@ public void WatchRange(string[] paths, Action<WatchEvent[]>[] methods, Grpc.Core

#endregion
}
}
}

0 comments on commit a73ae9d

Please sign in to comment.