diff --git a/.github/workflows/check_pr.yml b/.github/workflows/check_pr.yml
index 72e015c..acc40c3 100644
--- a/.github/workflows/check_pr.yml
+++ b/.github/workflows/check_pr.yml
@@ -28,6 +28,7 @@ jobs:
with:
configuration-path: '.github/auto_assign.yml'
+
check_labels:
name: Check Required Labels
# The type of runner that the job will run on
@@ -41,9 +42,28 @@ jobs:
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
- const script = require('./.github/workflows/github_scripts/check_required_labels.js')
- script({github, context, core})
-
+ var comments = await github.rest.issues.listComments({
+ issue_number: context.issue.number,
+ owner: context.repo.owner,
+ repo: context.repo.repo
+ });
+ for (const comment of comments.data) {
+ if (comment.body.includes('This pull request does not contain a valid label')){
+ github.rest.issues.deleteComment({
+ issue_number: context.issue.number,
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ comment_id: comment.id
+ })
+ }
+ }
+ github.rest.issues.createComment({
+ issue_number: context.issue.number,
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ body: 'This pull request does not contain a valid label. Please add one of the following labels: `[major, minor, patch, patch, skip]`'
+ })
+ core.setFailed('Missing required labels')
formatter:
name: Formatter
diff --git a/.github/workflows/code_review.yml b/.github/workflows/code_review.yml
index 38139e3..8be2172 100644
--- a/.github/workflows/code_review.yml
+++ b/.github/workflows/code_review.yml
@@ -41,7 +41,7 @@ jobs:
sonar_token: ${{ secrets.SONAR_TOKEN }}
project_key: ${{env.PROJECT_KEY}}
java_version: 17
- coverage_exclusions: "**/config/*,**/*Mock*,**/models/**,**/clients/model/**,**/entity/*,**/exception/**,**/App.java"
+ coverage_exclusions: "**/config/*,**/*Mock*,**/models/**,**/clients/model/**,**/entity/*,**/exception/**,**/App.java,**/wrapper/BlobServiceClientWrapperImpl.java"
cpd_exclusions: "**/models/**,**/clients/model/**,**/entity/*"
coverage_report_path: "./target/jacoco-report/jacoco.xml"
diff --git a/helm/values-prod.yaml b/helm/values-prod.yaml
index 7b7e32a..042725a 100644
--- a/helm/values-prod.yaml
+++ b/helm/values-prod.yaml
@@ -55,7 +55,7 @@ microservice-chart:
create: true
host: "weuprod.fdr.internal.platform.pagopa.it"
path: /pagopa-fdr-to-event-hub-service/(.*)
- servicePort: 80
+ servicePort: 8080
serviceAccount:
name: "fdr-workload-identity"
azure:
diff --git a/helm/values-uat.yaml b/helm/values-uat.yaml
index d9c44d1..03179f8 100644
--- a/helm/values-uat.yaml
+++ b/helm/values-uat.yaml
@@ -55,7 +55,7 @@ microservice-chart:
create: true
host: "weuuat.fdr.internal.uat.platform.pagopa.it"
path: /pagopa-fdr-to-event-hub-service/(.*)
- servicePort: 80
+ servicePort: 8080
serviceAccount:
name: "fdr-workload-identity"
azure:
diff --git a/host.json b/host.json
index 02736ff..b117454 100644
--- a/host.json
+++ b/host.json
@@ -33,6 +33,7 @@
"Host.Results": "Error",
"Function.ProcessFDR1BlobFiles": "Information",
"Function.ProcessFDR3BlobFiles": "Information",
+ "Function.HTTPBlobRecovery": "Information",
"Host.Aggregator": "Error"
},
"applicationInsights": {
diff --git a/pom.xml b/pom.xml
index 958c59e..a9ddbcd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,12 +21,15 @@
5.20.0
+ 12.29.0
com.microsoft.azure-20220215182005862
2.18.2
3.2.2
1.18.36
5.9.3
+ 5.15.2
+ 2.1.7
@@ -36,12 +39,16 @@
azure-functions-java-library
${azure.functions.java.library.version}
-
com.azure
azure-messaging-eventhubs
${azure.messaging.eventhubs.version}
+
+ com.azure
+ azure-storage-blob
+ ${azure.storage.blob.version}
+
@@ -49,13 +56,11 @@
jackson-databind
${jackson.version}
-
com.fasterxml.jackson.dataformat
jackson-dataformat-xml
${jackson.version}
-
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
@@ -68,7 +73,6 @@
modelmapper
${modelmapper.version}
-
org.projectlombok
lombok
@@ -95,18 +99,22 @@
${junit.version}
test
-
org.mockito
mockito-core
- 5.15.2
+ ${mockito.version}
test
-
org.mockito
mockito-junit-jupiter
- 5.15.2
+ ${mockito.version}
+ test
+
+
+ uk.org.webcompere
+ system-stubs-jupiter
+ ${system.stubs.version}
test
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java
index 44ea9ad..5ac2286 100644
--- a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java
@@ -1,76 +1,39 @@
package it.gov.pagopa.fdr.to.eventhub;
-import com.azure.core.amqp.AmqpRetryMode;
-import com.azure.core.amqp.AmqpRetryOptions;
-import com.azure.messaging.eventhubs.EventData;
-import com.azure.messaging.eventhubs.EventDataBatch;
-import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.BindingName;
import com.microsoft.azure.functions.annotation.BlobTrigger;
import com.microsoft.azure.functions.annotation.FunctionName;
import it.gov.pagopa.fdr.to.eventhub.exception.EventHubException;
-import it.gov.pagopa.fdr.to.eventhub.mapper.FlussoRendicontazioneMapper;
import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione;
-import it.gov.pagopa.fdr.to.eventhub.model.eventhub.FlowTxEventModel;
-import it.gov.pagopa.fdr.to.eventhub.model.eventhub.ReportedIUVEventModel;
-import it.gov.pagopa.fdr.to.eventhub.parser.FDR1XmlSAXParser;
+import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.GZIPInputStream;
-import javax.xml.parsers.ParserConfigurationException;
-import org.xml.sax.SAXException;
+import lombok.Getter;
public class BlobProcessingFunction {
- private static final String LOG_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
- private static final String SERVICE_IDENTIFIER = "serviceIdentifier";
private final String fdr1Container =
System.getenv().getOrDefault("BLOB_STORAGE_FDR1_CONTAINER", "fdr1-flows");
private final String fdr3Container =
System.getenv().getOrDefault("BLOB_STORAGE_FDR3_CONTAINER", "fdr3-flows");
- private final EventHubProducerClient eventHubClientFlowTx;
- private final EventHubProducerClient eventHubClientReportedIUV;
+ @Getter private final EventHubProducerClient eventHubClientFlowTx;
+ @Getter private final EventHubProducerClient eventHubClientReportedIUV;
public BlobProcessingFunction() {
this.eventHubClientFlowTx =
- new EventHubClientBuilder()
- .connectionString(
- System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"),
- System.getenv("EVENT_HUB_FLOWTX_NAME"))
- .retryOptions(
- new AmqpRetryOptions()
- .setMaxRetries(3) // Maximum number of
- // attempts
- .setDelay(Duration.ofSeconds(2)) // Delay between attempts
- .setMode(AmqpRetryMode.EXPONENTIAL)) // Backoff strategy
- .buildProducerClient();
+ CommonUtil.createEventHubClient(
+ System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"),
+ System.getenv("EVENT_HUB_FLOWTX_NAME"));
this.eventHubClientReportedIUV =
- new EventHubClientBuilder()
- .connectionString(
- System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"),
- System.getenv("EVENT_HUB_REPORTEDIUV_NAME"))
- .retryOptions(
- new AmqpRetryOptions()
- .setMaxRetries(3)
- .setDelay(Duration.ofSeconds(2))
- .setMode(AmqpRetryMode.EXPONENTIAL))
- .buildProducerClient();
+ CommonUtil.createEventHubClient(
+ System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"),
+ System.getenv("EVENT_HUB_REPORTEDIUV_NAME"));
}
// Constructor to inject the Event Hub clients
@@ -94,7 +57,7 @@ public synchronized void processFDR1BlobFiles(
final ExecutionContext context) {
// checks for the presence of the necessary metadata
- if (!validateBlobMetadata(blobMetadata)) {
+ if (!CommonUtil.validateBlobMetadata(blobMetadata)) {
context
.getLogger()
.warning(
@@ -107,7 +70,7 @@ public synchronized void processFDR1BlobFiles(
}
// verify that the file is present and that it is a compressed file
- boolean isValidGzipFile = isGzip(content);
+ boolean isValidGzipFile = CommonUtil.isGzip(content);
context
.getLogger()
@@ -115,15 +78,16 @@ public synchronized void processFDR1BlobFiles(
() ->
String.format(
"[FDR1] Triggered at: %s for Blob container: %s, name: %s, size in bytes: %d",
- LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)),
+ LocalDateTime.now()
+ .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)),
fdr1Container,
blobName,
content.length));
try (InputStream decompressedStream =
- isValidGzipFile ? decompressGzip(content) : new ByteArrayInputStream(content)) {
+ isValidGzipFile ? CommonUtil.decompressGzip(content) : new ByteArrayInputStream(content)) {
- FlussoRendicontazione flusso = parseXml(decompressedStream);
+ FlussoRendicontazione flusso = CommonUtil.parseXml(decompressedStream);
context
.getLogger()
@@ -132,7 +96,8 @@ public synchronized void processFDR1BlobFiles(
String.format(
"[FDR1] Parsed Finished at: %s for Blob container: %s, name: %s, size in"
+ " bytes: %d",
- LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)),
+ LocalDateTime.now()
+ .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)),
fdr1Container,
blobName,
content.length));
@@ -140,7 +105,9 @@ public synchronized void processFDR1BlobFiles(
flusso.setMetadata(blobMetadata);
// Waits for confirmation of sending the entire flow to the Event Hub
- boolean eventBatchSent = processXmlBlobAndSendToEventHub(flusso, context);
+ boolean eventBatchSent =
+ CommonUtil.processXmlBlobAndSendToEventHub(
+ eventHubClientFlowTx, eventHubClientReportedIUV, flusso, context);
if (!eventBatchSent) {
throw new EventHubException(
String.format(
@@ -155,7 +122,8 @@ public synchronized void processFDR1BlobFiles(
String.format(
"[FDR1] Execution Finished at: %s for Blob container: %s, name: %s, size in"
+ " bytes: %d",
- LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)),
+ LocalDateTime.now()
+ .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)),
fdr1Container,
blobName,
content.length));
@@ -198,171 +166,10 @@ public void processFDR3BlobFiles(
String.format(
"[FDR3] Execution Finished at: %s for Blob container: %s, name: %s, size: %d"
+ " bytes",
- LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)),
+ LocalDateTime.now()
+ .format(DateTimeFormatter.ofPattern(CommonUtil.LOG_DATETIME_PATTERN)),
fdr1Container,
blobName,
content.length));
}
-
- private boolean isGzip(byte[] content) {
- if (content == null || content.length == 0) {
- throw new IllegalArgumentException("Invalid input data for decompression: empty file");
- }
- return content.length > 2 && content[0] == (byte) 0x1F && content[1] == (byte) 0x8B;
- }
-
- private boolean validateBlobMetadata(Map blobMetadata) {
- if (blobMetadata == null
- || blobMetadata.isEmpty()
- || !blobMetadata.containsKey("sessionId")
- || !blobMetadata.containsKey("insertedTimestamp")) {
- throw new IllegalArgumentException(
- "Invalid blob metadata: sessionId or insertedTimestamp is missing.");
- }
- return !("false".equalsIgnoreCase(blobMetadata.get("elaborate")));
- }
-
- private InputStream decompressGzip(byte[] compressedContent) throws IOException {
- return new GZIPInputStream(new ByteArrayInputStream(compressedContent));
- }
-
- private FlussoRendicontazione parseXml(InputStream xmlStream)
- throws ParserConfigurationException, SAXException, IOException {
- return FDR1XmlSAXParser.parseXmlStream(xmlStream);
- }
-
- private boolean processXmlBlobAndSendToEventHub(
- FlussoRendicontazione flussoRendicontazione, ExecutionContext context) {
- try {
- // Convert FlussoRendicontazione to event models
- FlowTxEventModel flowEvent =
- FlussoRendicontazioneMapper.toFlowTxEventList(flussoRendicontazione);
- List reportedIUVEventList =
- FlussoRendicontazioneMapper.toReportedIUVEventList(flussoRendicontazione);
-
- // Serialize the objects to JSON
- JsonMapper objectMapper =
- JsonMapper.builder()
- .addModule(new JavaTimeModule())
- .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
- .build();
-
- String flowEventJson = objectMapper.writeValueAsString(flowEvent);
-
- // Break the list into smaller batches to avoid overshooting limit
- List reportedIUVEventJsonChunks = splitIntoChunks(reportedIUVEventList, objectMapper);
-
- context
- .getLogger()
- .fine(
- () ->
- String.format(
- "Chunk splitting process completed at: %s for flow ID: %s. Total number of"
- + " chunks: %d",
- LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)),
- flussoRendicontazione.getIdentificativoFlusso(),
- reportedIUVEventJsonChunks.size()));
-
- boolean flowEventSent =
- sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione, context);
- boolean allEventChunksSent = true;
-
- for (String chunk : reportedIUVEventJsonChunks) {
- if (!sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione, context)) {
- allEventChunksSent = false;
- break;
- }
- }
-
- return flowEventSent && allEventChunksSent;
-
- } catch (Exception e) {
- // Log the exception with context
- String errorMessage =
- String.format(
- "Error processing or sending data to event hub: %s. Details: %s",
- flussoRendicontazione.getIdentificativoFlusso(), e.getMessage());
- context.getLogger().severe(() -> errorMessage);
-
- return false;
- }
- }
-
- /** Divides the event list into smaller JSON blocks (to avoid exceeding 1MB) */
- private List splitIntoChunks(
- List eventList, JsonMapper objectMapper)
- throws JsonProcessingException {
-
- List chunks = new ArrayList<>();
- List tempBatch = new ArrayList<>();
- final int MAX_CHUNK_SIZE_BYTES = 900 * 1024; // 900 KB for security
-
- StringBuilder currentJsonBatch = new StringBuilder();
- AtomicInteger currentBatchSize = new AtomicInteger(0);
-
- for (ReportedIUVEventModel event : eventList) {
- tempBatch.add(event);
- String eventJson = objectMapper.writeValueAsString(event);
- int eventSize = eventJson.getBytes(StandardCharsets.UTF_8).length;
-
- if (currentBatchSize.addAndGet(eventSize) > MAX_CHUNK_SIZE_BYTES) {
- // If the limit is exceed, add the current batch and reset it
- chunks.add(currentJsonBatch.toString());
- currentJsonBatch.setLength(0); // Reset the StringBuilder
- currentBatchSize.set(0); // Reset the batch size
- tempBatch.clear(); // Clear the current batch
- tempBatch.add(event); // Start with the current event
- currentJsonBatch.append(objectMapper.writeValueAsString(tempBatch));
- currentBatchSize.addAndGet(eventSize);
- } else {
- // Add the event to the current batch
- currentJsonBatch.append(eventJson);
- }
- }
-
- // Add remaining items
- if (currentBatchSize.get() > 0) {
- chunks.add(currentJsonBatch.toString());
- }
-
- return chunks;
- }
-
- /** Send a message to the Event Hub */
- private boolean sendEventToHub(
- String jsonPayload,
- EventHubProducerClient eventHubClient,
- FlussoRendicontazione flusso,
- ExecutionContext context) {
- EventData eventData = new EventData(jsonPayload);
- eventData
- .getProperties()
- .put(SERVICE_IDENTIFIER, flusso.getMetadata().getOrDefault(SERVICE_IDENTIFIER, "NA"));
-
- EventDataBatch eventBatch = eventHubClient.createBatch();
- if (!eventBatch.tryAdd(eventData)) {
- context
- .getLogger()
- .warning(
- () ->
- String.format(
- "Failed to add event to batch for flow ID: %s",
- flusso.getIdentificativoFlusso()));
- return false;
- }
-
- try {
- eventHubClient.send(eventBatch);
- return true;
- } catch (Exception e) {
- context
- .getLogger()
- .warning(
- () ->
- String.format(
- "Failed to add event to batch for flow ID: %s. Details: %s",
- flusso.getIdentificativoFlusso(), e.getMessage()));
- return false;
- }
- }
}
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java
new file mode 100644
index 0000000..26e1dc1
--- /dev/null
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunction.java
@@ -0,0 +1,178 @@
+package it.gov.pagopa.fdr.to.eventhub;
+
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.microsoft.azure.functions.ExecutionContext;
+import com.microsoft.azure.functions.HttpMethod;
+import com.microsoft.azure.functions.HttpRequestMessage;
+import com.microsoft.azure.functions.HttpResponseMessage;
+import com.microsoft.azure.functions.HttpStatus;
+import com.microsoft.azure.functions.annotation.AuthorizationLevel;
+import com.microsoft.azure.functions.annotation.FunctionName;
+import com.microsoft.azure.functions.annotation.HttpTrigger;
+import it.gov.pagopa.fdr.to.eventhub.model.BlobFileData;
+import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione;
+import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Objects;
+import java.util.Optional;
+import lombok.Getter;
+
+/** Azure Functions with Azure Http trigger. */
+public class HttpBlobRecoveryFunction {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final String CONTENT_TYPE = "Content-Type";
+ private static final String APPLICATION_JSON = "application/json";
+ private static final String JSON_FILENAME = "fileName";
+ private static final String JSON_CONTAINER = "container";
+
+ @Getter private final EventHubProducerClient eventHubClientFlowTx;
+ @Getter private final EventHubProducerClient eventHubClientReportedIUV;
+
+ public HttpBlobRecoveryFunction() {
+ this.eventHubClientFlowTx =
+ CommonUtil.createEventHubClient(
+ System.getenv("EVENT_HUB_FLOWTX_CONNECTION_STRING"),
+ System.getenv("EVENT_HUB_FLOWTX_NAME"));
+
+ this.eventHubClientReportedIUV =
+ CommonUtil.createEventHubClient(
+ System.getenv("EVENT_HUB_REPORTEDIUV_CONNECTION_STRING"),
+ System.getenv("EVENT_HUB_REPORTEDIUV_NAME"));
+ }
+
+ public HttpBlobRecoveryFunction(
+ EventHubProducerClient eventHubClientFlowTx,
+ EventHubProducerClient eventHubClientReportedIUV) {
+ this.eventHubClientFlowTx = eventHubClientFlowTx;
+ this.eventHubClientReportedIUV = eventHubClientReportedIUV;
+ }
+
+ @FunctionName("HTTPBlobRecovery")
+ public HttpResponseMessage run(
+ @HttpTrigger(
+ name = "HTTPBlobRecoveryTrigger",
+ methods = {HttpMethod.POST},
+ route = "notify/fdr",
+ authLevel = AuthorizationLevel.ANONYMOUS)
+ HttpRequestMessage> request,
+ final ExecutionContext context) {
+
+ // Check if body is present
+ Optional requestBody = request.getBody();
+ if (!requestBody.isPresent()) {
+ return badRequest(request, "Missing request body");
+ }
+
+ try {
+ JsonNode jsonNode = objectMapper.readTree(requestBody.get());
+ String fileName =
+ Optional.ofNullable(jsonNode.get(JSON_FILENAME)).map(JsonNode::asText).orElse(null);
+ String container =
+ Optional.ofNullable(jsonNode.get(JSON_CONTAINER)).map(JsonNode::asText).orElse(null);
+
+ if (fileName == null || container == null) {
+ return badRequest(request, "Missing required fields: fileName, container");
+ }
+
+ context
+ .getLogger()
+ .info(
+ () ->
+ String.format(
+ "[HTTP FDR] Triggered at: %s for Blob container: %s, name: %s",
+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME),
+ container,
+ fileName));
+
+ BlobFileData fileData =
+ CommonUtil.getBlobFile("FDR_SA_CONNECTION_STRING", container, fileName, context);
+
+ if (Objects.isNull(fileData)) {
+ return notFound(
+ request, String.format("File %s not found in container %s", fileName, container));
+ }
+
+ if (!CommonUtil.validateBlobMetadata(fileData.getMetadata())) {
+ return unprocessableEntity(
+ request,
+ String.format(
+ "The file %s in container %s is missing required metadata", fileName, container));
+ }
+
+ boolean isValidGzipFile = CommonUtil.isGzip(fileData.getFileContent());
+
+ try (InputStream decompressedStream =
+ isValidGzipFile
+ ? CommonUtil.decompressGzip(fileData.getFileContent())
+ : new ByteArrayInputStream(fileData.getFileContent())) {
+
+ FlussoRendicontazione flusso = CommonUtil.parseXml(decompressedStream);
+ flusso.setMetadata(fileData.getMetadata());
+
+ boolean eventBatchSent =
+ CommonUtil.processXmlBlobAndSendToEventHub(
+ eventHubClientFlowTx, eventHubClientReportedIUV, flusso, context);
+
+ if (!eventBatchSent) {
+ return serviceUnavailable(
+ request,
+ String.format(
+ "EventHub failed to confirm batch processing for flow ID %s [file %s, container"
+ + " %s]",
+ flusso.getIdentificativoFlusso(), fileName, container));
+ }
+ }
+
+ return ok(
+ request,
+ String.format(
+ "Processed recovery request for file: %s in container: %s", fileName, container));
+
+ } catch (IOException e) {
+ return badRequest(request, "Invalid JSON format");
+ } catch (Exception e) {
+ context.getLogger().severe("[HTTP FDR] Unexpected error: " + e.getMessage());
+ return serverError(request, "Internal Server Error");
+ }
+ }
+
+ private HttpResponseMessage ok(HttpRequestMessage> request, String message) {
+ return response(request, HttpStatus.OK, message);
+ }
+
+ private HttpResponseMessage badRequest(HttpRequestMessage> request, String message) {
+ return response(request, HttpStatus.BAD_REQUEST, message);
+ }
+
+ private HttpResponseMessage notFound(HttpRequestMessage> request, String message) {
+ return response(request, HttpStatus.NOT_FOUND, message);
+ }
+
+ private HttpResponseMessage unprocessableEntity(HttpRequestMessage> request, String message) {
+ return response(request, HttpStatus.UNPROCESSABLE_ENTITY, message);
+ }
+
+ private HttpResponseMessage serviceUnavailable(HttpRequestMessage> request, String message) {
+ return response(request, HttpStatus.SERVICE_UNAVAILABLE, message);
+ }
+
+ private HttpResponseMessage serverError(HttpRequestMessage> request, String message) {
+ return response(request, HttpStatus.INTERNAL_SERVER_ERROR, message);
+ }
+
+ private HttpResponseMessage response(
+ HttpRequestMessage> request, HttpStatus status, String message) {
+ return request
+ .createResponseBuilder(status)
+ .header(CONTENT_TYPE, APPLICATION_JSON)
+ .body("{\"message\": \"" + message + "\"}")
+ .build();
+ }
+}
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java
new file mode 100644
index 0000000..0019545
--- /dev/null
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/model/BlobFileData.java
@@ -0,0 +1,17 @@
+package it.gov.pagopa.fdr.to.eventhub.model;
+
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class BlobFileData {
+
+ private byte[] fileContent;
+ private Map metadata;
+}
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java
new file mode 100644
index 0000000..4fb64a1
--- /dev/null
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtil.java
@@ -0,0 +1,252 @@
+package it.gov.pagopa.fdr.to.eventhub.util;
+
+import com.azure.core.amqp.AmqpRetryMode;
+import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.microsoft.azure.functions.ExecutionContext;
+import it.gov.pagopa.fdr.to.eventhub.mapper.FlussoRendicontazioneMapper;
+import it.gov.pagopa.fdr.to.eventhub.model.BlobFileData;
+import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione;
+import it.gov.pagopa.fdr.to.eventhub.model.eventhub.FlowTxEventModel;
+import it.gov.pagopa.fdr.to.eventhub.model.eventhub.ReportedIUVEventModel;
+import it.gov.pagopa.fdr.to.eventhub.parser.FDR1XmlSAXParser;
+import it.gov.pagopa.fdr.to.eventhub.wrapper.BlobServiceClientWrapper;
+import it.gov.pagopa.fdr.to.eventhub.wrapper.BlobServiceClientWrapperImpl;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.GZIPInputStream;
+import javax.xml.parsers.ParserConfigurationException;
+import lombok.Setter;
+import lombok.experimental.UtilityClass;
+import org.xml.sax.SAXException;
+
+@UtilityClass
+public class CommonUtil {
+
+ public static final String LOG_DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+ private static final String SERVICE_IDENTIFIER = "serviceIdentifier";
+
+ @Setter
+ private BlobServiceClientWrapper blobServiceClientWrapper = new BlobServiceClientWrapperImpl();
+
+ public static EventHubProducerClient createEventHubClient(
+ String connectionString, String eventHubName) {
+ return new EventHubClientBuilder()
+ .connectionString(connectionString, eventHubName)
+ .retryOptions(
+ new AmqpRetryOptions()
+ .setMaxRetries(3)
+ .setDelay(Duration.ofSeconds(2))
+ .setMode(AmqpRetryMode.EXPONENTIAL))
+ .buildProducerClient();
+ }
+
+ public static boolean validateBlobMetadata(Map blobMetadata) {
+ if (blobMetadata == null
+ || blobMetadata.isEmpty()
+ || !blobMetadata.containsKey("sessionId")
+ || !blobMetadata.containsKey("insertedTimestamp")) {
+ throw new IllegalArgumentException(
+ "Invalid blob metadata: sessionId or insertedTimestamp is missing.");
+ }
+ return !("false".equalsIgnoreCase(blobMetadata.get("elaborate")));
+ }
+
+ public static boolean isGzip(byte[] content) {
+ if (content == null || content.length == 0) {
+ throw new IllegalArgumentException("Invalid input data for decompression: empty file");
+ }
+ return content.length > 2 && content[0] == (byte) 0x1F && content[1] == (byte) 0x8B;
+ }
+
+ public static InputStream decompressGzip(byte[] compressedContent) throws IOException {
+ return new GZIPInputStream(new ByteArrayInputStream(compressedContent));
+ }
+
+ public static FlussoRendicontazione parseXml(InputStream xmlStream)
+ throws ParserConfigurationException, SAXException, IOException {
+ return FDR1XmlSAXParser.parseXmlStream(xmlStream);
+ }
+
+ public static BlobFileData getBlobFile(
+ String storageEnvVar, String containerName, String blobName, ExecutionContext context) {
+ try {
+ BlobContainerClient containerClient =
+ blobServiceClientWrapper.getBlobContainerClient(storageEnvVar, containerName);
+ BlobClient blobClient = containerClient.getBlobClient(blobName);
+
+ if (Boolean.FALSE.equals(blobClient.exists())) {
+ context.getLogger().severe(() -> "Blob not found: " + blobName);
+ return null;
+ }
+
+ Map metadata = blobClient.getProperties().getMetadata();
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ blobClient.downloadStream(outputStream);
+
+ return new BlobFileData(outputStream.toByteArray(), metadata);
+
+ } catch (Exception e) {
+ context.getLogger().severe("Error accessing blob: " + e.getMessage());
+ return null;
+ }
+ }
+
+ public static boolean processXmlBlobAndSendToEventHub(
+ final EventHubProducerClient eventHubClientFlowTx,
+ final EventHubProducerClient eventHubClientReportedIUV,
+ FlussoRendicontazione flussoRendicontazione,
+ ExecutionContext context) {
+ try {
+ // Convert FlussoRendicontazione to event models
+ FlowTxEventModel flowEvent =
+ FlussoRendicontazioneMapper.toFlowTxEventList(flussoRendicontazione);
+ List reportedIUVEventList =
+ FlussoRendicontazioneMapper.toReportedIUVEventList(flussoRendicontazione);
+
+ // Serialize the objects to JSON
+ JsonMapper objectMapper =
+ JsonMapper.builder()
+ .addModule(new JavaTimeModule())
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+ .build();
+
+ String flowEventJson = objectMapper.writeValueAsString(flowEvent);
+
+ // Break the list into smaller batches to avoid overshooting limit
+ List reportedIUVEventJsonChunks = splitIntoChunks(reportedIUVEventList, objectMapper);
+
+ context
+ .getLogger()
+ .fine(
+ () ->
+ String.format(
+ "Chunk splitting process completed at: %s for flow ID: %s. Total number of"
+ + " chunks: %d",
+ LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATETIME_PATTERN)),
+ flussoRendicontazione.getIdentificativoFlusso(),
+ reportedIUVEventJsonChunks.size()));
+
+ boolean flowEventSent =
+ sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione, context);
+ boolean allEventChunksSent = true;
+
+ for (String chunk : reportedIUVEventJsonChunks) {
+ if (!sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione, context)) {
+ allEventChunksSent = false;
+ break;
+ }
+ }
+
+ return flowEventSent && allEventChunksSent;
+
+ } catch (Exception e) {
+ // Log the exception with context
+ String errorMessage =
+ String.format(
+ "Error processing or sending data to event hub: %s. Details: %s",
+ flussoRendicontazione.getIdentificativoFlusso(), e.getMessage());
+ context.getLogger().severe(() -> errorMessage);
+
+ return false;
+ }
+ }
+
+ /** Divides the event list into smaller JSON blocks (to avoid exceeding 1MB) */
+ private List splitIntoChunks(
+ List eventList, JsonMapper objectMapper)
+ throws JsonProcessingException {
+
+ List chunks = new ArrayList<>();
+ List tempBatch = new ArrayList<>();
+ final int MAX_CHUNK_SIZE_BYTES = 900 * 1024; // 900 KB for security
+
+ StringBuilder currentJsonBatch = new StringBuilder();
+ AtomicInteger currentBatchSize = new AtomicInteger(0);
+
+ for (ReportedIUVEventModel event : eventList) {
+ tempBatch.add(event);
+ String eventJson = objectMapper.writeValueAsString(event);
+ int eventSize = eventJson.getBytes(StandardCharsets.UTF_8).length;
+
+ if (currentBatchSize.addAndGet(eventSize) > MAX_CHUNK_SIZE_BYTES) {
+ // If the limit is exceed, add the current batch and reset it
+ chunks.add(currentJsonBatch.toString());
+ currentJsonBatch.setLength(0); // Reset the StringBuilder
+ currentBatchSize.set(0); // Reset the batch size
+ tempBatch.clear(); // Clear the current batch
+ tempBatch.add(event); // Start with the current event
+ currentJsonBatch.append(objectMapper.writeValueAsString(tempBatch));
+ currentBatchSize.addAndGet(eventSize);
+ } else {
+ // Add the event to the current batch
+ currentJsonBatch.append(eventJson);
+ }
+ }
+
+ // Add remaining items
+ if (currentBatchSize.get() > 0) {
+ chunks.add(currentJsonBatch.toString());
+ }
+
+ return chunks;
+ }
+
+ /** Send a message to the Event Hub */
+ private boolean sendEventToHub(
+ String jsonPayload,
+ EventHubProducerClient eventHubClient,
+ FlussoRendicontazione flusso,
+ ExecutionContext context) {
+ EventData eventData = new EventData(jsonPayload);
+ eventData
+ .getProperties()
+ .put(SERVICE_IDENTIFIER, flusso.getMetadata().getOrDefault(SERVICE_IDENTIFIER, "NA"));
+
+ EventDataBatch eventBatch = eventHubClient.createBatch();
+ if (!eventBatch.tryAdd(eventData)) {
+ context
+ .getLogger()
+ .warning(
+ () ->
+ String.format(
+ "Failed to add event to batch for flow ID: %s",
+ flusso.getIdentificativoFlusso()));
+ return false;
+ }
+
+ try {
+ eventHubClient.send(eventBatch);
+ return true;
+ } catch (Exception e) {
+ context
+ .getLogger()
+ .warning(
+ () ->
+ String.format(
+ "Failed to add event to batch for flow ID: %s. Details: %s",
+ flusso.getIdentificativoFlusso(), e.getMessage()));
+ return false;
+ }
+ }
+}
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/wrapper/BlobServiceClientWrapper.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/wrapper/BlobServiceClientWrapper.java
new file mode 100644
index 0000000..cf5b773
--- /dev/null
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/wrapper/BlobServiceClientWrapper.java
@@ -0,0 +1,9 @@
+package it.gov.pagopa.fdr.to.eventhub.wrapper;
+
+import com.azure.storage.blob.BlobContainerClient;
+
+// Interface needed for junit test (see:
+// https://github.com/Azure/azure-sdk-for-java/issues/5017#:~:text=As%20a%20consumer,of%20final%20classes.)
+public interface BlobServiceClientWrapper {
+ BlobContainerClient getBlobContainerClient(String storageEnvVar, String containerName);
+}
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/wrapper/BlobServiceClientWrapperImpl.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/wrapper/BlobServiceClientWrapperImpl.java
new file mode 100644
index 0000000..54e028b
--- /dev/null
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/wrapper/BlobServiceClientWrapperImpl.java
@@ -0,0 +1,14 @@
+package it.gov.pagopa.fdr.to.eventhub.wrapper;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+
+public class BlobServiceClientWrapperImpl implements BlobServiceClientWrapper {
+ @Override
+ public BlobContainerClient getBlobContainerClient(String storageEnvVar, String containerName) {
+ BlobServiceClient blobServiceClient =
+ new BlobServiceClientBuilder().connectionString(System.getenv(storageEnvVar)).buildClient();
+ return blobServiceClient.getBlobContainerClient(containerName);
+ }
+}
diff --git a/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java b/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java
index a2d3214..0a5c938 100644
--- a/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java
+++ b/src/test/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunctionTest.java
@@ -5,6 +5,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
@@ -22,9 +23,9 @@
import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione;
import it.gov.pagopa.fdr.to.eventhub.model.eventhub.FlowTxEventModel;
import it.gov.pagopa.fdr.to.eventhub.parser.FDR1XmlSAXParser;
+import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil;
import it.gov.pagopa.fdr.to.eventhub.util.SampleContentFileUtil;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
@@ -34,16 +35,19 @@
import java.util.Random;
import java.util.function.Supplier;
import java.util.logging.Logger;
-import java.util.zip.GZIPOutputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, SystemStubsExtension.class})
class BlobProcessingFunctionTest {
@Mock private EventHubProducerClient eventHubClientFlowTx;
@@ -54,23 +58,17 @@ class BlobProcessingFunctionTest {
@Mock private Logger mockLogger;
+ @SystemStub private EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
private BlobProcessingFunction function;
@BeforeEach
- public void setup() {
+ void setup() {
function = new BlobProcessingFunction(eventHubClientFlowTx, eventHubClientReportedIUV);
lenient().when(eventHubClientFlowTx.createBatch()).thenReturn(mock(EventDataBatch.class));
lenient().when(eventHubClientReportedIUV.createBatch()).thenReturn(mock(EventDataBatch.class));
}
- private byte[] createGzipCompressedData(String input) throws Exception {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
- gzipOutputStream.write(input.getBytes(StandardCharsets.UTF_8));
- }
- return byteArrayOutputStream.toByteArray();
- }
-
@Test
void testFDR1BlobTriggerProcessing() throws Exception {
EventDataBatch mockEventDataBatch = mock(EventDataBatch.class);
@@ -80,7 +78,7 @@ void testFDR1BlobTriggerProcessing() throws Exception {
when(mockEventDataBatch.tryAdd(any(com.azure.messaging.eventhubs.EventData.class)))
.thenReturn(Boolean.TRUE);
String sampleXml = SampleContentFileUtil.getSampleXml("sample.xml");
- byte[] compressedData = createGzipCompressedData(sampleXml);
+ byte[] compressedData = SampleContentFileUtil.createGzipCompressedData(sampleXml);
Map metadata = new HashMap<>();
metadata.put("sessionId", "1234");
metadata.put("insertedTimestamp", "2025-01-30T10:15:30");
@@ -108,6 +106,9 @@ void testFDR1BlobTriggerProcessing() throws Exception {
// it is verified that the distinct on the dates has left the only expected
// date for all payments
assertEquals(1, flowEvent.getAllDates().size());
+
+ ArgumentCaptor> logCaptor = ArgumentCaptor.forClass(Supplier.class);
+ verify(mockLogger, atMost(2)).info(logCaptor.capture());
}
@Test
@@ -119,7 +120,7 @@ void testFDR1BigBlobTriggerProcessing() throws Exception {
when(mockEventDataBatch.tryAdd(any(com.azure.messaging.eventhubs.EventData.class)))
.thenReturn(Boolean.TRUE);
String sampleXml = SampleContentFileUtil.getSampleXml("big_sample.xml");
- byte[] compressedData = createGzipCompressedData(sampleXml);
+ byte[] compressedData = SampleContentFileUtil.createGzipCompressedData(sampleXml);
Map metadata = new HashMap<>();
metadata.put("sessionId", "1234");
metadata.put("insertedTimestamp", "2025-01-30T10:15:30");
@@ -163,7 +164,7 @@ void testFDR1ProcessBlobWithEmptyXml() throws Exception {
metadata.put("sessionId", "1234");
metadata.put("insertedTimestamp", "2025-01-30T10:15:30");
metadata.put("elaborate", "true");
- byte[] compressedData = createGzipCompressedData("");
+ byte[] compressedData = SampleContentFileUtil.createGzipCompressedData("");
function.processFDR1BlobFiles(compressedData, "sampleBlob", metadata, context);
verify(eventHubClientFlowTx, never()).send(any(ArrayList.class));
@@ -179,7 +180,7 @@ void testFDR1ProcessBlobWithMalformedXml() throws Exception {
metadata.put("sessionId", "1234");
metadata.put("insertedTimestamp", "2025-01-30T10:15:30");
metadata.put("elaborate", "true");
- byte[] compressedData = createGzipCompressedData("malformed");
+ byte[] compressedData = SampleContentFileUtil.createGzipCompressedData("malformed");
function.processFDR1BlobFiles(compressedData, "sampleBlob", metadata, context);
verify(eventHubClientFlowTx, never()).send(any(EventDataBatch.class));
@@ -273,7 +274,7 @@ void testFDR1BlobTriggerProcessingError() throws Exception {
new AmqpException(
Boolean.TRUE, "Failed to add event data", mock(AmqpErrorContext.class)));
String sampleXml = SampleContentFileUtil.getSampleXml("sample.xml");
- byte[] compressedData = createGzipCompressedData(sampleXml);
+ byte[] compressedData = SampleContentFileUtil.createGzipCompressedData(sampleXml);
Map metadata = new HashMap<>();
metadata.put("sessionId", "1234");
metadata.put("insertedTimestamp", "2025-01-30T10:15:30");
@@ -342,7 +343,7 @@ void testFDR1BigBlobTriggerProcessingCheckAllDates() throws Exception {
.when(() -> FDR1XmlSAXParser.parseXmlStream(any(InputStream.class)))
.thenReturn(flussoRendicontazione);
- byte[] compressedData = createGzipCompressedData(sampleXml);
+ byte[] compressedData = SampleContentFileUtil.createGzipCompressedData(sampleXml);
Map metadata = new HashMap<>();
metadata.put("sessionId", "1234");
metadata.put("insertedTimestamp", "2025-01-30T10:15:30");
@@ -367,7 +368,7 @@ void testFDR1BigBlobTriggerProcessingCheckAllDates() throws Exception {
void testFDR3BlobTriggerProcessing() throws Exception {
when(context.getLogger()).thenReturn(mockLogger);
String sampleXml = SampleContentFileUtil.getSampleXml("sample.xml");
- byte[] compressedData = createGzipCompressedData(sampleXml);
+ byte[] compressedData = SampleContentFileUtil.createGzipCompressedData(sampleXml);
Map metadata = new HashMap<>();
metadata.put("sessionId", "1234");
metadata.put("insertedTimestamp", "2025-01-30T10:15:30");
@@ -377,4 +378,39 @@ void testFDR3BlobTriggerProcessing() throws Exception {
ArgumentCaptor> logCaptor = ArgumentCaptor.forClass(Supplier.class);
verify(mockLogger, atLeastOnce()).info(logCaptor.capture());
}
+
+ @Test
+ void testConstructorInitializesClients() {
+
+ try (MockedStatic mockedCommonUtil = Mockito.mockStatic(CommonUtil.class)) {
+
+ // Simulate environment variables
+ environmentVariables.set("EVENT_HUB_FLOWTX_CONNECTION_STRING", "fake-flowtx-conn-string");
+ environmentVariables.set("EVENT_HUB_FLOWTX_NAME", "fake-flowtx-name");
+ environmentVariables.set(
+ "EVENT_HUB_REPORTEDIUV_CONNECTION_STRING", "fake-reportediuv-conn-string");
+ environmentVariables.set("EVENT_HUB_REPORTEDIUV_NAME", "fake-reportediuv-name");
+
+ EventHubProducerClient mockClient1 = mock(EventHubProducerClient.class);
+ EventHubProducerClient mockClient2 = mock(EventHubProducerClient.class);
+ mockedCommonUtil
+ .when(
+ () -> CommonUtil.createEventHubClient("fake-flowtx-conn-string", "fake-flowtx-name"))
+ .thenReturn(mockClient1);
+ mockedCommonUtil
+ .when(
+ () ->
+ CommonUtil.createEventHubClient(
+ "fake-reportediuv-conn-string", "fake-reportediuv-name"))
+ .thenReturn(mockClient2);
+
+ // Instantiate the class
+ BlobProcessingFunction blobProcessingFunction = new BlobProcessingFunction();
+
+ assertNotNull(blobProcessingFunction.getEventHubClientFlowTx());
+ assertNotNull(blobProcessingFunction.getEventHubClientReportedIUV());
+ assertEquals(mockClient1, blobProcessingFunction.getEventHubClientFlowTx());
+ assertEquals(mockClient2, blobProcessingFunction.getEventHubClientReportedIUV());
+ }
+ }
}
diff --git a/src/test/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunctionTest.java b/src/test/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunctionTest.java
new file mode 100644
index 0000000..b12073d
--- /dev/null
+++ b/src/test/java/it/gov/pagopa/fdr/to/eventhub/HttpBlobRecoveryFunctionTest.java
@@ -0,0 +1,246 @@
+package it.gov.pagopa.fdr.to.eventhub;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.microsoft.azure.functions.ExecutionContext;
+import com.microsoft.azure.functions.HttpRequestMessage;
+import com.microsoft.azure.functions.HttpResponseMessage;
+import com.microsoft.azure.functions.HttpStatus;
+import it.gov.pagopa.fdr.to.eventhub.model.BlobFileData;
+import it.gov.pagopa.fdr.to.eventhub.model.FlussoRendicontazione;
+import it.gov.pagopa.fdr.to.eventhub.util.CommonUtil;
+import it.gov.pagopa.fdr.to.eventhub.util.SampleContentFileUtil;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
+
+@ExtendWith({MockitoExtension.class, SystemStubsExtension.class})
+class HttpBlobRecoveryFunctionTest {
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ @Mock private EventHubProducerClient mockEventHubClientFlowTx;
+ @Mock private EventHubProducerClient mockEventHubClientReportedIUV;
+ @Mock private ExecutionContext mockContext;
+ @Mock private HttpRequestMessage> mockRequest;
+
+ @SystemStub private EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
+ private HttpBlobRecoveryFunction function;
+
+ private HttpResponseMessage.Builder mockResponseBuilder;
+ private HttpResponseMessage mockResponse;
+ private final AtomicReference statusToReturn = new AtomicReference<>();
+
+ @BeforeEach
+ void setUp() {
+ function =
+ new HttpBlobRecoveryFunction(mockEventHubClientFlowTx, mockEventHubClientReportedIUV);
+ Logger logger = mock(Logger.class);
+ lenient().when(mockContext.getLogger()).thenReturn(logger);
+
+ mockResponseBuilder = mock(HttpResponseMessage.Builder.class);
+ mockResponse = mock(HttpResponseMessage.class);
+
+ lenient()
+ .when(mockResponseBuilder.header(anyString(), anyString()))
+ .thenReturn(mockResponseBuilder);
+ lenient().when(mockResponseBuilder.body(any())).thenReturn(mockResponseBuilder);
+ lenient()
+ .when(mockResponseBuilder.build())
+ .thenAnswer(
+ invocation -> {
+ when(mockResponse.getStatus()).thenReturn(statusToReturn.get());
+ return mockResponse;
+ });
+
+ lenient()
+ .when(mockRequest.createResponseBuilder(any(HttpStatus.class)))
+ .thenReturn(mockResponseBuilder);
+ }
+
+ @Test
+ void testMissingRequestBody() {
+
+ statusToReturn.set(HttpStatus.BAD_REQUEST);
+
+ when(mockRequest.getBody()).thenReturn(Optional.empty());
+ HttpResponseMessage response = function.run(mockRequest, mockContext);
+ assertEquals(HttpStatus.BAD_REQUEST, response.getStatus());
+ }
+
+ @Test
+ void testInvalidJsonFormat() {
+
+ statusToReturn.set(HttpStatus.BAD_REQUEST);
+
+ when(mockRequest.getBody()).thenReturn(Optional.of("invalid-json"));
+ HttpResponseMessage response = function.run(mockRequest, mockContext);
+ assertEquals(HttpStatus.BAD_REQUEST, response.getStatus());
+ }
+
+ @Test
+ void testFileNotFound() throws Exception {
+
+ statusToReturn.set(HttpStatus.NOT_FOUND);
+
+ String requestBody =
+ objectMapper.writeValueAsString(
+ Map.of("fileName", "test.xml", "container", "test-container"));
+ when(mockRequest.getBody()).thenReturn(Optional.of(requestBody));
+
+ try (MockedStatic mockedUtil = mockStatic(CommonUtil.class)) {
+ mockedUtil
+ .when(() -> CommonUtil.getBlobFile(anyString(), anyString(), anyString(), any()))
+ .thenReturn(null);
+
+ HttpResponseMessage response = function.run(mockRequest, mockContext);
+ assertEquals(HttpStatus.NOT_FOUND, response.getStatus());
+ }
+ }
+
+ @Test
+ void testMissingMetadata() throws Exception {
+
+ statusToReturn.set(HttpStatus.UNPROCESSABLE_ENTITY);
+
+ String requestBody =
+ objectMapper.writeValueAsString(
+ Map.of("fileName", "test.xml", "container", "test-container"));
+ when(mockRequest.getBody()).thenReturn(Optional.of(requestBody));
+
+ BlobFileData mockBlobFileData = new BlobFileData(new byte[] {}, new HashMap<>());
+
+ try (MockedStatic mockedUtil = mockStatic(CommonUtil.class)) {
+ mockedUtil
+ .when(() -> CommonUtil.getBlobFile(anyString(), anyString(), anyString(), any()))
+ .thenReturn(mockBlobFileData);
+ mockedUtil.when(() -> CommonUtil.validateBlobMetadata(any())).thenReturn(false);
+
+ HttpResponseMessage response = function.run(mockRequest, mockContext);
+ assertEquals(HttpStatus.UNPROCESSABLE_ENTITY, response.getStatus());
+ }
+ }
+
+ @Test
+ void testSuccessfulProcessing() throws Exception {
+
+ statusToReturn.set(HttpStatus.OK);
+
+ String requestBody =
+ objectMapper.writeValueAsString(
+ Map.of("fileName", "test.xml", "container", "test-container"));
+ when(mockRequest.getBody()).thenReturn(Optional.of(requestBody));
+
+ Map metadata = new HashMap<>();
+ metadata.put("key", "value");
+ BlobFileData mockBlobFileData =
+ new BlobFileData(
+ SampleContentFileUtil.createGzipCompressedData(new byte[] {1, 2, 3}.toString()),
+ metadata);
+ FlussoRendicontazione mockFlusso = mock(FlussoRendicontazione.class);
+
+ try (MockedStatic mockedUtil = mockStatic(CommonUtil.class)) {
+ mockedUtil
+ .when(() -> CommonUtil.getBlobFile(anyString(), anyString(), anyString(), any()))
+ .thenReturn(mockBlobFileData);
+ mockedUtil.when(() -> CommonUtil.validateBlobMetadata(any())).thenReturn(true);
+ mockedUtil.when(() -> CommonUtil.parseXml(any())).thenReturn(mockFlusso);
+ mockedUtil
+ .when(() -> CommonUtil.processXmlBlobAndSendToEventHub(any(), any(), any(), any()))
+ .thenReturn(true);
+
+ HttpResponseMessage response = function.run(mockRequest, mockContext);
+ assertEquals(HttpStatus.OK, response.getStatus());
+ }
+ }
+
+ @Test
+ void testEventHubProcessingFailure() throws Exception {
+
+ statusToReturn.set(HttpStatus.SERVICE_UNAVAILABLE);
+
+ String requestBody =
+ objectMapper.writeValueAsString(
+ Map.of("fileName", "test.xml", "container", "test-container"));
+ when(mockRequest.getBody()).thenReturn(Optional.of(requestBody));
+
+ Map metadata = new HashMap<>();
+ metadata.put("key", "value");
+ BlobFileData mockBlobFileData =
+ new BlobFileData(
+ SampleContentFileUtil.createGzipCompressedData(new byte[] {1, 2, 3}.toString()),
+ metadata);
+ FlussoRendicontazione mockFlusso = mock(FlussoRendicontazione.class);
+
+ try (MockedStatic mockedUtil = mockStatic(CommonUtil.class)) {
+ mockedUtil
+ .when(() -> CommonUtil.getBlobFile(anyString(), anyString(), anyString(), any()))
+ .thenReturn(mockBlobFileData);
+ mockedUtil.when(() -> CommonUtil.validateBlobMetadata(any())).thenReturn(true);
+ mockedUtil.when(() -> CommonUtil.parseXml(any())).thenReturn(mockFlusso);
+ mockedUtil
+ .when(() -> CommonUtil.processXmlBlobAndSendToEventHub(any(), any(), any(), any()))
+ .thenReturn(false);
+
+ HttpResponseMessage response = function.run(mockRequest, mockContext);
+ assertEquals(HttpStatus.SERVICE_UNAVAILABLE, response.getStatus());
+ }
+ }
+
+ @Test
+ void testConstructorInitializesClients() {
+
+ try (MockedStatic mockedCommonUtil = Mockito.mockStatic(CommonUtil.class)) {
+
+ // Simulate environment variables
+ environmentVariables.set("EVENT_HUB_FLOWTX_CONNECTION_STRING", "fake-flowtx-conn-string");
+ environmentVariables.set("EVENT_HUB_FLOWTX_NAME", "fake-flowtx-name");
+ environmentVariables.set(
+ "EVENT_HUB_REPORTEDIUV_CONNECTION_STRING", "fake-reportediuv-conn-string");
+ environmentVariables.set("EVENT_HUB_REPORTEDIUV_NAME", "fake-reportediuv-name");
+
+ EventHubProducerClient mockClient1 = mock(EventHubProducerClient.class);
+ EventHubProducerClient mockClient2 = mock(EventHubProducerClient.class);
+ mockedCommonUtil
+ .when(
+ () -> CommonUtil.createEventHubClient("fake-flowtx-conn-string", "fake-flowtx-name"))
+ .thenReturn(mockClient1);
+ mockedCommonUtil
+ .when(
+ () ->
+ CommonUtil.createEventHubClient(
+ "fake-reportediuv-conn-string", "fake-reportediuv-name"))
+ .thenReturn(mockClient2);
+
+ // Instantiate the class
+ HttpBlobRecoveryFunction httpBlobRecoveryFunction = new HttpBlobRecoveryFunction();
+
+ assertNotNull(httpBlobRecoveryFunction.getEventHubClientFlowTx());
+ assertNotNull(httpBlobRecoveryFunction.getEventHubClientReportedIUV());
+ assertEquals(mockClient1, httpBlobRecoveryFunction.getEventHubClientFlowTx());
+ assertEquals(mockClient2, httpBlobRecoveryFunction.getEventHubClientReportedIUV());
+ }
+ }
+}
diff --git a/src/test/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtilTest.java b/src/test/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtilTest.java
new file mode 100644
index 0000000..15dbf52
--- /dev/null
+++ b/src/test/java/it/gov/pagopa/fdr/to/eventhub/util/CommonUtilTest.java
@@ -0,0 +1,116 @@
+package it.gov.pagopa.fdr.to.eventhub.util;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobProperties;
+import com.microsoft.azure.functions.ExecutionContext;
+import it.gov.pagopa.fdr.to.eventhub.model.BlobFileData;
+import it.gov.pagopa.fdr.to.eventhub.wrapper.BlobServiceClientWrapper;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class CommonUtilTest {
+
+ private static final String STORAGE_ENV_VAR = "STORAGE_ENV_VAR";
+ private static final String CONTAINER_NAME = "test-container";
+ private static final String BLOB_NAME = "test-blob.xml";
+
+ @Mock private BlobServiceClientWrapper mockBlobServiceClientWrapper;
+ @Mock private BlobServiceClient mockBlobServiceClient;
+ @Mock private BlobContainerClient mockBlobContainerClient;
+ @Mock private BlobClient mockBlobClient;
+ @Mock private BlobProperties mockBlobProperties;
+ @Mock private ExecutionContext mockContext;
+ @Mock private Logger mockLogger;
+ @Mock private BlobServiceClientBuilder mockBuilder;
+
+ @BeforeEach
+ void setUp() {
+ lenient().when(mockContext.getLogger()).thenReturn(mockLogger);
+ CommonUtil.setBlobServiceClientWrapper(mockBlobServiceClientWrapper);
+ when(mockBlobServiceClientWrapper.getBlobContainerClient(anyString(), anyString()))
+ .thenReturn(mockBlobContainerClient);
+ when(mockBlobContainerClient.getBlobClient(anyString())).thenReturn(mockBlobClient);
+ }
+
+ @Test
+ void testBlobFileNotFound() {
+ when(mockBlobClient.exists()).thenReturn(false);
+
+ BlobFileData result =
+ CommonUtil.getBlobFile(STORAGE_ENV_VAR, CONTAINER_NAME, BLOB_NAME, mockContext);
+
+ assertNull(result);
+ ArgumentCaptor> logCaptor = ArgumentCaptor.forClass(Supplier.class);
+ verify(mockLogger, atLeastOnce()).severe(logCaptor.capture());
+
+ logCaptor.getAllValues().stream()
+ .map(Supplier::get)
+ .anyMatch(log -> log.contains("Blob not found"));
+ }
+
+ @Test
+ void testBlobFileRetrievalSuccess() {
+ byte[] mockData = "test data".getBytes();
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(mockData);
+
+ Map metadata = new HashMap<>();
+ metadata.put("key1", "value1");
+
+ when(mockBlobClient.exists()).thenReturn(true);
+ when(mockBlobClient.getProperties()).thenReturn(mockBlobProperties);
+ when(mockBlobProperties.getMetadata()).thenReturn(metadata);
+ doAnswer(
+ invocation -> {
+ ByteArrayOutputStream actualOutputStream = invocation.getArgument(0);
+ inputStream.transferTo(actualOutputStream);
+ return null;
+ })
+ .when(mockBlobClient)
+ .downloadStream(any(ByteArrayOutputStream.class));
+
+ BlobFileData result =
+ CommonUtil.getBlobFile(STORAGE_ENV_VAR, CONTAINER_NAME, BLOB_NAME, mockContext);
+
+ assertNotNull(result);
+ assertArrayEquals(mockData, result.getFileContent());
+ assertEquals(metadata, result.getMetadata());
+ }
+
+ @Test
+ void testBlobFileRetrievalFailure() {
+ when(mockBlobContainerClient.getBlobClient(anyString()))
+ .thenThrow(new RuntimeException("Storage error"));
+
+ BlobFileData result =
+ CommonUtil.getBlobFile(STORAGE_ENV_VAR, CONTAINER_NAME, BLOB_NAME, mockContext);
+
+ assertNull(result);
+ verify(mockLogger).severe("Error accessing blob: Storage error");
+ }
+}
diff --git a/src/test/java/it/gov/pagopa/fdr/to/eventhub/util/SampleContentFileUtil.java b/src/test/java/it/gov/pagopa/fdr/to/eventhub/util/SampleContentFileUtil.java
index 1c4b913..db5c13c 100644
--- a/src/test/java/it/gov/pagopa/fdr/to/eventhub/util/SampleContentFileUtil.java
+++ b/src/test/java/it/gov/pagopa/fdr/to/eventhub/util/SampleContentFileUtil.java
@@ -1,11 +1,13 @@
package it.gov.pagopa.fdr.to.eventhub.util;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.zip.GZIPOutputStream;
import lombok.experimental.UtilityClass;
@UtilityClass
@@ -20,4 +22,12 @@ public static InputStream getSamplePomProperties() throws Exception {
Path path = Paths.get(ClassLoader.getSystemResource("pom.properties").toURI());
return new ByteArrayInputStream(Files.readString(path).getBytes(StandardCharsets.UTF_8));
}
+
+ public static byte[] createGzipCompressedData(String input) throws Exception {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
+ gzipOutputStream.write(input.getBytes(StandardCharsets.UTF_8));
+ }
+ return byteArrayOutputStream.toByteArray();
+ }
}