From 179511b62a4054464caa033f82ffb6cebac4602a Mon Sep 17 00:00:00 2001
From: Swapnil261188
Date: Wed, 15 Jan 2025 14:30:50 +0000
Subject: [PATCH] feat: DTOSS-6682-review-comment-updates

---
 .../IProcessRecordsManager.cs                 |  12 ++
 .../ProcessFileClasses/ProcessCaasFile.cs     | 199 +-----------------
 .../ProcessRecordsManager.cs                  | 150 +++++++++++++
 .../receiveCaasFile/Program.cs                |   4 +
 .../receiveCaasFile/receiveCaasFile.cs        |  10 +-
 .../processCaasFileTests.cs                   |  14 +-
 .../ReceiveCaasFileTests.cs                   |   5 +-
 7 files changed, 191 insertions(+), 203 deletions(-)
 create mode 100644 application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/IProcessRecordsManager.cs
 create mode 100644 application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessRecordsManager.cs

diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/IProcessRecordsManager.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/IProcessRecordsManager.cs
new file mode 100644
index 000000000..7ad9067ca
--- /dev/null
+++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/IProcessRecordsManager.cs
@@ -0,0 +1,12 @@
+namespace NHS.Screening.ReceiveCaasFile;
+
+using Model;
+
+public interface IProcessRecordsManager
+{
+    Task ProcessRecordsWithRetry(
+        List<ParticipantsParquetMap> values,
+        ParallelOptions options,
+        ScreeningService screeningService,
+        string name);
+}
diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs
index 41de4a9d5..c7cf5dc5a 100644
--- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs
+++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessCaasFile.cs
@@ -1,7 +1,6 @@
 namespace NHS.Screening.ReceiveCaasFile;
 
 using System.Text.Json;
-using Azure.Storage.Blobs;
 using System.Threading.Tasks.Dataflow;
 using Common;
 using Common.Interfaces;
@@ -13,8 +12,6 @@ public class ProcessCaasFile : IProcessCaasFile
 {
     private readonly ILogger<ProcessCaasFile> _logger;
-    private readonly IBlobStorageHelper _blobStorageHelper;
-    private readonly IStateStore _stateStore;
     private readonly ICallFunction _callFunction;
     private readonly IReceiveCaasFileHelper _receiveCaasFileHelper;
@@ -32,7 +29,7 @@ public class ProcessCaasFile : IProcessCaasFile
 
     public ProcessCaasFile(ILogger<ProcessCaasFile> logger, ICallFunction callFunction, ICheckDemographic checkDemographic,
         ICreateBasicParticipantData createBasicParticipantData, IExceptionHandler handleException, IAddBatchToQueue addBatchToQueue, IReceiveCaasFileHelper receiveCaasFileHelper, IExceptionHandler exceptionHandler
-        , IRecordsProcessedTracker recordsProcessedTracker, IValidateDates validateDates, IBlobStorageHelper blobStorageHelper, IStateStore stateStore
+        , IRecordsProcessedTracker recordsProcessedTracker, IValidateDates validateDates
         )
     {
         _logger = logger;
@@ -45,202 +42,8 @@ public ProcessCaasFile(ILogger logger, ICallFunction callFuncti
         _exceptionHandler = exceptionHandler;
         _recordsProcessTracker = recordsProcessedTracker;
         _validateDates = validateDates;
-        _blobStorageHelper = blobStorageHelper;
-        _stateStore = stateStore;
     }
 
-    /// <summary>
-    /// Processes a file containing participant data with retry logic in case of failures.
-    /// Tracks the processing state to ensure records are not reprocessed unnecessarily.
-    /// </summary>
-    /// <param name="filePath">The path to the file being processed.</param>
-    /// <param name="values">The list of participant records to be processed.</param>
-    /// <param name="options">Options for parallel processing.</param>
-    /// <param name="screeningService">The service used for participant screening and validation.</param>
-    /// <param name="fileName">The name of the file being processed.</param>
-    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
-    public async Task ProcessRecordsWithRetry(
-        string filePath,
-        List<ParticipantsParquetMap> values,
-        ParallelOptions options,
-        ScreeningService screeningService,
-        string fileName)
-    {
-        const int MaxRetryAttempts = 3;
-        int retryCount = 0;
-        bool isSuccessful = false;
-        int lastProcessedIndex = await _stateStore.GetLastProcessedRecordIndex(fileName) ?? 0;
-
-        while (retryCount < MaxRetryAttempts && !isSuccessful)
-        {
-            try
-            {
-                _logger.LogInformation(
-                    "Starting to process file {FilePath}, attempt {RetryAttempt}, resuming from index {LastProcessedIndex}",
-                    filePath, retryCount + 1, lastProcessedIndex);
-
-                var remainingRecords = values.Skip(lastProcessedIndex).ToList();
-                await ProcessRemainingRecords(remainingRecords, screeningService, fileName);
-
-                _logger.LogInformation("File {FilePath} processed successfully.", filePath);
-                isSuccessful = true;
-            }
-            catch (Exception ex)
-            {
-                await HandleBatchProcessingFailure(filePath, fileName, ex, ++retryCount, MaxRetryAttempts);
-            }
-        }
-
-        if (isSuccessful)
-        {
-            await _stateStore.ClearProcessingState(fileName);
-        }
-    }
-
-    /// <summary>
-    /// Processes the remaining records in the file.
-    /// </summary>
-    /// <param name="remainingRecords">The list of remaining records to process.</param>
-    /// <param name="screeningService">The screening service for participant validation.</param>
-    /// <param name="fileName">The name of the file being processed.</param>
-    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
-    private async Task ProcessRemainingRecords(
-        List<ParticipantsParquetMap> remainingRecords,
-        ScreeningService screeningService,
-        string fileName)
-    {
-        foreach (var record in remainingRecords)
-        {
-            try
-            {
-                var participant = await MapParticipant(record, screeningService, fileName);
-
-                if (participant == null || !ValidateParticipant(participant, fileName))
-                {
-                    continue;
-                }
-
-                _logger.LogInformation("Successfully processed record for participant {ParticipantId}.", participant.ParticipantId);
-                int currentIndex = remainingRecords.IndexOf(record) + 1;
-                await _stateStore.UpdateLastProcessedRecordIndex(fileName, currentIndex);
-            }
-            catch (Exception ex)
-            {
-                _logger.LogError(ex, "Error processing record for file {FileName}.", fileName);
-            }
-        }
-    }
-
-    /// <summary>
-    /// Handles batch processing failure and retries.
-    /// </summary>
-    /// <param name="filePath">The path to the file being processed.</param>
-    /// <param name="fileName">The name of the file being processed.</param>
-    /// <param name="exception">The exception that caused the failure.</param>
-    /// <param name="retryCount">The current retry attempt.</param>
-    /// <param name="maxRetries">The maximum number of retry attempts allowed.</param>
-    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
-    private async Task HandleBatchProcessingFailure(
-        string filePath,
-        string fileName,
-        Exception exception,
-        int retryCount,
-        int maxRetries)
-    {
-        _logger.LogError(exception, "Error occurred while processing file {FilePath}. Attempt {RetryAttempt} of {MaxRetries}", filePath, retryCount, maxRetries);
-
-        if (retryCount >= maxRetries)
-        {
-            _logger.LogWarning("Max retry attempts reached for file {FileName}. Handling failure.", fileName);
-            await HandleFileFailure(filePath, fileName, exception.Message);
-        }
-        else
-        {
-            int retryDelay = BaseDelayMilliseconds * (int)Math.Pow(2, retryCount - 1);
-            _logger.LogInformation("Retrying in {RetryDelay} milliseconds...", retryDelay);
-            await Task.Delay(retryDelay);
-        }
-    }
-
-    /// <summary>
-    /// Maps a record to a participant object.
-    /// </summary>
-    /// <param name="record">The record to map.</param>
-    /// <param name="screeningService">The screening service for participant validation.</param>
-    /// <param name="fileName">The name of the file being processed.</param>
-    /// <returns>The mapped participant object, or null if mapping fails.</returns>
-    private async Task<Participant> MapParticipant(ParticipantsParquetMap record, ScreeningService screeningService, string fileName)
-    {
-        return await _receiveCaasFileHelper.MapParticipant(record, screeningService.ScreeningId, screeningService.ScreeningName, fileName);
-    }
-
-    /// <summary>
-    /// Validates a participant object.
-    /// </summary>
-    /// <param name="participant">The participant to validate.</param>
-    /// <param name="fileName">The name of the file being processed.</param>
-    /// <returns>True if the participant is valid, otherwise false.</returns>
-    private bool ValidateParticipant(Participant participant, string fileName)
-    {
-        if (!ValidationHelper.ValidateNHSNumber(participant.NhsNumber))
-        {
-            _exceptionHandler.CreateSystemExceptionLog(
-                new Exception($"Invalid NHS Number for participant {participant.ParticipantId}"),
-                participant,
-                fileName);
-            return false;
-        }
-
-        if (!_validateDates.ValidateAllDates(participant))
-        {
-            _exceptionHandler.CreateSystemExceptionLog(
-                new Exception($"Invalid effective date for participant {participant.ParticipantId}"),
-                participant,
-                fileName);
-            return false;
-        }
-
-        return true;
-    }
-    private async Task HandleFileFailure(string filePath, string fileName, string errorMessage, Participant participant = null)
-    {
-        try
-        {
-            byte[] fileData = await File.ReadAllBytesAsync(filePath);
-
-            var blobFile = new BlobFile(fileData, fileName);
-
-            bool isUploaded = await _blobStorageHelper.UploadFileToBlobStorage(
-                connectionString: Environment.GetEnvironmentVariable("AzureBlobConnectionString"),
-                containerName: "FailedFilesContainer",
-                blobFile: blobFile,
-                overwrite: true);
-
-            if (isUploaded)
-            {
-                _logger.LogInformation("File {FileName} successfully moved to blob storage after max retries.", fileName);
-            }
-            else
-            {
-                _logger.LogWarning("Failed to move file {FileName} to blob storage after max retries.", fileName);
-            }
-
-            await _exceptionHandler.CreateSystemExceptionLog(
-                new Exception(errorMessage),
-                participant,
-                fileName);
-        }
-        catch (Exception ex)
-        {
-            _logger.LogError(ex, "Error while handling file failure for FilePath: {FilePath}, FileName: {FileName}", filePath, fileName);
-            await _exceptionHandler.CreateSystemExceptionLog(
-                ex,
-                participant,
-                fileName);
-        }
-    }
-
-
     /// <summary>
     /// process a given batch and send it the queue
     /// </summary>
diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessRecordsManager.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessRecordsManager.cs
new file mode 100644
index 000000000..110d65c0c
--- /dev/null
+++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/ProcessFileClasses/ProcessRecordsManager.cs
@@ -0,0 +1,150 @@
+namespace NHS.Screening.ReceiveCaasFile;
+
+using System.Text.Json;
+using Azure.Storage.Blobs;
+using System.Threading.Tasks.Dataflow;
+using Common;
+using Common.Interfaces;
+using Microsoft.Extensions.Logging;
+using Model;
+using receiveCaasFile;
+
+public class ProcessRecordsManager : IProcessRecordsManager
+{
+    private const int MaxRetryAttempts = 3;
+    private readonly ILogger<ProcessRecordsManager> _logger;
+    private readonly IBlobStorageHelper _blobStorageHelper;
+    private readonly IStateStore _stateStore;
+    private readonly IExceptionHandler _exceptionHandler;
+    private readonly IReceiveCaasFileHelper _receiveCaasFileHelper;
+    private readonly IValidateDates _validateDates;
+    private const int BaseDelayMilliseconds = 2000;
+    private readonly int _maxRetryAttempts = 3;
+
+    public ProcessRecordsManager(
+        ILogger<ProcessRecordsManager> logger,
+        IStateStore stateStore,
+        IExceptionHandler exceptionHandler,
+        IReceiveCaasFileHelper receiveCaasFileHelper,
+        IValidateDates validateDates,
+        IBlobStorageHelper blobStorageHelper)
+    {
+        _logger = logger;
+        _stateStore = stateStore;
+        _exceptionHandler = exceptionHandler;
+        _receiveCaasFileHelper = receiveCaasFileHelper;
+        _validateDates = validateDates;
+        _blobStorageHelper = blobStorageHelper;
+    }
+
+    /// <summary>
+    /// Processes a list of participant records with retry logic for handling failures.
+    /// </summary>
+    /// <param name="participants">The list of participant records to process.</param>
+    /// <param name="options">Parallel options for configuring parallel execution.</param>
+    /// <param name="screeningService">The screening service used for mapping participants.</param>
+    /// <param name="fileName">The name of the file being processed.</param>
+    /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
+    /// <remarks>
+    /// This method retries processing up to the specified maximum number of attempts if an exception occurs.
+    /// The state of the last processed record is maintained to avoid reprocessing already successful records.
+    /// </remarks>
+    /// <exception cref="Exception">
+    /// Thrown if the maximum number of retry attempts is reached and the file cannot be processed successfully.
+    /// </exception>
+    public async Task ProcessRecordsWithRetry(
+        List<ParticipantsParquetMap> participants,
+        ParallelOptions options,
+        ScreeningService screeningService,
+        string fileName)
+    {
+        int retryCount = 0;
+        bool isSuccessful = false;
+        int lastProcessedIndex = await _stateStore.GetLastProcessedRecordIndex(fileName) ?? 0;
+
+        while (retryCount < MaxRetryAttempts && !isSuccessful)
+        {
+            try
+            {
+                _logger.LogInformation(
+                    "Starting processing with retry attempt {RetryAttempt}, resuming from index {LastProcessedIndex}.",
+                    retryCount + 1, lastProcessedIndex);
+
+                var remainingRecords = participants.Skip(lastProcessedIndex).ToList();
+                foreach (var record in remainingRecords)
+                {
+                    try
+                    {
+                        var participant = await _receiveCaasFileHelper.MapParticipant(record, screeningService.ScreeningId, screeningService.ScreeningName, fileName);
+                        if (participant == null || !ValidateParticipant(participant, fileName))
+                        {
+                            continue;
+                        }
+
+                        _logger.LogInformation("Processed participant {ParticipantId}.", participant.ParticipantId);
+                        int currentIndex = participants.IndexOf(record) + 1;
+                        await _stateStore.UpdateLastProcessedRecordIndex(fileName, currentIndex);
+                    }
+                    catch (Exception recordEx)
+                    {
+                        _logger.LogError(recordEx, "Error processing participant record in file {FileName}.", fileName);
+                    }
+                }
+
+                _logger.LogInformation("File processed successfully.");
+                isSuccessful = true;
+            }
+            catch (Exception ex)
+            {
+                retryCount++;
+                _logger.LogError(ex, "Batch processing failed on attempt {RetryAttempt}.", retryCount);
+
+                if (retryCount >= MaxRetryAttempts)
+                {
+                    _logger.LogWarning("Maximum retry attempts reached. Handling failure for file {FileName}.", fileName);
+                    await HandleFailure(fileName, ex.Message);
+                    break;
+                }
+
+                await Task.Delay(2000 * retryCount);
+            }
+        }
+
+        if (isSuccessful)
+        {
+            await _stateStore.ClearProcessingState(fileName);
+        }
+    }
+
+    private async Task HandleFailure(string fileName, string errorMessage, Participant participant = null)
+    {
+        _logger.LogError("Handling failure for file {FileName}.", fileName);
+        await _exceptionHandler.CreateSystemExceptionLog(
+            new Exception(errorMessage), participant, fileName);
+    }
+
+    private bool ValidateParticipant(Participant participant, string fileName)
+    {
+        if (!ValidationHelper.ValidateNHSNumber(participant.NhsNumber))
+        {
+            _exceptionHandler.CreateSystemExceptionLog(
+                new Exception($"Invalid NHS Number for participant {participant.ParticipantId}"),
+                participant,
+                fileName);
+            return false;
+        }
+
+        if (!_validateDates.ValidateAllDates(participant))
+        {
+            _exceptionHandler.CreateSystemExceptionLog(
+                new Exception($"Invalid effective date for participant {participant.ParticipantId}"),
+                participant,
+                fileName);
+            return false;
+        }
+
+        return true;
+    }
+}
+
diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs
index d48419920..c255c1be9 100644
--- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs
+++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/Program.cs
@@ -31,6 +31,10 @@
         services.AddScoped();
         services.AddScoped(); //Do not change the lifetime of this.
         services.AddScoped();
+        services.AddScoped();
+        services.AddScoped();
+        services.AddScoped();
+        services.AddScoped();
     })
     .AddAzureQueues()
     .AddExceptionHandler()
diff --git a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/receiveCaasFile.cs b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/receiveCaasFile.cs
index 309812af7..5a1376376 100644
--- a/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/receiveCaasFile.cs
+++ b/application/CohortManager/src/Functions/CaasIntegration/receiveCaasFile/receiveCaasFile.cs
@@ -16,19 +16,22 @@ public class ReceiveCaasFile
     private readonly ILogger<ReceiveCaasFile> _logger;
     private readonly IReceiveCaasFileHelper _receiveCaasFileHelper;
     private readonly IProcessCaasFile _processCaasFile;
+    private readonly IProcessRecordsManager _processRecordsManager;
     private readonly IScreeningServiceData _screeningServiceData;
 
     public ReceiveCaasFile(
        ILogger<ReceiveCaasFile> logger,
        IReceiveCaasFileHelper receiveCaasFileHelper,
        IProcessCaasFile processCaasFile,
-       IScreeningServiceData screeningServiceData
+       IScreeningServiceData screeningServiceData,
+       IProcessRecordsManager processRecordsManager
     )
     {
        _logger = logger;
        _receiveCaasFileHelper = receiveCaasFileHelper;
        _processCaasFile = processCaasFile;
        _screeningServiceData = screeningServiceData;
+       _processRecordsManager = processRecordsManager;
     }
 
     [Function(nameof(ReceiveCaasFile))]
@@ -86,6 +89,9 @@ public async Task Run([BlobTrigger("inbound/{name}", Connection = "caasfolder_ST
                 allTasks.Add(
                     _processCaasFile.ProcessRecords(batch, options, screeningService, name)
                 );
+                allTasks.Add(
+                    _processRecordsManager.ProcessRecordsWithRetry(batch, options, screeningService, name)
+                );
             }
             // process each batches
             Task.WaitAll(allTasks.ToArray());
@@ -109,7 +115,7 @@ public async Task Run([BlobTrigger("inbound/{name}", Connection = "caasfolder_ST
             {
                 _logger.LogInformation("All rows processed for file named {Name}. time {time}", name, DateTime.Now);
             }
-            //We want to release the file from temporary storage no matter what 
+            //We want to release the file from temporary storage no matter what
             if (File.Exists(downloadFilePath)) File.Delete(downloadFilePath);
         }
     }
diff --git a/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs b/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs
index 99c959c8c..20095f80e 100644
--- a/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs
+++ b/tests/UnitTests/CaasIntegrationTests/processCaasFileTest/processCaasFileTests.cs
@@ -17,6 +17,7 @@ public class ProcessCaasFileTests
     private Mock<ILogger<ProcessCaasFile>> _loggerMock;
     private Mock<IBlobStorageHelper> _blobStorageHelper;
     private Mock<IStateStore> _stateStore;
+    private Mock<ILogger<ProcessRecordsManager>> _loggerRecordsMock;
     private Mock<ICallFunction> _callFunctionMock;
     private Mock<IReceiveCaasFileHelper> _receiveCaasFileHelperMock;
     private Mock<ICheckDemographic> _checkDemographicMock;
@@ -28,10 +29,12 @@ public class ProcessCaasFileTests
     private Mock<IValidateDates> _validateDates;
 
     private ProcessCaasFile _processCaasFile;
+    private ProcessRecordsManager _processRecordsManager;
 
     public ProcessCaasFileTests()
     {
         _loggerMock = new Mock<ILogger<ProcessCaasFile>>();
+        _loggerRecordsMock = new Mock<ILogger<ProcessRecordsManager>>();
         _callFunctionMock = new Mock<ICallFunction>();
         _receiveCaasFileHelperMock = new Mock<IReceiveCaasFileHelper>();
         _checkDemographicMock = new Mock<ICheckDemographic>();
@@ -54,9 +57,16 @@ public ProcessCaasFileTests()
             _receiveCaasFileHelperMock.Object,
             _exceptionHandlerMock.Object,
             _recordsProcessedTrackerMock.Object,
+            _validateDates.Object
+            );
+
+        _processRecordsManager = new ProcessRecordsManager(
+            _loggerRecordsMock.Object,
+            _stateStore.Object,
+            _exceptionHandlerMock.Object,
+            _receiveCaasFileHelperMock.Object,
             _validateDates.Object,
-            _blobStorageHelper.Object,
-            _stateStore.Object
+            _blobStorageHelper.Object
             );
     }
 
diff --git a/tests/UnitTests/CaasIntegrationTests/receiveCaasFileTest/ReceiveCaasFileTests.cs b/tests/UnitTests/CaasIntegrationTests/receiveCaasFileTest/ReceiveCaasFileTests.cs
index 25f3e0a74..193cd8d9f 100644
--- a/tests/UnitTests/CaasIntegrationTests/receiveCaasFileTest/ReceiveCaasFileTests.cs
+++ b/tests/UnitTests/CaasIntegrationTests/receiveCaasFileTest/ReceiveCaasFileTests.cs
@@ -24,6 +24,7 @@ public class ReceiveCaasFileTests
     private readonly ParticipantsParquetMap _participantsParquetMap;
     private readonly string _blobName;
     private readonly Mock<IProcessCaasFile> _mockProcessCaasFile = new();
+    private readonly Mock<IProcessRecordsManager> _mockProcessRecordsManager = new();
    private readonly Mock<IScreeningServiceData> _mockScreeningServiceData = new();
 
     public ReceiveCaasFileTests()
@@ -33,7 +34,7 @@ public ReceiveCaasFileTests()
         _mockIReceiveCaasFileHelper = new Mock<IReceiveCaasFileHelper>();
         Environment.SetEnvironmentVariable("BatchSize", "2000");
 
-        _receiveCaasFileInstance = new ReceiveCaasFile(_mockLogger.Object, _mockIReceiveCaasFileHelper.Object, _mockProcessCaasFile.Object, _mockScreeningServiceData.Object);
+        _receiveCaasFileInstance = new ReceiveCaasFile(_mockLogger.Object, _mockIReceiveCaasFileHelper.Object, _mockProcessCaasFile.Object, _mockScreeningServiceData.Object, _mockProcessRecordsManager.Object);
 
         _blobName = "add_1_-_CAAS_BREAST_SCREENING_COHORT.parquet";
         _participant = new Participant()
@@ -189,6 +190,8 @@ public async Task Run_ProcessesChunksInParallel()
         // Assert
         _mockProcessCaasFile.Verify(x => x.ProcessRecords(It.Is<List<ParticipantsParquetMap>>(list => list.Count == batchSize), It.IsAny<ParallelOptions>(), It.IsAny<ScreeningService>(), _blobName), Times.Exactly(1));
 
+        _mockProcessRecordsManager.Verify(x => x.ProcessRecordsWithRetry(It.Is<List<ParticipantsParquetMap>>(list => list.Count == batchSize), It.IsAny<ParallelOptions>(), It.IsAny<ScreeningService>(), _blobName), Times.Exactly(1));
+
         _mockLogger.Verify(x =>
             x.Log(It.Is<LogLevel>(l => l == LogLevel.Information),
             It.IsAny<EventId>(),
             It.Is<It.IsAnyType>((v, t) => v.ToString().Contains($"All rows processed for file named {_blobName}.")),