From 36ec161a1de888dbf8646315d803f9924037d459 Mon Sep 17 00:00:00 2001 From: Maksim Mukhamatulin Date: Fri, 26 Jun 2020 19:16:13 +0300 Subject: [PATCH 1/3] AAP-18596: Improved logging and exception messages --- .../Internal/InvocationRequestHandler.cs | 4 +-- .../Internal/RegistryService.cs | 25 +++++++++++++++---- .../Plexus.Interop.Broker.Core.csproj | 1 + .../MetadataViolationException.cs | 11 ++++++++ 4 files changed, 34 insertions(+), 7 deletions(-) create mode 100644 desktop/src/Plexus.Interop.Transport.Protocol.Contracts/MetadataViolationException.cs diff --git a/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs b/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs index 57004844f..67c9f7186 100644 --- a/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs +++ b/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs @@ -348,9 +348,9 @@ private static void DisposeFrame(TransportMessageFrame frame) private static async Task PropagateAsync(ITransportChannel source, ITransportChannel target) { + int propagatedMessageCount = 0; try { - int propagatedMessageCount = 0; while (true) { Log.Trace($"Waiting for TransportMessageFrame from {source.Id} to propagate to {target.Id}"); @@ -378,7 +378,7 @@ private static async Task PropagateAsync(ITransportChannel source, ITransportCha } catch (Exception ex) { - Log.Warn(ex, $"Caught exception during attempt to propagate TransportMessageFrame from {source.Id} to {target.Id}"); + Log.Warn(ex, $"Caught exception during attempt to propagate TransportMessageFrame from {source.Id} to {target.Id}. Total {propagatedMessageCount} messages propagated"); target.Out.TryTerminate(ex); } } diff --git a/desktop/src/Plexus.Interop.Broker.Core/Internal/RegistryService.cs b/desktop/src/Plexus.Interop.Broker.Core/Internal/RegistryService.cs index aa514637c..58009c86e 100644 --- a/desktop/src/Plexus.Interop.Broker.Core/Internal/RegistryService.cs +++ b/desktop/src/Plexus.Interop.Broker.Core/Internal/RegistryService.cs @@ -23,6 +23,7 @@ namespace Plexus.Interop.Broker.Internal using System.Collections.Generic; using System.Linq; using System.Threading; + using Plexus.Interop.Transport.Protocol; internal sealed class RegistryService : IRegistryService, IDisposable { @@ -71,7 +72,11 @@ public IApplication GetApplication(string appId) _registryLock.EnterReadLock(); try { - return _registry.Applications[appId]; + if (_registry.Applications.TryGetValue(appId, out var application)) + { + return application; + } + throw new MetadataViolationException($"Application {appId} do not exist in metadata. Available applications: {string.Join(", ", _registry.Applications.Keys)}"); } finally { @@ -84,9 +89,14 @@ public IConsumedService GetConsumedService(string appId, IConsumedServiceReferen _registryLock.EnterReadLock(); try { - return _registry.Applications[appId].ConsumedServices - .FirstOrDefault(x => - Equals(x.Service.Id, reference.ServiceId) && Equals(x.Alias, reference.ServiceAlias)); + var application = GetApplication(appId); + var consumedService = application.ConsumedServices + .FirstOrDefault(service => Equals(service.Service.Id, reference.ServiceId) && Equals(service.Alias, reference.ServiceAlias)); + if (consumedService != null) + { + return consumedService; + } + throw new MetadataViolationException($"Service {reference.ServiceId} with alias {reference.ServiceAlias} do not exist or is not consumed by {appId} application. Available services: {string.Join(", ", application.ConsumedServices.Select(service => service.Service.Id))}"); } finally { @@ -99,8 +109,13 @@ public IConsumedMethod GetConsumedMethod(string appId, IConsumedMethodReference _registryLock.EnterReadLock(); try { + var methodId = reference.MethodId; var service = GetConsumedService(appId, reference.ConsumedService); - return service.Methods[reference.MethodId]; + if (service.Methods.TryGetValue(methodId, out var consumedMethod)) + { + return consumedMethod; + } + throw new MetadataViolationException($"Method {methodId} do not exist in service {service.Service.Id} or is not consumed by {appId} application. Available methods: {string.Join(", ", service.Methods.Keys)}"); } finally { diff --git a/desktop/src/Plexus.Interop.Broker.Core/Plexus.Interop.Broker.Core.csproj b/desktop/src/Plexus.Interop.Broker.Core/Plexus.Interop.Broker.Core.csproj index 65b8cde62..a008a788a 100644 --- a/desktop/src/Plexus.Interop.Broker.Core/Plexus.Interop.Broker.Core.csproj +++ b/desktop/src/Plexus.Interop.Broker.Core/Plexus.Interop.Broker.Core.csproj @@ -20,6 +20,7 @@ + diff --git a/desktop/src/Plexus.Interop.Transport.Protocol.Contracts/MetadataViolationException.cs b/desktop/src/Plexus.Interop.Transport.Protocol.Contracts/MetadataViolationException.cs new file mode 100644 index 000000000..20df5f43e --- /dev/null +++ b/desktop/src/Plexus.Interop.Transport.Protocol.Contracts/MetadataViolationException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Plexus.Interop.Transport.Protocol +{ + public class MetadataViolationException : ProtocolException + { + public MetadataViolationException(string remoteMessage, Exception innerException = null) : base(remoteMessage, innerException) + { + } + } +} From 1b49e9091e8db38f33e9c4e8a7e5357a3e3556f2 Mon Sep 17 00:00:00 2001 From: Maksim Mukhamatulin Date: Thu, 16 Jul 2020 16:51:49 +0300 Subject: [PATCH 2/3] AAP-18858: Implemented reading message length in loop in case if length is splitted --- .../Internal/AppLifecycleManager.cs | 2 +- .../Internal/InvocationRequestHandler.cs | 41 +++++++++++++++---- .../Internal/StreamTransmissionReader.cs | 20 +++++++-- .../StreamTransmissionConnection.cs | 4 +- 4 files changed, 53 insertions(+), 14 deletions(-) diff --git a/desktop/src/Plexus.Interop.Apps.Manager/Internal/AppLifecycleManager.cs b/desktop/src/Plexus.Interop.Apps.Manager/Internal/AppLifecycleManager.cs index 68c9d51e8..0dcda344c 100644 --- a/desktop/src/Plexus.Interop.Apps.Manager/Internal/AppLifecycleManager.cs +++ b/desktop/src/Plexus.Interop.Apps.Manager/Internal/AppLifecycleManager.cs @@ -249,7 +249,7 @@ private async Task LaunchAsync( throw new InvalidOperationException($"Launcher is not defined for application {appId}"); } - Log.Debug("Sending request to launcher {0}: appId={1}, params={2}", appDto.LauncherId, appId, appDto.LauncherParams); + Log.Debug("Sending request to launcher {0}: appId={1}, params={2}", appDto.LauncherId, appId, string.Join("; ", appDto.LauncherParams.Select(kvp => $"{kvp.Key}:{kvp.Value}"))); var referrer = new AppLaunchReferrer { diff --git a/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs b/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs index 67c9f7186..4682acba1 100644 --- a/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs +++ b/desktop/src/Plexus.Interop.Broker.Core/Internal/InvocationRequestHandler.cs @@ -349,36 +349,61 @@ private static void DisposeFrame(TransportMessageFrame frame) private static async Task PropagateAsync(ITransportChannel source, ITransportChannel target) { int propagatedMessageCount = 0; + var targetId = target.Id; + var sourceId = source.Id; + bool exceptionLogged = false; try { while (true) { - Log.Trace($"Waiting for TransportMessageFrame from {source.Id} to propagate to {target.Id}"); + Log.Trace($"Waiting for TransportMessageFrame from {sourceId} to propagate to {targetId}"); - var result = await source.In.TryReadAsync().ConfigureAwait(false); + Maybe result; + try + { + result = await source.In.TryReadAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + Log.Warn(ex, $"Caught exception during attempt to read TransportMessageFrame from source channel {sourceId} to propagate it to {targetId} channel. Total {propagatedMessageCount} messages propagated"); + exceptionLogged = true; + throw; + } if (!result.HasValue) { - Log.Trace($"Received empty TransportMessageFrame from {source.Id}. Will complete {target.Id} channel"); + Log.Trace($"Received empty TransportMessageFrame from {sourceId}. Will complete {targetId} channel"); break; } var messageFrame = result.Value; - Log.Trace($"Received TransportMessageFrame {messageFrame} from {source.Id}. Will try to propagate it to {target.Id} channel"); - await target.Out.WriteAsync(messageFrame).ConfigureAwait(false); + Log.Trace($"Received TransportMessageFrame {messageFrame} from {sourceId}. Will try to propagate it to {targetId} channel"); + try + { + await target.Out.WriteAsync(messageFrame).ConfigureAwait(false); + } + catch (Exception ex) + { + Log.Warn(ex, $"Caught exception during attempt to write TransportMessageFrame to target channel {targetId} to propagate it from {sourceId} channel. Total {propagatedMessageCount} messages propagated"); + exceptionLogged = true; + throw; + } propagatedMessageCount++; - Log.Trace($"TransportMessageFrame {messageFrame} successfully propagated to {target.Id} (received from {source.Id})"); + Log.Trace($"TransportMessageFrame {messageFrame} successfully propagated to {targetId} (received from {sourceId})"); } target.Out.TryComplete(); - Log.Trace($"Successfully completed TransportMessageFrame propagation from {source.Id} to {target.Id}. Total {propagatedMessageCount} messages propagated"); + Log.Trace($"Successfully completed TransportMessageFrame propagation from {sourceId} to {targetId}. Total {propagatedMessageCount} messages propagated"); } catch (Exception ex) { - Log.Warn(ex, $"Caught exception during attempt to propagate TransportMessageFrame from {source.Id} to {target.Id}. Total {propagatedMessageCount} messages propagated"); + if (!exceptionLogged) + { + Log.Warn(ex, $"Caught exception during attempt to propagate TransportMessageFrame from {sourceId} to {targetId}. Total {propagatedMessageCount} messages propagated"); + } target.Out.TryTerminate(ex); } } diff --git a/desktop/src/Plexus.Interop.Transport.Transmission.Streams/Internal/StreamTransmissionReader.cs b/desktop/src/Plexus.Interop.Transport.Transmission.Streams/Internal/StreamTransmissionReader.cs index 7ba76ccd0..707e0fc95 100644 --- a/desktop/src/Plexus.Interop.Transport.Transmission.Streams/Internal/StreamTransmissionReader.cs +++ b/desktop/src/Plexus.Interop.Transport.Transmission.Streams/Internal/StreamTransmissionReader.cs @@ -90,16 +90,28 @@ private async Task ProcessAsync() private async Task ReadLengthAsync() { + var readBytes = await ReadAsync(_lengthBuffer, 0, 2); + while (readBytes < 2) + { + _log.Info($"Read {readBytes} while reading length. Will try to read next byte"); + readBytes += await ReadAsync(_lengthBuffer, readBytes, 2 - readBytes); + } + return (_lengthBuffer[0] << 8) | _lengthBuffer[1]; + } + + private async Task ReadAsync(byte[] buffer, int offset, int count) + { + int readBytes; #if NETSTANDARD2_0 - var length = await _stream.ReadAsync(_lengthBuffer, 0, 2, _cancellationToken).ConfigureAwait(false); + readBytes = await _stream.ReadAsync(buffer, offset, count, _cancellationToken).ConfigureAwait(false); #else - var length = await _stream.ReadAsync(_lengthBuffer, 0, 2, _cancellationToken).WithCancellation(_cancellationToken).ConfigureAwait(false); + readBytes = await _stream.ReadAsync(buffer, offset, count, _cancellationToken).WithCancellation(_cancellationToken).ConfigureAwait(false); #endif - if (length != 2) + if (readBytes == 0) { throw new InvalidOperationException("Stream completed unexpectedly"); } - return (_lengthBuffer[0] << 8) | _lengthBuffer[1]; + return readBytes; } } } diff --git a/desktop/src/Plexus.Interop.Transport.Transmission.Streams/StreamTransmissionConnection.cs b/desktop/src/Plexus.Interop.Transport.Transmission.Streams/StreamTransmissionConnection.cs index f12cebdcb..fff76a9a6 100644 --- a/desktop/src/Plexus.Interop.Transport.Transmission.Streams/StreamTransmissionConnection.cs +++ b/desktop/src/Plexus.Interop.Transport.Transmission.Streams/StreamTransmissionConnection.cs @@ -66,9 +66,11 @@ private async Task ProcessAsync() { await Task.WhenAny(_writer.Completion, _reader.In.Completion).Unwrap().ConfigureAwait(false); } - catch + catch (Exception ex) { + _log.Warn(ex, $"Caught exception during {nameof(ProcessAsync)}({Id})"); _cancellation.Cancel(); + throw; } await Task.WhenAll(_writer.Completion, _reader.In.Completion).ConfigureAwait(false); _log.Trace("Processing completed. Disposing stream."); From b4fe65938db2f57bfe8582985f692b2391908310 Mon Sep 17 00:00:00 2001 From: udalmik Date: Thu, 23 Jul 2020 13:53:47 +0300 Subject: [PATCH 3/3] Add ability to skip publishing to nuget --- desktop/build.gradle | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/desktop/build.gradle b/desktop/build.gradle index c7965a2ff..a792f5b60 100644 --- a/desktop/build.gradle +++ b/desktop/build.gradle @@ -8,9 +8,11 @@ plugins { def dotnetSdkVersion = "2.1" def buildCache = System.getenv("PLEXUS_BUILD_CACHE_DIR") ?: new File("${rootDir}/build").absolutePath def fullDotNetBuild = Os.isFamily(Os.FAMILY_WINDOWS) && System.env['PLEXUS_BUILD_FULL_DOTNET'] != 'false' +def skipNugetPublish = System.env['PLEXUS_BUILD_SKIP_DOTNET_PUBLISH'] == 'true' def testsEnabled = Os.isFamily(Os.FAMILY_WINDOWS) && System.env['BuildRunner'] != "MyGet" && System.env['PLEXUS_BUILD_SKIP_DOTNET_TESTS'] != 'true' def nugetVersion = System.getenv("PLEXUS_BUILD_NUGET_VERSION") ?: System.getenv("APPVEYOR_REPO_TAG_NAME") def dotnetParams = nugetVersion == null ? " " : "/p:Version=$nugetVersion" + if (!fullDotNetBuild) { dotnetParams += " /p:CORE_ONLY=true" } @@ -128,7 +130,7 @@ if (testsEnabled) { } } -if (nugetVersion != null) { +if (nugetVersion != null && !skipNugetPublish) { task push(dependsOn: test, type:Exec) { dependsOn prepareDotnet workingDir projectDir @@ -141,7 +143,7 @@ if (nugetVersion != null) { } else { task push(dependsOn: test) { doLast { - println "Skipping push because nuget version is not specified" + println "Skipping push because nuget version is not specified, or publish disabled" } } }