feat: DTOSS-6682-review-comment-and-sonarqube-fix
Swapnil261188 committed Jan 15, 2025
1 parent 179511b commit 60c2737
Showing 1 changed file with 72 additions and 24 deletions.
@@ -53,14 +53,14 @@ public ProcessRecordsManager(
/// Thrown if the maximum number of retry attempts is reached and the file cannot be processed successfully.
/// </exception>
public async Task ProcessRecordsWithRetry(
- List<ParticipantsParquetMap> participants,
+ List<ParticipantsParquetMap> value,
ParallelOptions options,
ScreeningService screeningService,
- string fileName)
+ string name)
{
int retryCount = 0;
bool isSuccessful = false;
- int lastProcessedIndex = await _stateStore.GetLastProcessedRecordIndex(fileName) ?? 0;
+ int lastProcessedIndex = await _stateStore.GetLastProcessedRecordIndex(name) ?? 0;

while (retryCount < MaxRetryAttempts && !isSuccessful)
{
@@ -70,24 +70,27 @@ public async Task ProcessRecordsWithRetry(
"Starting processing with retry attempt {RetryAttempt}, resuming from index {LastProcessedIndex}.",
retryCount + 1, lastProcessedIndex);

- var remainingRecords = participants.Skip(lastProcessedIndex).ToList();
+ var remainingRecords = value.Skip(lastProcessedIndex).ToList();

foreach (var record in remainingRecords)
{
try
{
- var participant = await _receiveCaasFileHelper.MapParticipant(record, screeningService.ScreeningId, screeningService.ScreeningName, fileName);
- if (participant == null || !ValidateParticipant(participant, fileName))
+ var participant = await _receiveCaasFileHelper.MapParticipant(
+ record, screeningService.ScreeningId, screeningService.ScreeningName, name);
+
+ if (participant == null || !ValidateParticipant(participant, name))
{
continue;
}

_logger.LogInformation("Processed participant {ParticipantId}.", participant.ParticipantId);
- int currentIndex = participants.IndexOf(record) + 1;
- await _stateStore.UpdateLastProcessedRecordIndex(fileName, currentIndex);
+ int currentIndex = value.IndexOf(record) + 1;
+ await _stateStore.UpdateLastProcessedRecordIndex(name, currentIndex);
}
catch (Exception recordEx)
{
- _logger.LogError(recordEx, "Error processing participant record in file {FileName}.", fileName);
+ _logger.LogError(recordEx, "Error processing participant record in file {FileName}.", name);
}
}

@@ -97,32 +100,77 @@ public async Task ProcessRecordsWithRetry(
catch (Exception ex)
{
retryCount++;
- _logger.LogError(ex, "Batch processing failed on attempt {RetryAttempt}.", retryCount);
-
- if (retryCount >= MaxRetryAttempts)
- {
- _logger.LogWarning("Maximum retry attempts reached. Handling failure for file {FileName}.", fileName);
- await HandleFailure(fileName, ex.Message);
- break;
- }
-
- await Task.Delay(2000 * retryCount);
+ await HandleRetryFailure(name, ex, retryCount);
}
}

if (isSuccessful)
{
- await _stateStore.ClearProcessingState(fileName);
+ await _stateStore.ClearProcessingState(name);
}
}

- private async Task HandleFailure(string fileName, string errorMessage, Participant participant = null)
+ private async Task HandleRetryFailure(string fileName, Exception ex, int retryCount)
{
- _logger.LogError("Handling failure for file {FileName}.", fileName);
- await _exceptionHandler.CreateSystemExceptionLog(
- new Exception(errorMessage), participant, fileName);
+ _logger.LogError(ex, "Batch processing failed on attempt {RetryAttempt}.", retryCount);
+
+ if (retryCount >= MaxRetryAttempts)
+ {
+ _logger.LogWarning("Maximum retry attempts reached. Handling failure for file {FileName}.", fileName);
+ await HandleFileFailure(fileName, ex.Message);
+ }
+ else
+ {
+ await Task.Delay(2000 * retryCount); // Exponential backoff
+ }
}
+ private async Task HandleFileFailure(string fileName, string errorMessage, Participant participant = null)
+ {
+ try
+ {
+ // Read file data directly using the fileName
+ string filePath = Path.Combine(Environment.GetEnvironmentVariable("FileDirectoryPath"), fileName);
+ if (!File.Exists(filePath))
+ {
+ _logger.LogWarning("File {FileName} does not exist. Skipping upload to blob storage.", fileName);
+ return;
+ }
+
+ byte[] fileData = await File.ReadAllBytesAsync(filePath);
+ var blobFile = new BlobFile(fileData, fileName);
+
+ bool isUploaded = await _blobStorageHelper.UploadFileToBlobStorage(
+ connectionString: Environment.GetEnvironmentVariable("AzureBlobConnectionString"),
+ containerName: "FailedFilesContainer",
+ blobFile: blobFile,
+ overwrite: true);
+
+ if (isUploaded)
+ {
+ _logger.LogInformation("File {FileName} successfully uploaded to blob storage.", fileName);
+ }
+ else
+ {
+ _logger.LogWarning("Failed to upload file {FileName} to blob storage.", fileName);
+ }
+
+ // Log the failure to the database
+ await _exceptionHandler.CreateSystemExceptionLog(
+ new Exception(errorMessage),
+ participant,
+ fileName);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error while handling failure for file {FileName}.", fileName);
+
+ // Log the failure to the database in case of an additional exception
+ await _exceptionHandler.CreateSystemExceptionLog(
+ ex,
+ participant,
+ fileName);
+ }
+ }
private bool ValidateParticipant(Participant participant, string fileName)
{
if (!ValidationHelper.ValidateNHSNumber(participant.NhsNumber))
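
For reference, a minimal, self-contained sketch of the resume-and-retry pattern this commit centralizes in HandleRetryFailure: process records starting from the last checkpointed index, persist the index after each record, and wait longer between attempts. ProcessOneAsync and the in-memory index dictionary below are illustrative stand-ins for the repository's record processing and _stateStore, not types taken from this codebase.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class RetryResumeSketch
{
    private const int MaxRetryAttempts = 3;

    // In-memory stand-in for the persisted state store used in the commit.
    private static readonly Dictionary<string, int> LastIndexByFile = new();

    public static async Task ProcessWithRetryAsync(string fileName, IReadOnlyList<string> records)
    {
        int retryCount = 0;
        bool isSuccessful = false;

        while (retryCount < MaxRetryAttempts && !isSuccessful)
        {
            // Resume from the last index persisted for this file (0 on the first attempt).
            int startIndex = LastIndexByFile.TryGetValue(fileName, out int saved) ? saved : 0;

            try
            {
                for (int i = startIndex; i < records.Count; i++)
                {
                    await ProcessOneAsync(records[i]);   // may throw and trigger a retry
                    LastIndexByFile[fileName] = i + 1;   // checkpoint after each record
                }

                isSuccessful = true;
                LastIndexByFile.Remove(fileName);        // clear state on success
            }
            catch (Exception ex)
            {
                retryCount++;
                Console.WriteLine($"Attempt {retryCount} failed for {fileName}: {ex.Message}");

                if (retryCount >= MaxRetryAttempts)
                {
                    throw;                               // out of attempts; the commit uploads the file instead
                }

                await Task.Delay(2000 * retryCount);     // delay grows with each attempt, as in the commit
            }
        }
    }

    private static Task ProcessOneAsync(string record) => Task.CompletedTask; // placeholder workload
}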

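The new HandleFileFailure method copies the failed source file into blob storage through the repository's _blobStorageHelper and BlobFile helpers, then writes the failure to the exception log. As a rough illustration of that upload step written directly against the Azure.Storage.Blobs SDK; the container name, null checks, and return value below are assumptions for the sketch, not details taken from the repository.

using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;

public static class FailedFileUploadSketch
{
    public static async Task<bool> TryUploadFailedFileAsync(string fileName)
    {
        // Environment variable names mirror the ones read in the commit.
        string directory = Environment.GetEnvironmentVariable("FileDirectoryPath") ?? ".";
        string connectionString = Environment.GetEnvironmentVariable("AzureBlobConnectionString");
        string filePath = Path.Combine(directory, fileName);

        if (string.IsNullOrEmpty(connectionString) || !File.Exists(filePath))
        {
            return false; // nothing to upload, or storage not configured
        }

        // "failed-files" is an illustrative container name, not the one used by the service.
        var container = new BlobContainerClient(connectionString, "failed-files");
        await container.CreateIfNotExistsAsync();

        await using FileStream stream = File.OpenRead(filePath);
        await container.GetBlobClient(fileName).UploadAsync(stream, overwrite: true);
        return true;
    }
}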