Skip to content

Commit

Permalink
4213 show failed transfers (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
tnickelsen authored Feb 11, 2025
1 parent 94be644 commit 33a398e
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
using MassTransit;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using ProjectOrigin.Vault.Database;
using ProjectOrigin.Vault.Exceptions;
using ProjectOrigin.Vault.Metrics;
using ProjectOrigin.Vault.Models;
using ProjectOrigin.Vault.Options;
Expand Down Expand Up @@ -43,18 +45,42 @@ public async Task<ExecutionResult> Execute(ExecuteContext<SendInformationToRecei
_logger.LogInformation("Starting Activity: {Activity}, RequestId: {RequestId} ",
nameof(SendInformationToReceiverWalletActivity), context.Arguments.RequestStatusArgs.RequestId);

var newSlice = await _unitOfWork.TransferRepository.GetTransferredSlice(context.Arguments.SliceId);
var externalEndpoint = await _unitOfWork.WalletRepository.GetExternalEndpoint(context.Arguments.ExternalEndpointId);
try
{
var newSlice = await _unitOfWork.TransferRepository.GetTransferredSlice(context.Arguments.SliceId);
var externalEndpoint =
await _unitOfWork.WalletRepository.GetExternalEndpoint(context.Arguments.ExternalEndpointId);

if (externalEndpoint.Endpoint.Equals(_ownEndpoint.ToString()))
if (externalEndpoint.Endpoint.Equals(_ownEndpoint.ToString()))
{
_logger.LogInformation("Sending to local wallet. RequestId: {RequestId}", context.Arguments.RequestStatusArgs.RequestId);
return await InsertIntoLocalWallet(context, newSlice, externalEndpoint);
}
else
{
_logger.LogInformation("Sending to external wallet. RequestId: {RequestId}", context.Arguments.RequestStatusArgs.RequestId);
return await SendOverRestToExternalWallet(context, newSlice, externalEndpoint);
}
}
catch (HttpRequestException ex)
{
_logger.LogInformation("Sending to local wallet.");
return await InsertIntoLocalWallet(context, newSlice, externalEndpoint);
_unitOfWork.Rollback();
_logger.LogError(ex, "Failed to send transfer to receiver wallet.");
throw new TransientException("Failed to send transfer to receiver wallet.", ex);
}
else
catch (PostgresException ex)
{
_logger.LogInformation("Sending to external wallet.");
return await SendOverRestToExternalWallet(context, newSlice, externalEndpoint);
_logger.LogError(ex, "Failed to communicate with the database.");
throw new TransientException("Failed to communicate with the database.", ex);
}
catch (Exception ex)
{
_unitOfWork.Rollback();
_logger.LogError(ex, "Failed to send transfer to receiver.");
await _unitOfWork.RequestStatusRepository.SetRequestStatus(context.Arguments.RequestStatusArgs.RequestId, context.Arguments.RequestStatusArgs.Owner, RequestStatusState.Failed, failedReason: "Failed to send transfer to receiver.");
_unitOfWork.Commit();
_transferMetrics.IncrementFailedTransfers();
throw;
}
}

Expand All @@ -63,62 +89,52 @@ private async Task<ExecutionResult> SendOverRestToExternalWallet(
TransferredSlice newSlice,
ExternalEndpoint externalEndpoint)
{
try
{
_logger.LogInformation("Preparing to send information to receiver");
_logger.LogInformation("Preparing to send slice to receiver. RequestId: {RequestId}", context.Arguments.RequestStatusArgs.RequestId);

var request = new ReceiveRequest
var request = new ReceiveRequest
{
PublicKey = externalEndpoint.PublicKey.Export().ToArray(),
Position = (uint)newSlice.ExternalEndpointPosition,
CertificateId = new FederatedStreamId
{
PublicKey = externalEndpoint.PublicKey.Export().ToArray(),
Position = (uint)newSlice.ExternalEndpointPosition,
CertificateId = new FederatedStreamId
Registry = newSlice.RegistryName,
StreamId = newSlice.CertificateId
},
Quantity = (uint)newSlice.Quantity,
RandomR = newSlice.RandomR,
HashedAttributes = context.Arguments.WalletAttributes.Select(ha =>
new HashedAttribute
{
Registry = newSlice.RegistryName,
StreamId = newSlice.CertificateId
},
Quantity = (uint)newSlice.Quantity,
RandomR = newSlice.RandomR,
HashedAttributes = context.Arguments.WalletAttributes.Select(ha =>
new HashedAttribute
{
Key = ha.Key,
Value = ha.Value,
Salt = ha.Salt,
})
};

var client = new HttpClient();
_logger.LogInformation("Sending information to receiver");

var response = await client.PostAsJsonAsync(externalEndpoint.Endpoint, request);
response.EnsureSuccessStatusCode();
await _unitOfWork.TransferRepository.SetTransferredSliceState(newSlice.Id, TransferredSliceState.Transferred);
await _unitOfWork.RequestStatusRepository.SetRequestStatus(context.Arguments.RequestStatusArgs.RequestId, context.Arguments.RequestStatusArgs.Owner, RequestStatusState.Completed);
Key = ha.Key,
Value = ha.Value,
Salt = ha.Salt,
})
};

_unitOfWork.Commit();
_transferMetrics.IncrementCompleted();
var client = new HttpClient();
_logger.LogInformation("Sending slice to receiver. RequestId: {RequestId}", context.Arguments.RequestStatusArgs.RequestId);

_logger.LogInformation("Information Sent to receiver");
_logger.LogInformation("Ending ExternalWallet Activity: {Activity}, RequestId: {RequestId} ", nameof(SendInformationToReceiverWalletActivity), context.Arguments.RequestStatusArgs.RequestId);
var response = await client.PostAsJsonAsync(externalEndpoint.Endpoint, request);
response.EnsureSuccessStatusCode();
await _unitOfWork.TransferRepository.SetTransferredSliceState(newSlice.Id, TransferredSliceState.Transferred);
await _unitOfWork.RequestStatusRepository.SetRequestStatus(context.Arguments.RequestStatusArgs.RequestId, context.Arguments.RequestStatusArgs.Owner, RequestStatusState.Completed);

return context.Completed();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to send information to receiver");
throw;
}
_unitOfWork.Commit();
_transferMetrics.IncrementCompleted();

_logger.LogInformation("Slice sent to receiver. RequestId: {RequestId}", context.Arguments.RequestStatusArgs.RequestId);
_logger.LogInformation("Ending ExternalWallet Activity: {Activity}, RequestId: {RequestId} ", nameof(SendInformationToReceiverWalletActivity), context.Arguments.RequestStatusArgs.RequestId);

return context.Completed();
}

private async Task<ExecutionResult> InsertIntoLocalWallet(ExecuteContext<SendInformationToReceiverWalletArgument> context, TransferredSlice newSlice, ExternalEndpoint externalEndpoint)
{
_logger.LogInformation("Receiver is local.");

var walletEndpoint = await _unitOfWork.WalletRepository.GetWalletEndpoint(externalEndpoint.PublicKey);

if (walletEndpoint is null)
{
_logger.LogError("Local receiver wallet could not be found for reciever wallet {ReceiverWalletId}", externalEndpoint.Id);
_logger.LogError("Local receiver wallet could not be found for receiver wallet {ReceiverWalletId}. RequestId {RequestId}", externalEndpoint.Id, context.Arguments.RequestStatusArgs.RequestId);
return context.Faulted(new Exception($"Local receiver wallet could not be found for reciever wallet {externalEndpoint.Id}"));
}

Expand All @@ -144,7 +160,7 @@ private async Task<ExecutionResult> InsertIntoLocalWallet(ExecuteContext<SendInf
_unitOfWork.Commit();
_transferMetrics.IncrementCompleted();

_logger.LogInformation("Slice inserted locally into receiver wallet.");
_logger.LogInformation("Slice inserted locally into receiver wallet. RequestId: {RequestId}", context.Arguments.RequestStatusArgs.RequestId);
_logger.LogInformation("Ending IntoLocalWallet Activity: {Activity}, RequestId: {RequestId} ", nameof(SendInformationToReceiverWalletActivity), context.Arguments.RequestStatusArgs.RequestId);
return context.Completed();
}
Expand Down
21 changes: 17 additions & 4 deletions src/ProjectOrigin.Vault/Activities/TransferFullSliceActivity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
using Google.Protobuf;
using MassTransit;
using Microsoft.Extensions.Logging;
using Npgsql;
using ProjectOrigin.Electricity.V1;
using ProjectOrigin.HierarchicalDeterministicKeys.Interfaces;
using ProjectOrigin.Registry.V1;
using ProjectOrigin.Vault.Database;
using ProjectOrigin.Vault.Exceptions;
using ProjectOrigin.Vault.Extensions;
using ProjectOrigin.Vault.Metrics;
using ProjectOrigin.Vault.Models;

namespace ProjectOrigin.Vault.Activities;
Expand All @@ -28,15 +31,18 @@ public class TransferFullSliceActivity : IExecuteActivity<TransferFullSliceArgum
private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<TransferFullSliceActivity> _logger;
private readonly IEndpointNameFormatter _formatter;
private readonly ITransferMetrics _transferMetrics;

public TransferFullSliceActivity(
IUnitOfWork unitOfWork,
ILogger<TransferFullSliceActivity> logger,
IEndpointNameFormatter formatter)
IEndpointNameFormatter formatter,
ITransferMetrics transferMetrics)
{
_unitOfWork = unitOfWork;
_logger = logger;
_formatter = formatter;
_transferMetrics = transferMetrics;
}

public async Task<ExecutionResult> Execute(ExecuteContext<TransferFullSliceArguments> context)
Expand Down Expand Up @@ -81,14 +87,21 @@ public async Task<ExecutionResult> Execute(ExecuteContext<TransferFullSliceArgum
};
_logger.LogInformation("Ending Activity: {Activity}, RequestId: {RequestId} ", nameof(TransferFullSliceArguments), context.Arguments.RequestStatusArgs.RequestId);


return AddTransferRequiredActivities(context, externalEndpoint, transferredSlice, transaction, states, walletAttributes);
}
catch (PostgresException ex)
{
_logger.LogError(ex, "Failed to communicate with the database.");
throw new TransientException("Failed to communicate with the database.", ex);
}
catch (Exception ex)
{
_unitOfWork.Rollback();
_logger.LogError(ex, "Error sending transactions to registry");
return context.Faulted(ex);
_logger.LogError(ex, "Error sending full slice transfer transactions to registry");
await _unitOfWork.RequestStatusRepository.SetRequestStatus(context.Arguments.RequestStatusArgs.RequestId, context.Arguments.RequestStatusArgs.Owner, RequestStatusState.Failed, failedReason: "Error sending full slice transfer transactions to registry.");
_unitOfWork.Commit();
_transferMetrics.IncrementFailedTransfers();
throw;
}
}

Expand Down
20 changes: 17 additions & 3 deletions src/ProjectOrigin.Vault/Activities/TransferPartialSliceActivity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
using Google.Protobuf;
using MassTransit;
using Microsoft.Extensions.Logging;
using Npgsql;
using ProjectOrigin.Electricity.V1;
using ProjectOrigin.HierarchicalDeterministicKeys.Interfaces;
using ProjectOrigin.PedersenCommitment;
using ProjectOrigin.Registry.V1;
using ProjectOrigin.Vault.Database;
using ProjectOrigin.Vault.Exceptions;
using ProjectOrigin.Vault.Extensions;
using ProjectOrigin.Vault.Metrics;
using ProjectOrigin.Vault.Models;

namespace ProjectOrigin.Vault.Activities;
Expand All @@ -30,15 +33,18 @@ public class TransferPartialSliceActivity : IExecuteActivity<TransferPartialSlic
private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<TransferPartialSliceActivity> _logger;
private readonly IEndpointNameFormatter _formatter;
private readonly ITransferMetrics _transferMetrics;

public TransferPartialSliceActivity(
IUnitOfWork unitOfWork,
ILogger<TransferPartialSliceActivity> logger,
IEndpointNameFormatter formatter)
IEndpointNameFormatter formatter,
ITransferMetrics transferMetrics)
{
_unitOfWork = unitOfWork;
_logger = logger;
_formatter = formatter;
_transferMetrics = transferMetrics;
}

public async Task<ExecutionResult> Execute(ExecuteContext<TransferPartialSliceArguments> context)
Expand Down Expand Up @@ -103,11 +109,19 @@ public async Task<ExecutionResult> Execute(ExecuteContext<TransferPartialSliceAr

return AddTransferRequiredActivities(context, receiverEndpoints, transferredSlice, transaction, states, walletAttributes);
}
catch (PostgresException ex)
{
_logger.LogError(ex, "Failed to communicate with the database.");
throw new TransientException("Failed to communicate with the database.", ex);
}
catch (Exception ex)
{
_unitOfWork.Rollback();
_logger.LogError(ex, "Error sending transactions to registry");
return context.Faulted(ex);
_logger.LogError(ex, "Error sending partial slice transfer transactions to registry");
await _unitOfWork.RequestStatusRepository.SetRequestStatus(context.Arguments.RequestStatusArgs.RequestId, context.Arguments.RequestStatusArgs.Owner, RequestStatusState.Failed, failedReason: "Error sending partial slice transfer transactions to registry.");
_unitOfWork.Commit();
_transferMetrics.IncrementFailedTransfers();
throw;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ public class UpdateSliceStateActivity : IExecuteActivity<UpdateSliceStateArgumen
private readonly IUnitOfWork _unitOfWork;
private readonly ILogger<UpdateSliceStateActivity> _logger;
private readonly IClaimMetrics _claimMetrics;
private readonly ITransferMetrics _transferMetrics;

public UpdateSliceStateActivity(IUnitOfWork unitOfWork, ILogger<UpdateSliceStateActivity> logger, IClaimMetrics claimMetrics)
public UpdateSliceStateActivity(IUnitOfWork unitOfWork, ILogger<UpdateSliceStateActivity> logger, IClaimMetrics claimMetrics, ITransferMetrics transferMetrics)
{
_unitOfWork = unitOfWork;
_logger = logger;
_claimMetrics = claimMetrics;
_transferMetrics = transferMetrics;
}

public async Task<ExecutionResult> Execute(ExecuteContext<UpdateSliceStateArguments> context)
Expand Down Expand Up @@ -67,6 +69,10 @@ await _unitOfWork.RequestStatusRepository.SetRequestStatus(
{
_claimMetrics.IncrementFailedClaims();
}
else if (context.Arguments.RequestStatusArgs.RequestStatusType == RequestStatusType.Transfer)
{
_transferMetrics.IncrementFailedTransfers();
}
}
throw;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ public class WaitCommittedRegistryTransactionActivity : IExecuteActivity<WaitCom
private readonly ILogger<WaitCommittedRegistryTransactionActivity> _logger;
private readonly IUnitOfWork _unitOfWork;
private readonly IClaimMetrics _claimMetrics;
private readonly ITransferMetrics _transferMetrics;

public WaitCommittedRegistryTransactionActivity(IOptions<NetworkOptions> networkOptions, ILogger<WaitCommittedRegistryTransactionActivity> logger, IUnitOfWork unitOfWork, IClaimMetrics claimMetrics)
public WaitCommittedRegistryTransactionActivity(IOptions<NetworkOptions> networkOptions, ILogger<WaitCommittedRegistryTransactionActivity> logger, IUnitOfWork unitOfWork, IClaimMetrics claimMetrics, ITransferMetrics transferMetrics)
{
_networkOptions = networkOptions;
_logger = logger;
_unitOfWork = unitOfWork;
_claimMetrics = claimMetrics;
_transferMetrics = transferMetrics;
}

public async Task<ExecutionResult> Execute(ExecuteContext<WaitCommittedTransactionArguments> context)

Check warning on line 45 in src/ProjectOrigin.Vault/Activities/WaitCommittedRegistryTransactionActivity.cs

View workflow job for this annotation

GitHub Actions / analyse / sonar-analysis

Refactor this method to reduce its Cognitive Complexity from 21 to the 15 allowed. (https://rules.sonarsource.com/csharp/RSPEC-3776)
Expand Down Expand Up @@ -118,6 +120,10 @@ public async Task<ExecutionResult> Execute(ExecuteContext<WaitCommittedTransacti
{
_claimMetrics.IncrementFailedClaims();
}
else if (context.Arguments.RequestStatusArgs.RequestStatusType == RequestStatusType.Transfer)
{
_transferMetrics.IncrementFailedTransfers();
}
}
throw;
}
Expand Down
Loading

0 comments on commit 33a398e

Please sign in to comment.