From 2c17c90dc5509ba8d3b0f4972c6613a8027d387d Mon Sep 17 00:00:00 2001 From: aacitelli Date: Thu, 20 Feb 2025 17:19:56 +0100 Subject: [PATCH 1/5] [PAGOPA-2619] First impl http recovery function and code refactoring --- pom.xml | 7 + .../to/eventhub/BlobProcessingFunction.java | 238 ++---------------- .../to/eventhub/HttpBlobRecoveryFunction.java | 210 ++++++++++++++++ .../fdr/to/eventhub/model/BlobFileData.java | 17 ++ .../fdr/to/eventhub/util/CommonUtil.java | 218 ++++++++++++++++ 5 files changed, 474 insertions(+), 216 deletions(-) create mode 100644 src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java create mode 100644 src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java create mode 100644 src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java diff --git a/pom.xml b/pom.xml index 09eec16..ce49380 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 5.20.0 + 12.29.0 com.microsoft.azure-20220215182005862 2.18.2 @@ -43,6 +44,12 @@ ${azure.messaging.eventhubs.version} + + com.azure + azure-storage-blob + ${azure.storage.blob.version} + + com.fasterxml.jackson.core diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java index 44ea9ad..930b3fe 100644 --- a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java @@ -1,44 +1,21 @@ package it.gov.pagopa.fdr.to.eventhub; -import com.azure.core.amqp.AmqpRetryMode; -import com.azure.core.amqp.AmqpRetryOptions; -import com.azure.messaging.eventhubs.EventData; -import com.azure.messaging.eventhubs.EventDataBatch; -import com.azure.messaging.eventhubs.EventHubClientBuilder; import com.azure.messaging.eventhubs.EventHubProducerClient; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.microsoft.azure.functions.ExecutionContext; import com.microsoft.azure.functions.annotation.BindingName; import com.microsoft.azure.functions.annotation.BlobTrigger; import com.microsoft.azure.functions.annotation.FunctionName; import it.gov.pagopa.fdr.to.eventhub.exception.EventHubException; -import it.gov.pagopa.fdr.to.eventhub.mapper.FlussoRendicontazioneMapper; import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione; -import it.gov.pagopa.fdr.to.eventhub.model.eventhub.FlowTxEventModel; -import it.gov.pagopa.fdr.to.eventhub.model.eventhub.ReportedIUVEventModel; -import it.gov.pagopa.fdr.to.eventhub.parser.FDR1XmlSAXParser; +import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.GZIPInputStream; -import javax.xml.parsers.ParserConfigurationException; -import org.xml.sax.SAXException; public class BlobProcessingFunction { - private static final String LOG_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; - private static final String SERVICE_IDENTIFIER = "serviceIdentifier"; private final String fdr1Container = System.getenv().getOrDefault("BLOB_STORAGE_FDR1_CONTAINER", "fdr1-flows"); private final String fdr3Container = @@ -48,29 +25,14 @@ public class BlobProcessingFunction { public BlobProcessingFunction() { this.eventHubClientFlowTx = - new EventHubClientBuilder() - .connectionString( - System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"), - System.getenv("EVENT_HUB_FLOWTX_NAME")) - .retryOptions( - new AmqpRetryOptions() - .setMaxRetries(3) // Maximum number of - // attempts - .setDelay(Duration.ofSeconds(2)) // Delay between attempts - .setMode(AmqpRetryMode.EXPONENTIAL)) // Backoff strategy - .buildProducerClient(); + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"), + System.getenv("EVENT_HUB_FLOWTX_NAME")); this.eventHubClientReportedIUV = - new EventHubClientBuilder() - .connectionString( - System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"), - System.getenv("EVENT_HUB_REPORTEDIUV_NAME")) - .retryOptions( - new AmqpRetryOptions() - .setMaxRetries(3) - .setDelay(Duration.ofSeconds(2)) - .setMode(AmqpRetryMode.EXPONENTIAL)) - .buildProducerClient(); + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"), + System.getenv("EVENT_HUB_REPORTEDIUV_NAME")); } // Constructor to inject the Event Hub clients @@ -94,7 +56,7 @@ public synchronized void processFDR1BlobFiles( final ExecutionContext context) { // checks for the presence of the necessary metadata - if (!validateBlobMetadata(blobMetadata)) { + if (!CommonUtil.validateBlobMetadata(blobMetadata)) { context .getLogger() .warning( @@ -107,7 +69,7 @@ public synchronized void processFDR1BlobFiles( } // verify that the file is present and that it is a compressed file - boolean isValidGzipFile = isGzip(content); + boolean isValidGzipFile = CommonUtil.isGzip(content); context .getLogger() @@ -115,15 +77,16 @@ public synchronized void processFDR1BlobFiles( () -> String.format( "[FDR1] Triggered at: %s for Blob container: %s, name: %s, size in bytes: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); try (InputStream decompressedStream = - isValidGzipFile ? decompressGzip(content) : new ByteArrayInputStream(content)) { + isValidGzipFile ? CommonUtil.decompressGzip(content) : new ByteArrayInputStream(content)) { - FlussoRendicontazione flusso = parseXml(decompressedStream); + FlussoRendicontazione flusso = CommonUtil.parseXml(decompressedStream); context .getLogger() @@ -132,7 +95,8 @@ public synchronized void processFDR1BlobFiles( String.format( "[FDR1] Parsed Finished at: %s for Blob container: %s, name: %s, size in" + " bytes: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); @@ -140,7 +104,9 @@ public synchronized void processFDR1BlobFiles( flusso.setMetadata(blobMetadata); // Waits for confirmation of sending the entire flow to the Event Hub - boolean eventBatchSent = processXmlBlobAndSendToEventHub(flusso, context); + boolean eventBatchSent = + CommonUtil.processXmlBlobAndSendToEventHub( + eventHubClientFlowTx, eventHubClientReportedIUV, flusso, context); if (!eventBatchSent) { throw new EventHubException( String.format( @@ -155,7 +121,8 @@ public synchronized void processFDR1BlobFiles( String.format( "[FDR1] Execution Finished at: %s for Blob container: %s, name: %s, size in" + " bytes: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); @@ -198,171 +165,10 @@ public void processFDR3BlobFiles( String.format( "[FDR3] Execution Finished at: %s for Blob container: %s, name: %s, size: %d" + " bytes", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); } - - private boolean isGzip(byte[] content) { - if (content == null || content.length == 0) { - throw new IllegalArgumentException("Invalid input data for decompression: empty file"); - } - return content.length > 2 && content[0] == (byte) 0x1F && content[1] == (byte) 0x8B; - } - - private boolean validateBlobMetadata(Map blobMetadata) { - if (blobMetadata == null - || blobMetadata.isEmpty() - || !blobMetadata.containsKey("sessionId") - || !blobMetadata.containsKey("insertedTimestamp")) { - throw new IllegalArgumentException( - "Invalid blob metadata: sessionId or insertedTimestamp is missing."); - } - return !("false".equalsIgnoreCase(blobMetadata.get("elaborate"))); - } - - private InputStream decompressGzip(byte[] compressedContent) throws IOException { - return new GZIPInputStream(new ByteArrayInputStream(compressedContent)); - } - - private FlussoRendicontazione parseXml(InputStream xmlStream) - throws ParserConfigurationException, SAXException, IOException { - return FDR1XmlSAXParser.parseXmlStream(xmlStream); - } - - private boolean processXmlBlobAndSendToEventHub( - FlussoRendicontazione flussoRendicontazione, ExecutionContext context) { - try { - // Convert FlussoRendicontazione to event models - FlowTxEventModel flowEvent = - FlussoRendicontazioneMapper.toFlowTxEventList(flussoRendicontazione); - List reportedIUVEventList = - FlussoRendicontazioneMapper.toReportedIUVEventList(flussoRendicontazione); - - // Serialize the objects to JSON - JsonMapper objectMapper = - JsonMapper.builder() - .addModule(new JavaTimeModule()) - .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) - .build(); - - String flowEventJson = objectMapper.writeValueAsString(flowEvent); - - // Break the list into smaller batches to avoid overshooting limit - List reportedIUVEventJsonChunks = splitIntoChunks(reportedIUVEventList, objectMapper); - - context - .getLogger() - .fine( - () -> - String.format( - "Chunk splitting process completed at: %s for flow ID: %s. Total number of" - + " chunks: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), - flussoRendicontazione.getIdentificativoFlusso(), - reportedIUVEventJsonChunks.size())); - - boolean flowEventSent = - sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione, context); - boolean allEventChunksSent = true; - - for (String chunk : reportedIUVEventJsonChunks) { - if (!sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione, context)) { - allEventChunksSent = false; - break; - } - } - - return flowEventSent && allEventChunksSent; - - } catch (Exception e) { - // Log the exception with context - String errorMessage = - String.format( - "Error processing or sending data to event hub: %s. Details: %s", - flussoRendicontazione.getIdentificativoFlusso(), e.getMessage()); - context.getLogger().severe(() -> errorMessage); - - return false; - } - } - - /** Divides the event list into smaller JSON blocks (to avoid exceeding 1MB) */ - private List splitIntoChunks( - List eventList, JsonMapper objectMapper) - throws JsonProcessingException { - - List chunks = new ArrayList<>(); - List tempBatch = new ArrayList<>(); - final int MAX_CHUNK_SIZE_BYTES = 900 * 1024; // 900 KB for security - - StringBuilder currentJsonBatch = new StringBuilder(); - AtomicInteger currentBatchSize = new AtomicInteger(0); - - for (ReportedIUVEventModel event : eventList) { - tempBatch.add(event); - String eventJson = objectMapper.writeValueAsString(event); - int eventSize = eventJson.getBytes(StandardCharsets.UTF_8).length; - - if (currentBatchSize.addAndGet(eventSize) > MAX_CHUNK_SIZE_BYTES) { - // If the limit is exceed, add the current batch and reset it - chunks.add(currentJsonBatch.toString()); - currentJsonBatch.setLength(0); // Reset the StringBuilder - currentBatchSize.set(0); // Reset the batch size - tempBatch.clear(); // Clear the current batch - tempBatch.add(event); // Start with the current event - currentJsonBatch.append(objectMapper.writeValueAsString(tempBatch)); - currentBatchSize.addAndGet(eventSize); - } else { - // Add the event to the current batch - currentJsonBatch.append(eventJson); - } - } - - // Add remaining items - if (currentBatchSize.get() > 0) { - chunks.add(currentJsonBatch.toString()); - } - - return chunks; - } - - /** Send a message to the Event Hub */ - private boolean sendEventToHub( - String jsonPayload, - EventHubProducerClient eventHubClient, - FlussoRendicontazione flusso, - ExecutionContext context) { - EventData eventData = new EventData(jsonPayload); - eventData - .getProperties() - .put(SERVICE_IDENTIFIER, flusso.getMetadata().getOrDefault(SERVICE_IDENTIFIER, "NA")); - - EventDataBatch eventBatch = eventHubClient.createBatch(); - if (!eventBatch.tryAdd(eventData)) { - context - .getLogger() - .warning( - () -> - String.format( - "Failed to add event to batch for flow ID: %s", - flusso.getIdentificativoFlusso())); - return false; - } - - try { - eventHubClient.send(eventBatch); - return true; - } catch (Exception e) { - context - .getLogger() - .warning( - () -> - String.format( - "Failed to add event to batch for flow ID: %s. Details: %s", - flusso.getIdentificativoFlusso(), e.getMessage())); - return false; - } - } } diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java new file mode 100644 index 0000000..156b707 --- /dev/null +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java @@ -0,0 +1,210 @@ +package it.gov.pagopa.fdr.to.eventhub; + +import com.azure.messaging.eventhubs.EventHubProducerClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.HttpStatus; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import it.gov.pagopa.fdr.to.eventhub.model.BlobFileData; +import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione; +import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Azure Functions with Azure Http trigger. */ +public class HttpBlobRecoveryFunction { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String CONTENT_TYPE = "Content-Type"; + private static final String APPLICATION_JSON = "application/json"; + private static final String JSON_FILENAME = "fileName"; + private static final String JSON_CONTAINER = "container"; + + private final EventHubProducerClient eventHubClientFlowTx; + private final EventHubProducerClient eventHubClientReportedIUV; + + public HttpBlobRecoveryFunction() { + this.eventHubClientFlowTx = + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"), + System.getenv("EVENT_HUB_FLOWTX_NAME")); + + this.eventHubClientReportedIUV = + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"), + System.getenv("EVENT_HUB_REPORTEDIUV_NAME")); + } + + public HttpBlobRecoveryFunction( + EventHubProducerClient eventHubClientFlowTx, + EventHubProducerClient eventHubClientReportedIUV) { + this.eventHubClientFlowTx = eventHubClientFlowTx; + this.eventHubClientReportedIUV = eventHubClientReportedIUV; + } + + @FunctionName("HTTPBlobRecovery") + public HttpResponseMessage run( + @HttpTrigger( + name = "HTTPBlobRecoveryTrigger", + methods = {HttpMethod.POST}, + route = "fdr/notify", + authLevel = AuthorizationLevel.ANONYMOUS) + HttpRequestMessage> request, + final ExecutionContext context) { + + // Check if body is present + Optional requestBody = request.getBody(); + if (!requestBody.isPresent()) { + return badRequest(request, "Missing request body"); + } + + try { + JsonNode jsonNode = objectMapper.readTree(requestBody.get()); + String fileName = + Optional.ofNullable(jsonNode.get(JSON_FILENAME)).map(JsonNode::asText).orElse(null); + String container = + Optional.ofNullable(jsonNode.get(JSON_CONTAINER)).map(JsonNode::asText).orElse(null); + + if (fileName == null || container == null) { + return badRequest(request, "Missing required fields: fileName, container"); + } + + context + .getLogger() + .info( + () -> + String.format( + "[HTTP FDR] Triggered at: %s for Blob container: %s, name: %s", + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), + container, + fileName)); + + BlobFileData fileData = getBlobFile(container, fileName, context); + + if (Objects.isNull(fileData)) { + return notFound( + request, String.format("File %s not found in container %s", fileName, container)); + } + + if (!CommonUtil.validateBlobMetadata(fileData.getMetadata())) { + return unprocessableEntity( + request, + String.format( + "The file %s in container %s is missing required metadata", fileName, container)); + } + + boolean isValidGzipFile = CommonUtil.isGzip(fileData.getFileContent()); + + try (InputStream decompressedStream = + isValidGzipFile + ? CommonUtil.decompressGzip(fileData.getFileContent()) + : new ByteArrayInputStream(fileData.getFileContent())) { + + FlussoRendicontazione flusso = CommonUtil.parseXml(decompressedStream); + flusso.setMetadata(fileData.getMetadata()); + + boolean eventBatchSent = + CommonUtil.processXmlBlobAndSendToEventHub( + eventHubClientFlowTx, eventHubClientReportedIUV, flusso, context); + + if (!eventBatchSent) { + return serviceUnavailable( + request, + String.format( + "EventHub failed to confirm batch processing for flow ID %s [file %s, container" + + " %s]", + flusso.getIdentificativoFlusso(), fileName, container)); + } + } + + return ok( + request, + String.format( + "Processed recovery request for file: %s in container: %s", fileName, container)); + + } catch (IOException e) { + return badRequest(request, "Invalid JSON format"); + } catch (Exception e) { + context.getLogger().severe("[HTTP FDR] Unexpected error: " + e.getMessage()); + return serverError(request, "Internal Server Error"); + } + } + + private BlobFileData getBlobFile( + String containerName, String blobName, ExecutionContext context) { + try { + BlobServiceClient blobServiceClient = + new BlobServiceClientBuilder() + .connectionString(System.getenv("FDR_SA_CONNECTION_STRING")) + .buildClient(); + + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(containerName); + BlobClient blobClient = containerClient.getBlobClient(blobName); + + if (Boolean.FALSE.equals(blobClient.exists())) { + context.getLogger().severe(() -> "[HTTP FDR] Blob not found: " + blobName); + return null; + } + + Map metadata = blobClient.getProperties().getMetadata(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blobClient.downloadStream(outputStream); + + return new BlobFileData(outputStream.toByteArray(), metadata); + + } catch (Exception e) { + context.getLogger().severe("[HTTP FDR] Error accessing blob: " + e.getMessage()); + return null; + } + } + + private HttpResponseMessage ok(HttpRequestMessage request, String message) { + return response(request, HttpStatus.OK, message); + } + + private HttpResponseMessage badRequest(HttpRequestMessage request, String message) { + return response(request, HttpStatus.BAD_REQUEST, message); + } + + private HttpResponseMessage notFound(HttpRequestMessage request, String message) { + return response(request, HttpStatus.NOT_FOUND, message); + } + + private HttpResponseMessage unprocessableEntity(HttpRequestMessage request, String message) { + return response(request, HttpStatus.UNPROCESSABLE_ENTITY, message); + } + + private HttpResponseMessage serviceUnavailable(HttpRequestMessage request, String message) { + return response(request, HttpStatus.SERVICE_UNAVAILABLE, message); + } + + private HttpResponseMessage serverError(HttpRequestMessage request, String message) { + return response(request, HttpStatus.INTERNAL_SERVER_ERROR, message); + } + + private HttpResponseMessage response( + HttpRequestMessage request, HttpStatus status, String message) { + return request + .createResponseBuilder(status) + .header(CONTENT_TYPE, APPLICATION_JSON) + .body("{\"message\": \"" + message + "\"}") + .build(); + } +} diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java new file mode 100644 index 0000000..0019545 --- /dev/null +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java @@ -0,0 +1,17 @@ +package it.gov.pagopa.fdr.to.eventhub.model; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BlobFileData { + + private byte[] fileContent; + private Map metadata; +} diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java new file mode 100644 index 0000000..9b5d1b1 --- /dev/null +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java @@ -0,0 +1,218 @@ +package it.gov.pagopa.fdr.to.eventhub.util; + +import com.azure.core.amqp.AmqpRetryMode; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerClient; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.microsoft.azure.functions.ExecutionContext; +import it.gov.pagopa.fdr.to.eventhub.mapper.FlussoRendicontazioneMapper; +import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione; +import it.gov.pagopa.fdr.to.eventhub.model.eventhub.FlowTxEventModel; +import it.gov.pagopa.fdr.to.eventhub.model.eventhub.ReportedIUVEventModel; +import it.gov.pagopa.fdr.to.eventhub.parser.FDR1XmlSAXParser; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.GZIPInputStream; +import javax.xml.parsers.ParserConfigurationException; +import lombok.experimental.UtilityClass; +import org.xml.sax.SAXException; + +@UtilityClass +public class CommonUtil { + + public static final String LOG_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + + private static final String SERVICE_IDENTIFIER = "serviceIdentifier"; + + public static EventHubProducerClient createEventHubClient( + String connectionString, String eventHubName) { + return new EventHubClientBuilder() + .connectionString(connectionString, eventHubName) + .retryOptions( + new AmqpRetryOptions() + .setMaxRetries(3) + .setDelay(Duration.ofSeconds(2)) + .setMode(AmqpRetryMode.EXPONENTIAL)) + .buildProducerClient(); + } + + public static boolean validateBlobMetadata(Map blobMetadata) { + if (blobMetadata == null + || blobMetadata.isEmpty() + || !blobMetadata.containsKey("sessionId") + || !blobMetadata.containsKey("insertedTimestamp")) { + throw new IllegalArgumentException( + "Invalid blob metadata: sessionId or insertedTimestamp is missing."); + } + return !("false".equalsIgnoreCase(blobMetadata.get("elaborate"))); + } + + public static boolean isGzip(byte[] content) { + if (content == null || content.length == 0) { + throw new IllegalArgumentException("Invalid input data for decompression: empty file"); + } + return content.length > 2 && content[0] == (byte) 0x1F && content[1] == (byte) 0x8B; + } + + public static InputStream decompressGzip(byte[] compressedContent) throws IOException { + return new GZIPInputStream(new ByteArrayInputStream(compressedContent)); + } + + public static FlussoRendicontazione parseXml(InputStream xmlStream) + throws ParserConfigurationException, SAXException, IOException { + return FDR1XmlSAXParser.parseXmlStream(xmlStream); + } + + public static boolean processXmlBlobAndSendToEventHub( + final EventHubProducerClient eventHubClientFlowTx, + final EventHubProducerClient eventHubClientReportedIUV, + FlussoRendicontazione flussoRendicontazione, + ExecutionContext context) { + try { + // Convert FlussoRendicontazione to event models + FlowTxEventModel flowEvent = + FlussoRendicontazioneMapper.toFlowTxEventList(flussoRendicontazione); + List reportedIUVEventList = + FlussoRendicontazioneMapper.toReportedIUVEventList(flussoRendicontazione); + + // Serialize the objects to JSON + JsonMapper objectMapper = + JsonMapper.builder() + .addModule(new JavaTimeModule()) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .build(); + + String flowEventJson = objectMapper.writeValueAsString(flowEvent); + + // Break the list into smaller batches to avoid overshooting limit + List reportedIUVEventJsonChunks = splitIntoChunks(reportedIUVEventList, objectMapper); + + context + .getLogger() + .fine( + () -> + String.format( + "Chunk splitting process completed at: %s for flow ID: %s. Total number of" + + " chunks: %d", + LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + flussoRendicontazione.getIdentificativoFlusso(), + reportedIUVEventJsonChunks.size())); + + boolean flowEventSent = + sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione, context); + boolean allEventChunksSent = true; + + for (String chunk : reportedIUVEventJsonChunks) { + if (!sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione, context)) { + allEventChunksSent = false; + break; + } + } + + return flowEventSent && allEventChunksSent; + + } catch (Exception e) { + // Log the exception with context + String errorMessage = + String.format( + "Error processing or sending data to event hub: %s. Details: %s", + flussoRendicontazione.getIdentificativoFlusso(), e.getMessage()); + context.getLogger().severe(() -> errorMessage); + + return false; + } + } + + /** Divides the event list into smaller JSON blocks (to avoid exceeding 1MB) */ + private List splitIntoChunks( + List eventList, JsonMapper objectMapper) + throws JsonProcessingException { + + List chunks = new ArrayList<>(); + List tempBatch = new ArrayList<>(); + final int MAX_CHUNK_SIZE_BYTES = 900 * 1024; // 900 KB for security + + StringBuilder currentJsonBatch = new StringBuilder(); + AtomicInteger currentBatchSize = new AtomicInteger(0); + + for (ReportedIUVEventModel event : eventList) { + tempBatch.add(event); + String eventJson = objectMapper.writeValueAsString(event); + int eventSize = eventJson.getBytes(StandardCharsets.UTF_8).length; + + if (currentBatchSize.addAndGet(eventSize) > MAX_CHUNK_SIZE_BYTES) { + // If the limit is exceed, add the current batch and reset it + chunks.add(currentJsonBatch.toString()); + currentJsonBatch.setLength(0); // Reset the StringBuilder + currentBatchSize.set(0); // Reset the batch size + tempBatch.clear(); // Clear the current batch + tempBatch.add(event); // Start with the current event + currentJsonBatch.append(objectMapper.writeValueAsString(tempBatch)); + currentBatchSize.addAndGet(eventSize); + } else { + // Add the event to the current batch + currentJsonBatch.append(eventJson); + } + } + + // Add remaining items + if (currentBatchSize.get() > 0) { + chunks.add(currentJsonBatch.toString()); + } + + return chunks; + } + + /** Send a message to the Event Hub */ + private boolean sendEventToHub( + String jsonPayload, + EventHubProducerClient eventHubClient, + FlussoRendicontazione flusso, + ExecutionContext context) { + EventData eventData = new EventData(jsonPayload); + eventData + .getProperties() + .put(SERVICE_IDENTIFIER, flusso.getMetadata().getOrDefault(SERVICE_IDENTIFIER, "NA")); + + EventDataBatch eventBatch = eventHubClient.createBatch(); + if (!eventBatch.tryAdd(eventData)) { + context + .getLogger() + .warning( + () -> + String.format( + "Failed to add event to batch for flow ID: %s", + flusso.getIdentificativoFlusso())); + return false; + } + + try { + eventHubClient.send(eventBatch); + return true; + } catch (Exception e) { + context + .getLogger() + .warning( + () -> + String.format( + "Failed to add event to batch for flow ID: %s. Details: %s", + flusso.getIdentificativoFlusso(), e.getMessage())); + return false; + } + } +} From b5418107ac5d7431bd15642cbed1c06c9c434b9a Mon Sep 17 00:00:00 2001 From: aacitelli Date: Thu, 20 Feb 2025 17:19:56 +0100 Subject: [PATCH 2/5] [PAGOPA-2619] First impl http recovery function and code refactoring --- .github/workflows/check_pr.yml | 26 +- pom.xml | 7 + .../to/eventhub/BlobProcessingFunction.java | 238 ++---------------- .../to/eventhub/HttpBlobRecoveryFunction.java | 210 ++++++++++++++++ .../fdr/to/eventhub/model/BlobFileData.java | 17 ++ .../fdr/to/eventhub/util/CommonUtil.java | 218 ++++++++++++++++ 6 files changed, 497 insertions(+), 219 deletions(-) create mode 100644 src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java create mode 100644 src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java create mode 100644 src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java diff --git a/.github/workflows/check_pr.yml b/.github/workflows/check_pr.yml index 72e015c..acc40c3 100644 --- a/.github/workflows/check_pr.yml +++ b/.github/workflows/check_pr.yml @@ -28,6 +28,7 @@ jobs: with: configuration-path: '.github/auto_assign.yml' + check_labels: name: Check Required Labels # The type of runner that the job will run on @@ -41,9 +42,28 @@ jobs: with: github-token: ${{ secrets.GITHUB_TOKEN }} script: | - const script = require('./.github/workflows/github_scripts/check_required_labels.js') - script({github, context, core}) - + var comments = await github.rest.issues.listComments({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo + }); + for (const comment of comments.data) { + if (comment.body.includes('This pull request does not contain a valid label')){ + github.rest.issues.deleteComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + comment_id: comment.id + }) + } + } + github.rest.issues.createComment({ + issue_number: context.issue.number, + owner: context.repo.owner, + repo: context.repo.repo, + body: 'This pull request does not contain a valid label. Please add one of the following labels: `[major, minor, patch, patch, skip]`' + }) + core.setFailed('Missing required labels') formatter: name: Formatter diff --git a/pom.xml b/pom.xml index 09eec16..ce49380 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 5.20.0 + 12.29.0 com.microsoft.azure-20220215182005862 2.18.2 @@ -43,6 +44,12 @@ ${azure.messaging.eventhubs.version} + + com.azure + azure-storage-blob + ${azure.storage.blob.version} + + com.fasterxml.jackson.core diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java index 44ea9ad..930b3fe 100644 --- a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java @@ -1,44 +1,21 @@ package it.gov.pagopa.fdr.to.eventhub; -import com.azure.core.amqp.AmqpRetryMode; -import com.azure.core.amqp.AmqpRetryOptions; -import com.azure.messaging.eventhubs.EventData; -import com.azure.messaging.eventhubs.EventDataBatch; -import com.azure.messaging.eventhubs.EventHubClientBuilder; import com.azure.messaging.eventhubs.EventHubProducerClient; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.microsoft.azure.functions.ExecutionContext; import com.microsoft.azure.functions.annotation.BindingName; import com.microsoft.azure.functions.annotation.BlobTrigger; import com.microsoft.azure.functions.annotation.FunctionName; import it.gov.pagopa.fdr.to.eventhub.exception.EventHubException; -import it.gov.pagopa.fdr.to.eventhub.mapper.FlussoRendicontazioneMapper; import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione; -import it.gov.pagopa.fdr.to.eventhub.model.eventhub.FlowTxEventModel; -import it.gov.pagopa.fdr.to.eventhub.model.eventhub.ReportedIUVEventModel; -import it.gov.pagopa.fdr.to.eventhub.parser.FDR1XmlSAXParser; +import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.GZIPInputStream; -import javax.xml.parsers.ParserConfigurationException; -import org.xml.sax.SAXException; public class BlobProcessingFunction { - private static final String LOG_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; - private static final String SERVICE_IDENTIFIER = "serviceIdentifier"; private final String fdr1Container = System.getenv().getOrDefault("BLOB_STORAGE_FDR1_CONTAINER", "fdr1-flows"); private final String fdr3Container = @@ -48,29 +25,14 @@ public class BlobProcessingFunction { public BlobProcessingFunction() { this.eventHubClientFlowTx = - new EventHubClientBuilder() - .connectionString( - System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"), - System.getenv("EVENT_HUB_FLOWTX_NAME")) - .retryOptions( - new AmqpRetryOptions() - .setMaxRetries(3) // Maximum number of - // attempts - .setDelay(Duration.ofSeconds(2)) // Delay between attempts - .setMode(AmqpRetryMode.EXPONENTIAL)) // Backoff strategy - .buildProducerClient(); + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"), + System.getenv("EVENT_HUB_FLOWTX_NAME")); this.eventHubClientReportedIUV = - new EventHubClientBuilder() - .connectionString( - System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"), - System.getenv("EVENT_HUB_REPORTEDIUV_NAME")) - .retryOptions( - new AmqpRetryOptions() - .setMaxRetries(3) - .setDelay(Duration.ofSeconds(2)) - .setMode(AmqpRetryMode.EXPONENTIAL)) - .buildProducerClient(); + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"), + System.getenv("EVENT_HUB_REPORTEDIUV_NAME")); } // Constructor to inject the Event Hub clients @@ -94,7 +56,7 @@ public synchronized void processFDR1BlobFiles( final ExecutionContext context) { // checks for the presence of the necessary metadata - if (!validateBlobMetadata(blobMetadata)) { + if (!CommonUtil.validateBlobMetadata(blobMetadata)) { context .getLogger() .warning( @@ -107,7 +69,7 @@ public synchronized void processFDR1BlobFiles( } // verify that the file is present and that it is a compressed file - boolean isValidGzipFile = isGzip(content); + boolean isValidGzipFile = CommonUtil.isGzip(content); context .getLogger() @@ -115,15 +77,16 @@ public synchronized void processFDR1BlobFiles( () -> String.format( "[FDR1] Triggered at: %s for Blob container: %s, name: %s, size in bytes: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); try (InputStream decompressedStream = - isValidGzipFile ? decompressGzip(content) : new ByteArrayInputStream(content)) { + isValidGzipFile ? CommonUtil.decompressGzip(content) : new ByteArrayInputStream(content)) { - FlussoRendicontazione flusso = parseXml(decompressedStream); + FlussoRendicontazione flusso = CommonUtil.parseXml(decompressedStream); context .getLogger() @@ -132,7 +95,8 @@ public synchronized void processFDR1BlobFiles( String.format( "[FDR1] Parsed Finished at: %s for Blob container: %s, name: %s, size in" + " bytes: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); @@ -140,7 +104,9 @@ public synchronized void processFDR1BlobFiles( flusso.setMetadata(blobMetadata); // Waits for confirmation of sending the entire flow to the Event Hub - boolean eventBatchSent = processXmlBlobAndSendToEventHub(flusso, context); + boolean eventBatchSent = + CommonUtil.processXmlBlobAndSendToEventHub( + eventHubClientFlowTx, eventHubClientReportedIUV, flusso, context); if (!eventBatchSent) { throw new EventHubException( String.format( @@ -155,7 +121,8 @@ public synchronized void processFDR1BlobFiles( String.format( "[FDR1] Execution Finished at: %s for Blob container: %s, name: %s, size in" + " bytes: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); @@ -198,171 +165,10 @@ public void processFDR3BlobFiles( String.format( "[FDR3] Execution Finished at: %s for Blob container: %s, name: %s, size: %d" + " bytes", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + LocalDateTime.now() + .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)), fdr1Container, blobName, content.length)); } - - private boolean isGzip(byte[] content) { - if (content == null || content.length == 0) { - throw new IllegalArgumentException("Invalid input data for decompression: empty file"); - } - return content.length > 2 && content[0] == (byte) 0x1F && content[1] == (byte) 0x8B; - } - - private boolean validateBlobMetadata(Map blobMetadata) { - if (blobMetadata == null - || blobMetadata.isEmpty() - || !blobMetadata.containsKey("sessionId") - || !blobMetadata.containsKey("insertedTimestamp")) { - throw new IllegalArgumentException( - "Invalid blob metadata: sessionId or insertedTimestamp is missing."); - } - return !("false".equalsIgnoreCase(blobMetadata.get("elaborate"))); - } - - private InputStream decompressGzip(byte[] compressedContent) throws IOException { - return new GZIPInputStream(new ByteArrayInputStream(compressedContent)); - } - - private FlussoRendicontazione parseXml(InputStream xmlStream) - throws ParserConfigurationException, SAXException, IOException { - return FDR1XmlSAXParser.parseXmlStream(xmlStream); - } - - private boolean processXmlBlobAndSendToEventHub( - FlussoRendicontazione flussoRendicontazione, ExecutionContext context) { - try { - // Convert FlussoRendicontazione to event models - FlowTxEventModel flowEvent = - FlussoRendicontazioneMapper.toFlowTxEventList(flussoRendicontazione); - List reportedIUVEventList = - FlussoRendicontazioneMapper.toReportedIUVEventList(flussoRendicontazione); - - // Serialize the objects to JSON - JsonMapper objectMapper = - JsonMapper.builder() - .addModule(new JavaTimeModule()) - .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) - .build(); - - String flowEventJson = objectMapper.writeValueAsString(flowEvent); - - // Break the list into smaller batches to avoid overshooting limit - List reportedIUVEventJsonChunks = splitIntoChunks(reportedIUVEventList, objectMapper); - - context - .getLogger() - .fine( - () -> - String.format( - "Chunk splitting process completed at: %s for flow ID: %s. Total number of" - + " chunks: %d", - LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), - flussoRendicontazione.getIdentificativoFlusso(), - reportedIUVEventJsonChunks.size())); - - boolean flowEventSent = - sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione, context); - boolean allEventChunksSent = true; - - for (String chunk : reportedIUVEventJsonChunks) { - if (!sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione, context)) { - allEventChunksSent = false; - break; - } - } - - return flowEventSent && allEventChunksSent; - - } catch (Exception e) { - // Log the exception with context - String errorMessage = - String.format( - "Error processing or sending data to event hub: %s. Details: %s", - flussoRendicontazione.getIdentificativoFlusso(), e.getMessage()); - context.getLogger().severe(() -> errorMessage); - - return false; - } - } - - /** Divides the event list into smaller JSON blocks (to avoid exceeding 1MB) */ - private List splitIntoChunks( - List eventList, JsonMapper objectMapper) - throws JsonProcessingException { - - List chunks = new ArrayList<>(); - List tempBatch = new ArrayList<>(); - final int MAX_CHUNK_SIZE_BYTES = 900 * 1024; // 900 KB for security - - StringBuilder currentJsonBatch = new StringBuilder(); - AtomicInteger currentBatchSize = new AtomicInteger(0); - - for (ReportedIUVEventModel event : eventList) { - tempBatch.add(event); - String eventJson = objectMapper.writeValueAsString(event); - int eventSize = eventJson.getBytes(StandardCharsets.UTF_8).length; - - if (currentBatchSize.addAndGet(eventSize) > MAX_CHUNK_SIZE_BYTES) { - // If the limit is exceed, add the current batch and reset it - chunks.add(currentJsonBatch.toString()); - currentJsonBatch.setLength(0); // Reset the StringBuilder - currentBatchSize.set(0); // Reset the batch size - tempBatch.clear(); // Clear the current batch - tempBatch.add(event); // Start with the current event - currentJsonBatch.append(objectMapper.writeValueAsString(tempBatch)); - currentBatchSize.addAndGet(eventSize); - } else { - // Add the event to the current batch - currentJsonBatch.append(eventJson); - } - } - - // Add remaining items - if (currentBatchSize.get() > 0) { - chunks.add(currentJsonBatch.toString()); - } - - return chunks; - } - - /** Send a message to the Event Hub */ - private boolean sendEventToHub( - String jsonPayload, - EventHubProducerClient eventHubClient, - FlussoRendicontazione flusso, - ExecutionContext context) { - EventData eventData = new EventData(jsonPayload); - eventData - .getProperties() - .put(SERVICE_IDENTIFIER, flusso.getMetadata().getOrDefault(SERVICE_IDENTIFIER, "NA")); - - EventDataBatch eventBatch = eventHubClient.createBatch(); - if (!eventBatch.tryAdd(eventData)) { - context - .getLogger() - .warning( - () -> - String.format( - "Failed to add event to batch for flow ID: %s", - flusso.getIdentificativoFlusso())); - return false; - } - - try { - eventHubClient.send(eventBatch); - return true; - } catch (Exception e) { - context - .getLogger() - .warning( - () -> - String.format( - "Failed to add event to batch for flow ID: %s. Details: %s", - flusso.getIdentificativoFlusso(), e.getMessage())); - return false; - } - } } diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java new file mode 100644 index 0000000..156b707 --- /dev/null +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java @@ -0,0 +1,210 @@ +package it.gov.pagopa.fdr.to.eventhub; + +import com.azure.messaging.eventhubs.EventHubProducerClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.HttpStatus; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import it.gov.pagopa.fdr.to.eventhub.model.BlobFileData; +import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione; +import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Azure Functions with Azure Http trigger. */ +public class HttpBlobRecoveryFunction { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String CONTENT_TYPE = "Content-Type"; + private static final String APPLICATION_JSON = "application/json"; + private static final String JSON_FILENAME = "fileName"; + private static final String JSON_CONTAINER = "container"; + + private final EventHubProducerClient eventHubClientFlowTx; + private final EventHubProducerClient eventHubClientReportedIUV; + + public HttpBlobRecoveryFunction() { + this.eventHubClientFlowTx = + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"), + System.getenv("EVENT_HUB_FLOWTX_NAME")); + + this.eventHubClientReportedIUV = + CommonUtil.createEventHubClient( + System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"), + System.getenv("EVENT_HUB_REPORTEDIUV_NAME")); + } + + public HttpBlobRecoveryFunction( + EventHubProducerClient eventHubClientFlowTx, + EventHubProducerClient eventHubClientReportedIUV) { + this.eventHubClientFlowTx = eventHubClientFlowTx; + this.eventHubClientReportedIUV = eventHubClientReportedIUV; + } + + @FunctionName("HTTPBlobRecovery") + public HttpResponseMessage run( + @HttpTrigger( + name = "HTTPBlobRecoveryTrigger", + methods = {HttpMethod.POST}, + route = "fdr/notify", + authLevel = AuthorizationLevel.ANONYMOUS) + HttpRequestMessage> request, + final ExecutionContext context) { + + // Check if body is present + Optional requestBody = request.getBody(); + if (!requestBody.isPresent()) { + return badRequest(request, "Missing request body"); + } + + try { + JsonNode jsonNode = objectMapper.readTree(requestBody.get()); + String fileName = + Optional.ofNullable(jsonNode.get(JSON_FILENAME)).map(JsonNode::asText).orElse(null); + String container = + Optional.ofNullable(jsonNode.get(JSON_CONTAINER)).map(JsonNode::asText).orElse(null); + + if (fileName == null || container == null) { + return badRequest(request, "Missing required fields: fileName, container"); + } + + context + .getLogger() + .info( + () -> + String.format( + "[HTTP FDR] Triggered at: %s for Blob container: %s, name: %s", + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME), + container, + fileName)); + + BlobFileData fileData = getBlobFile(container, fileName, context); + + if (Objects.isNull(fileData)) { + return notFound( + request, String.format("File %s not found in container %s", fileName, container)); + } + + if (!CommonUtil.validateBlobMetadata(fileData.getMetadata())) { + return unprocessableEntity( + request, + String.format( + "The file %s in container %s is missing required metadata", fileName, container)); + } + + boolean isValidGzipFile = CommonUtil.isGzip(fileData.getFileContent()); + + try (InputStream decompressedStream = + isValidGzipFile + ? CommonUtil.decompressGzip(fileData.getFileContent()) + : new ByteArrayInputStream(fileData.getFileContent())) { + + FlussoRendicontazione flusso = CommonUtil.parseXml(decompressedStream); + flusso.setMetadata(fileData.getMetadata()); + + boolean eventBatchSent = + CommonUtil.processXmlBlobAndSendToEventHub( + eventHubClientFlowTx, eventHubClientReportedIUV, flusso, context); + + if (!eventBatchSent) { + return serviceUnavailable( + request, + String.format( + "EventHub failed to confirm batch processing for flow ID %s [file %s, container" + + " %s]", + flusso.getIdentificativoFlusso(), fileName, container)); + } + } + + return ok( + request, + String.format( + "Processed recovery request for file: %s in container: %s", fileName, container)); + + } catch (IOException e) { + return badRequest(request, "Invalid JSON format"); + } catch (Exception e) { + context.getLogger().severe("[HTTP FDR] Unexpected error: " + e.getMessage()); + return serverError(request, "Internal Server Error"); + } + } + + private BlobFileData getBlobFile( + String containerName, String blobName, ExecutionContext context) { + try { + BlobServiceClient blobServiceClient = + new BlobServiceClientBuilder() + .connectionString(System.getenv("FDR_SA_CONNECTION_STRING")) + .buildClient(); + + BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(containerName); + BlobClient blobClient = containerClient.getBlobClient(blobName); + + if (Boolean.FALSE.equals(blobClient.exists())) { + context.getLogger().severe(() -> "[HTTP FDR] Blob not found: " + blobName); + return null; + } + + Map metadata = blobClient.getProperties().getMetadata(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + blobClient.downloadStream(outputStream); + + return new BlobFileData(outputStream.toByteArray(), metadata); + + } catch (Exception e) { + context.getLogger().severe("[HTTP FDR] Error accessing blob: " + e.getMessage()); + return null; + } + } + + private HttpResponseMessage ok(HttpRequestMessage request, String message) { + return response(request, HttpStatus.OK, message); + } + + private HttpResponseMessage badRequest(HttpRequestMessage request, String message) { + return response(request, HttpStatus.BAD_REQUEST, message); + } + + private HttpResponseMessage notFound(HttpRequestMessage request, String message) { + return response(request, HttpStatus.NOT_FOUND, message); + } + + private HttpResponseMessage unprocessableEntity(HttpRequestMessage request, String message) { + return response(request, HttpStatus.UNPROCESSABLE_ENTITY, message); + } + + private HttpResponseMessage serviceUnavailable(HttpRequestMessage request, String message) { + return response(request, HttpStatus.SERVICE_UNAVAILABLE, message); + } + + private HttpResponseMessage serverError(HttpRequestMessage request, String message) { + return response(request, HttpStatus.INTERNAL_SERVER_ERROR, message); + } + + private HttpResponseMessage response( + HttpRequestMessage request, HttpStatus status, String message) { + return request + .createResponseBuilder(status) + .header(CONTENT_TYPE, APPLICATION_JSON) + .body("{\"message\": \"" + message + "\"}") + .build(); + } +} diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java new file mode 100644 index 0000000..0019545 --- /dev/null +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java @@ -0,0 +1,17 @@ +package it.gov.pagopa.fdr.to.eventhub.model; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class BlobFileData { + + private byte[] fileContent; + private Map metadata; +} diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java new file mode 100644 index 0000000..9b5d1b1 --- /dev/null +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java @@ -0,0 +1,218 @@ +package it.gov.pagopa.fdr.to.eventhub.util; + +import com.azure.core.amqp.AmqpRetryMode; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerClient; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.microsoft.azure.functions.ExecutionContext; +import it.gov.pagopa.fdr.to.eventhub.mapper.FlussoRendicontazioneMapper; +import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione; +import it.gov.pagopa.fdr.to.eventhub.model.eventhub.FlowTxEventModel; +import it.gov.pagopa.fdr.to.eventhub.model.eventhub.ReportedIUVEventModel; +import it.gov.pagopa.fdr.to.eventhub.parser.FDR1XmlSAXParser; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.GZIPInputStream; +import javax.xml.parsers.ParserConfigurationException; +import lombok.experimental.UtilityClass; +import org.xml.sax.SAXException; + +@UtilityClass +public class CommonUtil { + + public static final String LOG_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + + private static final String SERVICE_IDENTIFIER = "serviceIdentifier"; + + public static EventHubProducerClient createEventHubClient( + String connectionString, String eventHubName) { + return new EventHubClientBuilder() + .connectionString(connectionString, eventHubName) + .retryOptions( + new AmqpRetryOptions() + .setMaxRetries(3) + .setDelay(Duration.ofSeconds(2)) + .setMode(AmqpRetryMode.EXPONENTIAL)) + .buildProducerClient(); + } + + public static boolean validateBlobMetadata(Map blobMetadata) { + if (blobMetadata == null + || blobMetadata.isEmpty() + || !blobMetadata.containsKey("sessionId") + || !blobMetadata.containsKey("insertedTimestamp")) { + throw new IllegalArgumentException( + "Invalid blob metadata: sessionId or insertedTimestamp is missing."); + } + return !("false".equalsIgnoreCase(blobMetadata.get("elaborate"))); + } + + public static boolean isGzip(byte[] content) { + if (content == null || content.length == 0) { + throw new IllegalArgumentException("Invalid input data for decompression: empty file"); + } + return content.length > 2 && content[0] == (byte) 0x1F && content[1] == (byte) 0x8B; + } + + public static InputStream decompressGzip(byte[] compressedContent) throws IOException { + return new GZIPInputStream(new ByteArrayInputStream(compressedContent)); + } + + public static FlussoRendicontazione parseXml(InputStream xmlStream) + throws ParserConfigurationException, SAXException, IOException { + return FDR1XmlSAXParser.parseXmlStream(xmlStream); + } + + public static boolean processXmlBlobAndSendToEventHub( + final EventHubProducerClient eventHubClientFlowTx, + final EventHubProducerClient eventHubClientReportedIUV, + FlussoRendicontazione flussoRendicontazione, + ExecutionContext context) { + try { + // Convert FlussoRendicontazione to event models + FlowTxEventModel flowEvent = + FlussoRendicontazioneMapper.toFlowTxEventList(flussoRendicontazione); + List reportedIUVEventList = + FlussoRendicontazioneMapper.toReportedIUVEventList(flussoRendicontazione); + + // Serialize the objects to JSON + JsonMapper objectMapper = + JsonMapper.builder() + .addModule(new JavaTimeModule()) + .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + .build(); + + String flowEventJson = objectMapper.writeValueAsString(flowEvent); + + // Break the list into smaller batches to avoid overshooting limit + List reportedIUVEventJsonChunks = splitIntoChunks(reportedIUVEventList, objectMapper); + + context + .getLogger() + .fine( + () -> + String.format( + "Chunk splitting process completed at: %s for flow ID: %s. Total number of" + + " chunks: %d", + LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)), + flussoRendicontazione.getIdentificativoFlusso(), + reportedIUVEventJsonChunks.size())); + + boolean flowEventSent = + sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione, context); + boolean allEventChunksSent = true; + + for (String chunk : reportedIUVEventJsonChunks) { + if (!sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione, context)) { + allEventChunksSent = false; + break; + } + } + + return flowEventSent && allEventChunksSent; + + } catch (Exception e) { + // Log the exception with context + String errorMessage = + String.format( + "Error processing or sending data to event hub: %s. Details: %s", + flussoRendicontazione.getIdentificativoFlusso(), e.getMessage()); + context.getLogger().severe(() -> errorMessage); + + return false; + } + } + + /** Divides the event list into smaller JSON blocks (to avoid exceeding 1MB) */ + private List splitIntoChunks( + List eventList, JsonMapper objectMapper) + throws JsonProcessingException { + + List chunks = new ArrayList<>(); + List tempBatch = new ArrayList<>(); + final int MAX_CHUNK_SIZE_BYTES = 900 * 1024; // 900 KB for security + + StringBuilder currentJsonBatch = new StringBuilder(); + AtomicInteger currentBatchSize = new AtomicInteger(0); + + for (ReportedIUVEventModel event : eventList) { + tempBatch.add(event); + String eventJson = objectMapper.writeValueAsString(event); + int eventSize = eventJson.getBytes(StandardCharsets.UTF_8).length; + + if (currentBatchSize.addAndGet(eventSize) > MAX_CHUNK_SIZE_BYTES) { + // If the limit is exceed, add the current batch and reset it + chunks.add(currentJsonBatch.toString()); + currentJsonBatch.setLength(0); // Reset the StringBuilder + currentBatchSize.set(0); // Reset the batch size + tempBatch.clear(); // Clear the current batch + tempBatch.add(event); // Start with the current event + currentJsonBatch.append(objectMapper.writeValueAsString(tempBatch)); + currentBatchSize.addAndGet(eventSize); + } else { + // Add the event to the current batch + currentJsonBatch.append(eventJson); + } + } + + // Add remaining items + if (currentBatchSize.get() > 0) { + chunks.add(currentJsonBatch.toString()); + } + + return chunks; + } + + /** Send a message to the Event Hub */ + private boolean sendEventToHub( + String jsonPayload, + EventHubProducerClient eventHubClient, + FlussoRendicontazione flusso, + ExecutionContext context) { + EventData eventData = new EventData(jsonPayload); + eventData + .getProperties() + .put(SERVICE_IDENTIFIER, flusso.getMetadata().getOrDefault(SERVICE_IDENTIFIER, "NA")); + + EventDataBatch eventBatch = eventHubClient.createBatch(); + if (!eventBatch.tryAdd(eventData)) { + context + .getLogger() + .warning( + () -> + String.format( + "Failed to add event to batch for flow ID: %s", + flusso.getIdentificativoFlusso())); + return false; + } + + try { + eventHubClient.send(eventBatch); + return true; + } catch (Exception e) { + context + .getLogger() + .warning( + () -> + String.format( + "Failed to add event to batch for flow ID: %s. Details: %s", + flusso.getIdentificativoFlusso(), e.getMessage())); + return false; + } + } +} From 10c14cce26ecbe940585a3642d0730d8f90ad6a3 Mon Sep 17 00:00:00 2001 From: aacitelli Date: Tue, 25 Feb 2025 11:01:26 +0100 Subject: [PATCH 3/5] [PAGOPA-2619] FDR1 recovery blob: sonar issue --- .../gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java b/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java index de901c9..0a5c938 100644 --- a/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java +++ b/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java @@ -63,7 +63,7 @@ class BlobProcessingFunctionTest { private BlobProcessingFunction function; @BeforeEach - public void setup() { + void setup() { function = new BlobProcessingFunction(eventHubClientFlowTx, eventHubClientReportedIUV); lenient().when(eventHubClientFlowTx.createBatch()).thenReturn(mock(EventDataBatch.class)); lenient().when(eventHubClientReportedIUV.createBatch()).thenReturn(mock(EventDataBatch.class)); From ad39ff8b7d007cbc702120880a87d4f99fe0189c Mon Sep 17 00:00:00 2001 From: aacitelli Date: Tue, 25 Feb 2025 11:01:26 +0100 Subject: [PATCH 4/5] [PAGOPA-2619] FDR1 recovery blob: service port update --- helm/values-dev.yaml | 2 +- helm/values-prod.yaml | 2 +- helm/values-uat.yaml | 2 +- .../gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/helm/values-dev.yaml b/helm/values-dev.yaml index 52f4c7d..16e0254 100644 --- a/helm/values-dev.yaml +++ b/helm/values-dev.yaml @@ -55,7 +55,7 @@ microservice-chart: create: true host: "weudev.fdr.internal.dev.platform.pagopa.it" path: /pagopa-fdr-to-event-hub-service/(.*) - servicePort: 80 + servicePort: 8080 serviceAccount: name: "fdr-workload-identity" azure: diff --git a/helm/values-prod.yaml b/helm/values-prod.yaml index b7e001e..485b72d 100644 --- a/helm/values-prod.yaml +++ b/helm/values-prod.yaml @@ -55,7 +55,7 @@ microservice-chart: create: true host: "weuprod.fdr.internal.platform.pagopa.it" path: /pagopa-fdr-to-event-hub-service/(.*) - servicePort: 80 + servicePort: 8080 serviceAccount: name: "fdr-workload-identity" azure: diff --git a/helm/values-uat.yaml b/helm/values-uat.yaml index ea30feb..e88171a 100644 --- a/helm/values-uat.yaml +++ b/helm/values-uat.yaml @@ -55,7 +55,7 @@ microservice-chart: create: true host: "weuuat.fdr.internal.uat.platform.pagopa.it" path: /pagopa-fdr-to-event-hub-service/(.*) - servicePort: 80 + servicePort: 8080 serviceAccount: name: "fdr-workload-identity" azure: diff --git a/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java b/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java index de901c9..0a5c938 100644 --- a/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java +++ b/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java @@ -63,7 +63,7 @@ class BlobProcessingFunctionTest { private BlobProcessingFunction function; @BeforeEach - public void setup() { + void setup() { function = new BlobProcessingFunction(eventHubClientFlowTx, eventHubClientReportedIUV); lenient().when(eventHubClientFlowTx.createBatch()).thenReturn(mock(EventDataBatch.class)); lenient().when(eventHubClientReportedIUV.createBatch()).thenReturn(mock(EventDataBatch.class)); From f56ffddf865fe523bdf3fdb54eeb5d93937b1437 Mon Sep 17 00:00:00 2001 From: pagopa-github-bot Date: Tue, 25 Feb 2025 10:42:22 +0000 Subject: [PATCH 5/5] Bump to version 0.0.4-1-PAGOPA-2619-FDR1-recovery-blob [skip ci] --- helm/Chart.yaml | 4 ++-- helm/values-dev.yaml | 2 +- helm/values-prod.yaml | 2 +- helm/values-uat.yaml | 2 +- pom.xml | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/helm/Chart.yaml b/helm/Chart.yaml index e4c2b4b..2851388 100644 --- a/helm/Chart.yaml +++ b/helm/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: pagopa-fdr-to-event-hub description: Microservice fdr to event hub type: application -version: 0.45.0 -appVersion: 0.0.4 +version: 0.46.0 +appVersion: 0.0.4-1-PAGOPA-2619-FDR1-recovery-blob dependencies: - name: microservice-chart version: 7.1.1 diff --git a/helm/values-dev.yaml b/helm/values-dev.yaml index 16e0254..8ebbda7 100644 --- a/helm/values-dev.yaml +++ b/helm/values-dev.yaml @@ -4,7 +4,7 @@ microservice-chart: fullnameOverride: "pagopa-fdr-2-event-hub" image: repository: ghcr.io/pagopa/pagopa-fdr-2-event-hub - tag: "0.0.4" + tag: "0.0.4-1-PAGOPA-2619-FDR1-recovery-blob" pullPolicy: Always # https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs # livenessProbe: diff --git a/helm/values-prod.yaml b/helm/values-prod.yaml index 485b72d..0551fe5 100644 --- a/helm/values-prod.yaml +++ b/helm/values-prod.yaml @@ -4,7 +4,7 @@ microservice-chart: fullnameOverride: "pagopa-fdr-2-event-hub" image: repository: ghcr.io/pagopa/pagopa-fdr-2-event-hub - tag: "0.0.4" + tag: "0.0.4-1-PAGOPA-2619-FDR1-recovery-blob" pullPolicy: Always # https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs # livenessProbe: diff --git a/helm/values-uat.yaml b/helm/values-uat.yaml index e88171a..0cace47 100644 --- a/helm/values-uat.yaml +++ b/helm/values-uat.yaml @@ -4,7 +4,7 @@ microservice-chart: fullnameOverride: "pagopa-fdr-2-event-hub" image: repository: ghcr.io/pagopa/pagopa-fdr-2-event-hub - tag: "0.0.4" + tag: "0.0.4-1-PAGOPA-2619-FDR1-recovery-blob" pullPolicy: Always # https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs # livenessProbe: diff --git a/pom.xml b/pom.xml index cd21dbc..e1abc5a 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ it.gov.pagopa.fdr.to.eventhub pagopa-fdr-to-event-hub - 0.0.4 + 0.0.4-1-PAGOPA-2619-FDR1-recovery-blob jar FDR To Event Hub