diff --git a/JDBC.NET.Data/JDBC.NET.Data.csproj b/JDBC.NET.Data/JDBC.NET.Data.csproj index 336d6a0..3cc1675 100644 --- a/JDBC.NET.Data/JDBC.NET.Data.csproj +++ b/JDBC.NET.Data/JDBC.NET.Data.csproj @@ -10,14 +10,14 @@ https://github.com/chequer-io/JDBC.NET https://github.com/chequer-io/JDBC.NET Logo.jpg - 3.5.8 + 3.5.9 true - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/JDBC.NET.Data/JdbcCallInterceptor.cs b/JDBC.NET.Data/JdbcCallInterceptor.cs new file mode 100644 index 0000000..b91e7d8 --- /dev/null +++ b/JDBC.NET.Data/JdbcCallInterceptor.cs @@ -0,0 +1,171 @@ +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Interceptors; +using JDBC.NET.Data.Exceptions; + +namespace JDBC.NET.Data; + +internal sealed class JdbcCallInterceptor : Interceptor +{ + public override TResponse BlockingUnaryCall(TRequest request, ClientInterceptorContext context, BlockingUnaryCallContinuation continuation) + { + try + { + return base.BlockingUnaryCall(request, context, continuation); + } + catch (RpcException e) + { + throw new JdbcException(e); + } + } + + public override AsyncUnaryCall AsyncUnaryCall(TRequest request, ClientInterceptorContext context, AsyncUnaryCallContinuation continuation) + { + AsyncUnaryCall asyncCall = base.AsyncUnaryCall(request, context, continuation); + + return new AsyncUnaryCall( + Wrap(asyncCall.ResponseAsync), + Wrap(asyncCall.ResponseHeadersAsync), + asyncCall.GetStatus, + asyncCall.GetTrailers, + asyncCall.Dispose + ); + } + + public override AsyncClientStreamingCall AsyncClientStreamingCall(ClientInterceptorContext context, AsyncClientStreamingCallContinuation continuation) + { + AsyncClientStreamingCall asyncCall = base.AsyncClientStreamingCall(context, continuation); + + return new AsyncClientStreamingCall( + Wrap(asyncCall.RequestStream), + Wrap(asyncCall.ResponseAsync), + Wrap(asyncCall.ResponseHeadersAsync), + asyncCall.GetStatus, + asyncCall.GetTrailers, + asyncCall.Dispose + ); + } + + public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(ClientInterceptorContext context, AsyncDuplexStreamingCallContinuation continuation) + { + AsyncDuplexStreamingCall asyncCall = base.AsyncDuplexStreamingCall(context, continuation); + + return new AsyncDuplexStreamingCall( + Wrap(asyncCall.RequestStream), + Wrap(asyncCall.ResponseStream), + Wrap(asyncCall.ResponseHeadersAsync), + asyncCall.GetStatus, + asyncCall.GetTrailers, + asyncCall.Dispose + ); + } + + public override AsyncServerStreamingCall AsyncServerStreamingCall(TRequest request, ClientInterceptorContext context, AsyncServerStreamingCallContinuation continuation) + { + AsyncServerStreamingCall asyncCall = base.AsyncServerStreamingCall(request, context, continuation); + + return new AsyncServerStreamingCall( + Wrap(asyncCall.ResponseStream), + Wrap(asyncCall.ResponseHeadersAsync), + asyncCall.GetStatus, + asyncCall.GetTrailers, + asyncCall.Dispose + ); + } + + private static async Task Wrap(Task task) + { + if (task.Status != TaskStatus.WaitingForActivation) + return await task; + + try + { + return await task; + } + catch (RpcException e) + { + throw new JdbcException(e); + } + } + + private static IClientStreamWriter Wrap(IClientStreamWriter writer) + { + if (writer == null) + return null; + + return new JdbcClientStreamWriter(writer); + } + + private static IAsyncStreamReader Wrap(IAsyncStreamReader reader) + { + if (reader == null) + return null; + + return new JdbcAsyncStreamReader(reader); + } + + private readonly struct JdbcClientStreamWriter : IClientStreamWriter + { + public WriteOptions WriteOptions + { + get => _writer.WriteOptions; + set => _writer.WriteOptions = value; + } + + private readonly IClientStreamWriter _writer; + + public JdbcClientStreamWriter(IClientStreamWriter writer) + { + _writer = writer; + } + + public async Task WriteAsync(T message) + { + try + { + await _writer.WriteAsync(message); + } + catch (RpcException e) + { + throw new JdbcException(e); + } + } + + public async Task CompleteAsync() + { + try + { + await _writer.CompleteAsync(); + } + catch (RpcException e) + { + throw new JdbcException(e); + } + } + } + + private readonly struct JdbcAsyncStreamReader : IAsyncStreamReader + { + public T Current => _reader.Current; + + private readonly IAsyncStreamReader _reader; + + public JdbcAsyncStreamReader(IAsyncStreamReader reader) + { + _reader = reader; + } + + public async Task MoveNext(CancellationToken cancellationToken) + { + try + { + return await _reader.MoveNext(cancellationToken); + } + catch (RpcException e) + { + throw new JdbcException(e); + } + } + } +} diff --git a/JDBC.NET.Data/JdbcCallInvoker.cs b/JDBC.NET.Data/JdbcCallInvoker.cs deleted file mode 100644 index 000f189..0000000 --- a/JDBC.NET.Data/JdbcCallInvoker.cs +++ /dev/null @@ -1,175 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; -using Grpc.Core; -using JDBC.NET.Data.Exceptions; - -namespace JDBC.NET.Data -{ - internal sealed class JdbcCallInvoker : DefaultCallInvoker - { - public JdbcCallInvoker(Channel channel) : base(channel) - { - } - - public override TResponse BlockingUnaryCall(Method method, string host, CallOptions options, TRequest request) - { - try - { - return base.BlockingUnaryCall(method, host, options, request); - } - catch (RpcException e) - { - throw new JdbcException(e); - } - } - - public override AsyncUnaryCall AsyncUnaryCall(Method method, string host, CallOptions options, TRequest request) - { - AsyncUnaryCall asyncCall = base.AsyncUnaryCall(method, host, options, request); - - return new AsyncUnaryCall( - Wrap(asyncCall.ResponseAsync), - Wrap(asyncCall.ResponseHeadersAsync), - asyncCall.GetStatus, - asyncCall.GetTrailers, - asyncCall.Dispose - ); - } - - public override AsyncClientStreamingCall AsyncClientStreamingCall(Method method, string host, CallOptions options) - { - AsyncClientStreamingCall asyncCall = base.AsyncClientStreamingCall(method, host, options); - - return new AsyncClientStreamingCall( - Wrap(asyncCall.RequestStream), - Wrap(asyncCall.ResponseAsync), - Wrap(asyncCall.ResponseHeadersAsync), - asyncCall.GetStatus, - asyncCall.GetTrailers, - asyncCall.Dispose - ); - } - - public override AsyncDuplexStreamingCall AsyncDuplexStreamingCall(Method method, string host, CallOptions options) - { - AsyncDuplexStreamingCall asyncCall = base.AsyncDuplexStreamingCall(method, host, options); - - return new AsyncDuplexStreamingCall( - Wrap(asyncCall.RequestStream), - Wrap(asyncCall.ResponseStream), - Wrap(asyncCall.ResponseHeadersAsync), - asyncCall.GetStatus, - asyncCall.GetTrailers, - asyncCall.Dispose - ); - } - - public override AsyncServerStreamingCall AsyncServerStreamingCall(Method method, string host, CallOptions options, TRequest request) - { - AsyncServerStreamingCall asyncCall = base.AsyncServerStreamingCall(method, host, options, request); - - return new AsyncServerStreamingCall( - Wrap(asyncCall.ResponseStream), - Wrap(asyncCall.ResponseHeadersAsync), - asyncCall.GetStatus, - asyncCall.GetTrailers, - asyncCall.Dispose - ); - } - - private static async Task Wrap(Task task) - { - if (task.Status != TaskStatus.WaitingForActivation) - return await task; - - try - { - return await task; - } - catch (RpcException e) - { - throw new JdbcException(e); - } - } - - private static IClientStreamWriter Wrap(IClientStreamWriter writer) - { - if (writer == null) - return null; - - return new JdbcClientStreamWriter(writer); - } - - private static IAsyncStreamReader Wrap(IAsyncStreamReader reader) - { - if (reader == null) - return null; - - return new JdbcAsyncStreamReader(reader); - } - - private readonly struct JdbcClientStreamWriter : IClientStreamWriter - { - public WriteOptions WriteOptions - { - get => _writer.WriteOptions; - set => _writer.WriteOptions = value; - } - - private readonly IClientStreamWriter _writer; - - public JdbcClientStreamWriter(IClientStreamWriter writer) - { - _writer = writer; - } - - public async Task WriteAsync(T message) - { - try - { - await _writer.WriteAsync(message); - } - catch (RpcException e) - { - throw new JdbcException(e); - } - } - - public async Task CompleteAsync() - { - try - { - await _writer.CompleteAsync(); - } - catch (RpcException e) - { - throw new JdbcException(e); - } - } - } - - private readonly struct JdbcAsyncStreamReader : IAsyncStreamReader - { - public T Current => _reader.Current; - - private readonly IAsyncStreamReader _reader; - - public JdbcAsyncStreamReader(IAsyncStreamReader reader) - { - _reader = reader; - } - - public async Task MoveNext(CancellationToken cancellationToken) - { - try - { - return await _reader.MoveNext(cancellationToken); - } - catch (RpcException e) - { - throw new JdbcException(e); - } - } - } - } -} diff --git a/JDBC.NET.Data/JdbcChannel.cs b/JDBC.NET.Data/JdbcChannel.cs deleted file mode 100644 index c6d89fd..0000000 --- a/JDBC.NET.Data/JdbcChannel.cs +++ /dev/null @@ -1,29 +0,0 @@ -using System.Collections.Generic; -using Grpc.Core; - -namespace JDBC.NET.Data -{ - internal sealed class JdbcChannel : Channel - { - public JdbcChannel(string target, ChannelCredentials credentials) : base(target, credentials) - { - } - - public JdbcChannel(string target, ChannelCredentials credentials, IEnumerable options) : base(target, credentials, options) - { - } - - public JdbcChannel(string host, int port, ChannelCredentials credentials) : base(host, port, credentials) - { - } - - public JdbcChannel(string host, int port, ChannelCredentials credentials, IEnumerable options) : base(host, port, credentials, options) - { - } - - public override CallInvoker CreateCallInvoker() - { - return new JdbcCallInvoker(this); - } - } -} diff --git a/JDBC.NET.Data/Models/JdbcBridge.cs b/JDBC.NET.Data/Models/JdbcBridge.cs index 845790e..6946563 100644 --- a/JDBC.NET.Data/Models/JdbcBridge.cs +++ b/JDBC.NET.Data/Models/JdbcBridge.cs @@ -6,7 +6,8 @@ using System.Reflection; using System.Runtime.InteropServices; using System.Threading; -using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Net.Client; using J2NET; using JDBC.NET.Proto; @@ -15,7 +16,7 @@ namespace JDBC.NET.Data.Models internal sealed class JdbcBridge : IDisposable { #region Fields - private Channel _channel; + private GrpcChannel _channel; private Process _process; #endregion @@ -109,14 +110,21 @@ private void Initialize() { var port = bridgePort.GetPort(); - var channel = new JdbcChannel(host, port, ChannelCredentials.Insecure); - channel.ConnectAsync().Wait(CancellationToken.None); - - Driver = new DriverService.DriverServiceClient(channel); - Reader = new ReaderService.ReaderServiceClient(channel); - Statement = new StatementService.StatementServiceClient(channel); - Database = new DatabaseService.DatabaseServiceClient(channel); - MetaData = new MetaDataService.MetaDataServiceClient(channel); + var options = new GrpcChannelOptions + { + MaxReceiveMessageSize = null, + MaxSendMessageSize = null, + DisposeHttpClient = true, + }; + + var channel = GrpcChannel.ForAddress($"http://{host}:{port}", options); + var channelWithInterceptor = channel.Intercept(new JdbcCallInterceptor()); + + Driver = new DriverService.DriverServiceClient(channelWithInterceptor); + Reader = new ReaderService.ReaderServiceClient(channelWithInterceptor); + Statement = new StatementService.StatementServiceClient(channelWithInterceptor); + Database = new DatabaseService.DatabaseServiceClient(channelWithInterceptor); + MetaData = new MetaDataService.MetaDataServiceClient(channelWithInterceptor); var loadDriverResponse = Driver.loadDriver( new LoadDriverRequest