feat: DTOSS-6682-review-comment-updates
Swapnil261188 committed Jan 15, 2025
1 parent 1a6b533 commit 179511b
Showing 7 changed files with 191 additions and 203 deletions.
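The change extracts the per-record retry-and-resume logic from ProcessCaasFile into a new ProcessRecordsManager class behind an IProcessRecordsManager interface, moving the IBlobStorageHelper and IStateStore dependencies along with it.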
@@ -0,0 +1,12 @@
namespace NHS.Screening.ReceiveCaasFile;

using Model;

public interface IProcessRecordsManager
{
    Task ProcessRecordsWithRetry(
        List<ParticipantsParquetMap> participants,
        ParallelOptions options,
        ScreeningService screeningService,
        string fileName);
}
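
A minimal usage sketch (illustrative only, not part of the commit) of how a caller might resolve and invoke the new interface; the DI registration, variable names, and parallelism value below are assumptions:

// Assumes Microsoft.Extensions.DependencyInjection and an async context.
var services = new ServiceCollection();
services.AddSingleton<IProcessRecordsManager, ProcessRecordsManager>();
// ...also register IStateStore, IExceptionHandler, IReceiveCaasFileHelper,
// IValidateDates and IBlobStorageHelper, which ProcessRecordsManager requires.
var provider = services.BuildServiceProvider();

var manager = provider.GetRequiredService<IProcessRecordsManager>();
await manager.ProcessRecordsWithRetry(
    participants,                                        // List<ParticipantsParquetMap> parsed from the CAAS file
    new ParallelOptions { MaxDegreeOfParallelism = 4 },  // placeholder value
    screeningService,                                    // supplies ScreeningId / ScreeningName
    "sample.parquet");                                   // file name, used as the state-store key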
@@ -1,7 +1,6 @@
namespace NHS.Screening.ReceiveCaasFile;

using System.Text.Json;
using Azure.Storage.Blobs;
using System.Threading.Tasks.Dataflow;
using Common;
using Common.Interfaces;
@@ -13,8 +12,6 @@ public class ProcessCaasFile : IProcessCaasFile
{

    private readonly ILogger<ProcessCaasFile> _logger;
    private readonly IBlobStorageHelper _blobStorageHelper;
    private readonly IStateStore _stateStore;
    private readonly ICallFunction _callFunction;

    private readonly IReceiveCaasFileHelper _receiveCaasFileHelper;
@@ -32,7 +29,7 @@ public class ProcessCaasFile : IProcessCaasFile

    public ProcessCaasFile(ILogger<ProcessCaasFile> logger, ICallFunction callFunction, ICheckDemographic checkDemographic, ICreateBasicParticipantData createBasicParticipantData,
        IExceptionHandler handleException, IAddBatchToQueue addBatchToQueue, IReceiveCaasFileHelper receiveCaasFileHelper, IExceptionHandler exceptionHandler
        , IRecordsProcessedTracker recordsProcessedTracker, IValidateDates validateDates, IBlobStorageHelper blobStorageHelper, IStateStore stateStore
        , IRecordsProcessedTracker recordsProcessedTracker, IValidateDates validateDates
        )
    {
        _logger = logger;
@@ -45,202 +42,8 @@ public ProcessCaasFile(ILogger<ProcessCaasFile> logger, ICallFunction callFuncti
        _exceptionHandler = exceptionHandler;
        _recordsProcessTracker = recordsProcessedTracker;
        _validateDates = validateDates;
        _blobStorageHelper = blobStorageHelper;
        _stateStore = stateStore;
    }

    /// <summary>
    /// Processes a file containing participant data with retry logic in case of failures.
    /// Tracks the processing state to ensure records are not reprocessed unnecessarily.
    /// </summary>
    /// <param name="filePath">The path to the file being processed.</param>
    /// <param name="values">The list of participant records to be processed.</param>
    /// <param name="options">Options for parallel processing.</param>
    /// <param name="screeningService">The service used for participant screening and validation.</param>
    /// <param name="fileName">The name of the file being processed.</param>
    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
    public async Task ProcessRecordsWithRetry(
        string filePath,
        List<ParticipantsParquetMap> values,
        ParallelOptions options,
        ScreeningService screeningService,
        string fileName)
    {
        const int MaxRetryAttempts = 3;
        int retryCount = 0;
        bool isSuccessful = false;
        int lastProcessedIndex = await _stateStore.GetLastProcessedRecordIndex(fileName) ?? 0;

        while (retryCount < MaxRetryAttempts && !isSuccessful)
        {
            try
            {
                _logger.LogInformation(
                    "Starting to process file {FilePath}, attempt {RetryAttempt}, resuming from index {LastProcessedIndex}",
                    filePath, retryCount + 1, lastProcessedIndex);

                var remainingRecords = values.Skip(lastProcessedIndex).ToList();
                await ProcessRemainingRecords(remainingRecords, screeningService, fileName);

                _logger.LogInformation("File {FilePath} processed successfully.", filePath);
                isSuccessful = true;
            }
            catch (Exception ex)
            {
                await HandleBatchProcessingFailure(filePath, fileName, ex, ++retryCount, MaxRetryAttempts);
            }
        }

        if (isSuccessful)
        {
            await _stateStore.ClearProcessingState(fileName);
        }
    }

    /// <summary>
    /// Processes the remaining records in the file.
    /// </summary>
    /// <param name="remainingRecords">The list of remaining records to process.</param>
    /// <param name="screeningService">The screening service for participant validation.</param>
    /// <param name="fileName">The name of the file being processed.</param>
    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
    private async Task ProcessRemainingRecords(
        List<ParticipantsParquetMap> remainingRecords,
        ScreeningService screeningService,
        string fileName)
    {
        foreach (var record in remainingRecords)
        {
            try
            {
                var participant = await MapParticipant(record, screeningService, fileName);

                if (participant == null || !ValidateParticipant(participant, fileName))
                {
                    continue;
                }

                _logger.LogInformation("Successfully processed record for participant {ParticipantId}.", participant.ParticipantId);
                int currentIndex = remainingRecords.IndexOf(record) + 1;
                await _stateStore.UpdateLastProcessedRecordIndex(fileName, currentIndex);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing record for file {FileName}.", fileName);
            }
        }
    }

    /// <summary>
    /// Handles batch processing failure and retries.
    /// </summary>
    /// <param name="filePath">The path to the file being processed.</param>
    /// <param name="fileName">The name of the file being processed.</param>
    /// <param name="exception">The exception that caused the failure.</param>
    /// <param name="retryCount">The current retry attempt.</param>
    /// <param name="maxRetries">The maximum number of retry attempts allowed.</param>
    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
    private async Task HandleBatchProcessingFailure(
        string filePath,
        string fileName,
        Exception exception,
        int retryCount,
        int maxRetries)
    {
        _logger.LogError(exception, "Error occurred while processing file {FilePath}. Attempt {RetryAttempt} of {MaxRetries}", filePath, retryCount, maxRetries);

        if (retryCount >= maxRetries)
        {
            _logger.LogWarning("Max retry attempts reached for file {FileName}. Handling failure.", fileName);
            await HandleFileFailure(filePath, fileName, exception.Message);
        }
        else
        {
            int retryDelay = BaseDelayMilliseconds * (int)Math.Pow(2, retryCount - 1);
            _logger.LogInformation("Retrying in {RetryDelay} milliseconds...", retryDelay);
            await Task.Delay(retryDelay);
        }
    }
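
    // Note (editorial, not part of the commit): assuming BaseDelayMilliseconds is 2000
    // (the value the replacement ProcessRecordsManager declares; the constant's own
    // declaration is not visible in this hunk), the exponential backoff above waits
    // 2000 ms after the first failed attempt and 4000 ms after the second. A third
    // failure reaches MaxRetryAttempts and goes straight to HandleFileFailure.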

    /// <summary>
    /// Maps a record to a participant object.
    /// </summary>
    /// <param name="record">The record to map.</param>
    /// <param name="screeningService">The screening service for participant validation.</param>
    /// <param name="fileName">The name of the file being processed.</param>
    /// <returns>The mapped participant object, or null if mapping fails.</returns>
    private async Task<Participant> MapParticipant(ParticipantsParquetMap record, ScreeningService screeningService, string fileName)
    {
        return await _receiveCaasFileHelper.MapParticipant(record, screeningService.ScreeningId, screeningService.ScreeningName, fileName);
    }

    /// <summary>
    /// Validates a participant object.
    /// </summary>
    /// <param name="participant">The participant to validate.</param>
    /// <param name="fileName">The name of the file being processed.</param>
    /// <returns>True if the participant is valid, otherwise false.</returns>
    private bool ValidateParticipant(Participant participant, string fileName)
    {
        if (!ValidationHelper.ValidateNHSNumber(participant.NhsNumber))
        {
            _exceptionHandler.CreateSystemExceptionLog(
                new Exception($"Invalid NHS Number for participant {participant.ParticipantId}"),
                participant,
                fileName);
            return false;
        }

        if (!_validateDates.ValidateAllDates(participant))
        {
            _exceptionHandler.CreateSystemExceptionLog(
                new Exception($"Invalid effective date for participant {participant.ParticipantId}"),
                participant,
                fileName);
            return false;
        }

        return true;
    }

    /// <summary>
    /// Uploads the failed file to blob storage and records a system exception after retries are exhausted.
    /// </summary>
    private async Task HandleFileFailure(string filePath, string fileName, string errorMessage, Participant participant = null)
    {
        try
        {
            byte[] fileData = await File.ReadAllBytesAsync(filePath);

            var blobFile = new BlobFile(fileData, fileName);

            bool isUploaded = await _blobStorageHelper.UploadFileToBlobStorage(
                connectionString: Environment.GetEnvironmentVariable("AzureBlobConnectionString"),
                containerName: "FailedFilesContainer",
                blobFile: blobFile,
                overwrite: true);

            if (isUploaded)
            {
                _logger.LogInformation("File {FileName} successfully moved to blob storage after max retries.", fileName);
            }
            else
            {
                _logger.LogWarning("Failed to move file {FileName} to blob storage after max retries.", fileName);
            }

            await _exceptionHandler.CreateSystemExceptionLog(
                new Exception(errorMessage),
                participant,
                fileName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error while handling file failure for FilePath: {FilePath}, FileName: {FileName}", filePath, fileName);
            await _exceptionHandler.CreateSystemExceptionLog(
                ex,
                participant,
                fileName);
        }
    }

    /// <summary>
    /// Processes a given batch and sends it to the queue.
    /// </summary>
@@ -0,0 +1,150 @@
namespace NHS.Screening.ReceiveCaasFile;

using System.Text.Json;
using Azure.Storage.Blobs;
using System.Threading.Tasks.Dataflow;
using Common;
using Common.Interfaces;
using Microsoft.Extensions.Logging;
using Model;
using receiveCaasFile;

public class ProcessRecordsManager : IProcessRecordsManager
{
    private const int MaxRetryAttempts = 3;
    private const int BaseDelayMilliseconds = 2000;

    private readonly ILogger<ProcessRecordsManager> _logger;
    private readonly IBlobStorageHelper _blobStorageHelper;
    private readonly IStateStore _stateStore;
    private readonly IExceptionHandler _exceptionHandler;
    private readonly IReceiveCaasFileHelper _receiveCaasFileHelper;
    private readonly IValidateDates _validateDates;

    public ProcessRecordsManager(
        ILogger<ProcessRecordsManager> logger,
        IStateStore stateStore,
        IExceptionHandler exceptionHandler,
        IReceiveCaasFileHelper receiveCaasFileHelper,
        IValidateDates validateDates,
        IBlobStorageHelper blobStorageHelper)
    {
        _logger = logger;
        _stateStore = stateStore;
        _exceptionHandler = exceptionHandler;
        _receiveCaasFileHelper = receiveCaasFileHelper;
        _validateDates = validateDates;
        _blobStorageHelper = blobStorageHelper;
    }

    /// <summary>
    /// Processes a list of participant records with retry logic for handling failures.
    /// </summary>
    /// <param name="participants">The list of participant records to process.</param>
    /// <param name="options">Parallel options for configuring parallel execution.</param>
    /// <param name="screeningService">The screening service used for mapping participants.</param>
    /// <param name="fileName">The name of the file being processed.</param>
    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
    /// <remarks>
    /// This method retries processing up to the specified maximum number of attempts if an exception occurs.
    /// The state of the last processed record is maintained to avoid reprocessing already successful records.
    /// </remarks>
    /// <exception cref="Exception">
    /// Thrown if the maximum number of retry attempts is reached and the file cannot be processed successfully.
    /// </exception>
    public async Task ProcessRecordsWithRetry(
        List<ParticipantsParquetMap> participants,
        ParallelOptions options,
        ScreeningService screeningService,
        string fileName)
    {
        int retryCount = 0;
        bool isSuccessful = false;
        int lastProcessedIndex = await _stateStore.GetLastProcessedRecordIndex(fileName) ?? 0;

        while (retryCount < MaxRetryAttempts && !isSuccessful)
        {
            try
            {
                _logger.LogInformation(
                    "Starting processing with retry attempt {RetryAttempt}, resuming from index {LastProcessedIndex}.",
                    retryCount + 1, lastProcessedIndex);

                var remainingRecords = participants.Skip(lastProcessedIndex).ToList();
                foreach (var record in remainingRecords)
                {
                    try
                    {
                        var participant = await _receiveCaasFileHelper.MapParticipant(record, screeningService.ScreeningId, screeningService.ScreeningName, fileName);
                        if (participant == null || !ValidateParticipant(participant, fileName))
                        {
                            continue;
                        }

                        _logger.LogInformation("Processed participant {ParticipantId}.", participant.ParticipantId);
                        int currentIndex = participants.IndexOf(record) + 1;
                        await _stateStore.UpdateLastProcessedRecordIndex(fileName, currentIndex);
                    }
                    catch (Exception recordEx)
                    {
                        _logger.LogError(recordEx, "Error processing participant record in file {FileName}.", fileName);
                    }
                }

                _logger.LogInformation("File processed successfully.");
                isSuccessful = true;
            }
            catch (Exception ex)
            {
                retryCount++;
                _logger.LogError(ex, "Batch processing failed on attempt {RetryAttempt}.", retryCount);

                if (retryCount >= MaxRetryAttempts)
                {
                    _logger.LogWarning("Maximum retry attempts reached. Handling failure for file {FileName}.", fileName);
                    await HandleFailure(fileName, ex.Message);
                    break;
                }

                await Task.Delay(BaseDelayMilliseconds * retryCount);
            }
        }

        if (isSuccessful)
        {
            await _stateStore.ClearProcessingState(fileName);
        }
    }

    private async Task HandleFailure(string fileName, string errorMessage, Participant participant = null)
    {
        _logger.LogError("Handling failure for file {FileName}.", fileName);
        await _exceptionHandler.CreateSystemExceptionLog(
            new Exception(errorMessage), participant, fileName);
    }

    private bool ValidateParticipant(Participant participant, string fileName)
    {
        if (!ValidationHelper.ValidateNHSNumber(participant.NhsNumber))
        {
            _exceptionHandler.CreateSystemExceptionLog(
                new Exception($"Invalid NHS Number for participant {participant.ParticipantId}"),
                participant,
                fileName);
            return false;
        }

        if (!_validateDates.ValidateAllDates(participant))
        {
            _exceptionHandler.CreateSystemExceptionLog(
                new Exception($"Invalid effective date for participant {participant.ParticipantId}"),
                participant,
                fileName);
            return false;
        }

        return true;
    }
}
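
The IStateStore contract itself is not shown in this diff. Below is a hypothetical in-memory sketch whose member signatures are inferred from the three calls ProcessRecordsManager makes (GetLastProcessedRecordIndex, UpdateLastProcessedRecordIndex, ClearProcessingState), not confirmed by the commit:

using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical: the real IStateStore definition is not part of this commit,
// so these signatures are inferred from how ProcessRecordsManager calls it.
public class InMemoryStateStore : IStateStore
{
    // Maps a file name to the index of the last successfully processed record.
    private readonly ConcurrentDictionary<string, int> _lastProcessedIndex = new();

    public Task<int?> GetLastProcessedRecordIndex(string fileName)
        => Task.FromResult(_lastProcessedIndex.TryGetValue(fileName, out var index) ? (int?)index : null);

    public Task UpdateLastProcessedRecordIndex(string fileName, int index)
    {
        _lastProcessedIndex[fileName] = index;
        return Task.CompletedTask;
    }

    public Task ClearProcessingState(string fileName)
    {
        _lastProcessedIndex.TryRemove(fileName, out _);
        return Task.CompletedTask;
    }
}

An in-memory store like this only helps with retries within one process; since the point of persisting the index is to survive a crash, a production implementation would need durable storage (for example a database table or blob metadata).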
