Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #178 from finos/db-contrib/develop
Browse files Browse the repository at this point in the history
Minor logging improvements + ability to skip publish to Nuget
  • Loading branch information
Mikhail Udalov authored Jul 23, 2020
2 parents 629938b + 5d54415 commit b0a06a0
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 22 deletions.
6 changes: 4 additions & 2 deletions desktop/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ plugins {
def dotnetSdkVersion = "2.1"
def buildCache = System.getenv("PLEXUS_BUILD_CACHE_DIR") ?: new File("${rootDir}/build").absolutePath
def fullDotNetBuild = Os.isFamily(Os.FAMILY_WINDOWS) && System.env['PLEXUS_BUILD_FULL_DOTNET'] != 'false'
def skipNugetPublish = System.env['PLEXUS_BUILD_SKIP_DOTNET_PUBLISH'] == 'true'
def testsEnabled = Os.isFamily(Os.FAMILY_WINDOWS) && System.env['BuildRunner'] != "MyGet" && System.env['PLEXUS_BUILD_SKIP_DOTNET_TESTS'] != 'true'
def nugetVersion = System.getenv("PLEXUS_BUILD_NUGET_VERSION") ?: System.getenv("APPVEYOR_REPO_TAG_NAME")
def dotnetParams = nugetVersion == null ? " " : "/p:Version=$nugetVersion"

if (!fullDotNetBuild) {
dotnetParams += " /p:CORE_ONLY=true"
}
Expand Down Expand Up @@ -128,7 +130,7 @@ if (testsEnabled) {
}
}

if (nugetVersion != null) {
if (nugetVersion != null && !skipNugetPublish) {
task push(dependsOn: test, type:Exec) {
dependsOn prepareDotnet
workingDir projectDir
Expand All @@ -141,7 +143,7 @@ if (nugetVersion != null) {
} else {
task push(dependsOn: test) {
doLast {
println "Skipping push because nuget version is not specified"
println "Skipping push because nuget version is not specified, or publish disabled"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private async Task<UniqueId> LaunchAsync(
throw new InvalidOperationException($"Launcher is not defined for application {appId}");
}

Log.Debug("Sending request to launcher {0}: appId={1}, params={2}", appDto.LauncherId, appId, appDto.LauncherParams);
Log.Debug("Sending request to launcher {0}: appId={1}, params={2}", appDto.LauncherId, appId, string.Join("; ", appDto.LauncherParams.Select(kvp => $"{kvp.Key}:{kvp.Value}")));

var referrer = new AppLaunchReferrer
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,37 +348,62 @@ private static void DisposeFrame(TransportMessageFrame frame)

private static async Task PropagateAsync(ITransportChannel source, ITransportChannel target)
{
int propagatedMessageCount = 0;
var targetId = target.Id;
var sourceId = source.Id;
bool exceptionLogged = false;
try
{
int propagatedMessageCount = 0;
while (true)
{
Log.Trace($"Waiting for TransportMessageFrame from {source.Id} to propagate to {target.Id}");
Log.Trace($"Waiting for TransportMessageFrame from {sourceId} to propagate to {targetId}");

var result = await source.In.TryReadAsync().ConfigureAwait(false);
Maybe<TransportMessageFrame> result;
try
{
result = await source.In.TryReadAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
Log.Warn(ex, $"Caught exception during attempt to read TransportMessageFrame from source channel {sourceId} to propagate it to {targetId} channel. Total {propagatedMessageCount} messages propagated");
exceptionLogged = true;
throw;
}

if (!result.HasValue)
{
Log.Trace($"Received empty TransportMessageFrame from {source.Id}. Will complete {target.Id} channel");
Log.Trace($"Received empty TransportMessageFrame from {sourceId}. Will complete {targetId} channel");
break;
}

var messageFrame = result.Value;

Log.Trace($"Received TransportMessageFrame {messageFrame} from {source.Id}. Will try to propagate it to {target.Id} channel");
await target.Out.WriteAsync(messageFrame).ConfigureAwait(false);
Log.Trace($"Received TransportMessageFrame {messageFrame} from {sourceId}. Will try to propagate it to {targetId} channel");
try
{
await target.Out.WriteAsync(messageFrame).ConfigureAwait(false);
}
catch (Exception ex)
{
Log.Warn(ex, $"Caught exception during attempt to write TransportMessageFrame to target channel {targetId} to propagate it from {sourceId} channel. Total {propagatedMessageCount} messages propagated");
exceptionLogged = true;
throw;
}

propagatedMessageCount++;

Log.Trace($"TransportMessageFrame {messageFrame} successfully propagated to {target.Id} (received from {source.Id})");
Log.Trace($"TransportMessageFrame {messageFrame} successfully propagated to {targetId} (received from {sourceId})");
}

target.Out.TryComplete();
Log.Trace($"Successfully completed TransportMessageFrame propagation from {source.Id} to {target.Id}. Total {propagatedMessageCount} messages propagated");
Log.Trace($"Successfully completed TransportMessageFrame propagation from {sourceId} to {targetId}. Total {propagatedMessageCount} messages propagated");
}
catch (Exception ex)
{
Log.Warn(ex, $"Caught exception during attempt to propagate TransportMessageFrame from {source.Id} to {target.Id}");
if (!exceptionLogged)
{
Log.Warn(ex, $"Caught exception during attempt to propagate TransportMessageFrame from {sourceId} to {targetId}. Total {propagatedMessageCount} messages propagated");
}
target.Out.TryTerminate(ex);
}
}
Expand Down
25 changes: 20 additions & 5 deletions desktop/src/Plexus.Interop.Broker.Core/Internal/RegistryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Plexus.Interop.Broker.Internal
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Plexus.Interop.Transport.Protocol;

internal sealed class RegistryService : IRegistryService, IDisposable
{
Expand Down Expand Up @@ -71,7 +72,11 @@ public IApplication GetApplication(string appId)
_registryLock.EnterReadLock();
try
{
return _registry.Applications[appId];
if (_registry.Applications.TryGetValue(appId, out var application))
{
return application;
}
throw new MetadataViolationException($"Application {appId} do not exist in metadata. Available applications: {string.Join(", ", _registry.Applications.Keys)}");
}
finally
{
Expand All @@ -84,9 +89,14 @@ public IConsumedService GetConsumedService(string appId, IConsumedServiceReferen
_registryLock.EnterReadLock();
try
{
return _registry.Applications[appId].ConsumedServices
.FirstOrDefault(x =>
Equals(x.Service.Id, reference.ServiceId) && Equals(x.Alias, reference.ServiceAlias));
var application = GetApplication(appId);
var consumedService = application.ConsumedServices
.FirstOrDefault(service => Equals(service.Service.Id, reference.ServiceId) && Equals(service.Alias, reference.ServiceAlias));
if (consumedService != null)
{
return consumedService;
}
throw new MetadataViolationException($"Service {reference.ServiceId} with alias {reference.ServiceAlias} do not exist or is not consumed by {appId} application. Available services: {string.Join(", ", application.ConsumedServices.Select(service => service.Service.Id))}");
}
finally
{
Expand All @@ -99,8 +109,13 @@ public IConsumedMethod GetConsumedMethod(string appId, IConsumedMethodReference
_registryLock.EnterReadLock();
try
{
var methodId = reference.MethodId;
var service = GetConsumedService(appId, reference.ConsumedService);
return service.Methods[reference.MethodId];
if (service.Methods.TryGetValue(methodId, out var consumedMethod))
{
return consumedMethod;
}
throw new MetadataViolationException($"Method {methodId} do not exist in service {service.Service.Id} or is not consumed by {appId} application. Available methods: {string.Join(", ", service.Methods.Keys)}");
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<ProjectReference Include="..\Plexus.Interop.Metamodel.Json\Plexus.Interop.Metamodel.Json.csproj" />
<ProjectReference Include="..\Plexus.Interop.Protocol\Plexus.Interop.Protocol.csproj" />
<ProjectReference Include="..\Plexus.Interop.Transport.Contracts\Plexus.Interop.Transport.Contracts.csproj" />
<ProjectReference Include="..\Plexus.Interop.Transport.Protocol.Contracts\Plexus.Interop.Transport.Protocol.Contracts.csproj" />
<ProjectReference Include="..\Plexus.Utils\Plexus.Utils.csproj" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;

namespace Plexus.Interop.Transport.Protocol
{
public class MetadataViolationException : ProtocolException
{
public MetadataViolationException(string remoteMessage, Exception innerException = null) : base(remoteMessage, innerException)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,28 @@ private async Task ProcessAsync()

private async Task<int> ReadLengthAsync()
{
var readBytes = await ReadAsync(_lengthBuffer, 0, 2);
while (readBytes < 2)
{
_log.Info($"Read {readBytes} while reading length. Will try to read next byte");
readBytes += await ReadAsync(_lengthBuffer, readBytes, 2 - readBytes);
}
return (_lengthBuffer[0] << 8) | _lengthBuffer[1];
}

private async Task<int> ReadAsync(byte[] buffer, int offset, int count)
{
int readBytes;
#if NETSTANDARD2_0
var length = await _stream.ReadAsync(_lengthBuffer, 0, 2, _cancellationToken).ConfigureAwait(false);
readBytes = await _stream.ReadAsync(buffer, offset, count, _cancellationToken).ConfigureAwait(false);
#else
var length = await _stream.ReadAsync(_lengthBuffer, 0, 2, _cancellationToken).WithCancellation(_cancellationToken).ConfigureAwait(false);
readBytes = await _stream.ReadAsync(buffer, offset, count, _cancellationToken).WithCancellation(_cancellationToken).ConfigureAwait(false);
#endif
if (length != 2)
if (readBytes == 0)
{
throw new InvalidOperationException("Stream completed unexpectedly");
}
return (_lengthBuffer[0] << 8) | _lengthBuffer[1];
return readBytes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ private async Task ProcessAsync()
{
await Task.WhenAny(_writer.Completion, _reader.In.Completion).Unwrap().ConfigureAwait(false);
}
catch
catch (Exception ex)
{
_log.Warn(ex, $"Caught exception during {nameof(ProcessAsync)}({Id})");
_cancellation.Cancel();
throw;
}
await Task.WhenAll(_writer.Completion, _reader.In.Completion).ConfigureAwait(false);
_log.Trace("Processing completed. Disposing stream.");
Expand Down

0 comments on commit b0a06a0

Please sign in to comment.