diff --git a/.github/workflows/deploy_with_github_runner.yml b/.github/workflows/deploy_with_github_runner.yml
index d8dd119..ee7b011 100644
--- a/.github/workflows/deploy_with_github_runner.yml
+++ b/.github/workflows/deploy_with_github_runner.yml
@@ -62,7 +62,7 @@ jobs:
           cluster_name: ${{ vars.CLUSTER_NAME }}
           resource_group: ${{ vars.CLUSTER_RESOURCE_GROUP }}
           app_name: ${{ env.APP_NAME }}${{inputs.suffix_name}}
-          helm_upgrade_options: '--debug --set microservice-chart.azure.workloadIdentityClientId=${{vars.WORKLOAD_IDENTITY_ID}} ${{inputs.helm_options}}'
+          helm_upgrade_options: '--debug --set microservice-chart.azure.workloadIdentityClientId=${{vars.WORKLOAD_IDENTITY_ID}}'
           timeout: '10m0s'

       - name: Remove deployment
diff --git a/.github/workflows/github_scripts/check_required_labels.js b/.github/workflows/github_scripts/check_required_labels.js
new file mode 100644
index 0000000..a739341
--- /dev/null
+++ b/.github/workflows/github_scripts/check_required_labels.js
@@ -0,0 +1,24 @@
+module.exports = async ({github, context, core}) => {
+  const comments = await github.rest.issues.listComments({
+    issue_number: context.issue.number,
+    owner: context.repo.owner,
+    repo: context.repo.repo
+  });
+  for (const comment of comments.data) {
+    if (comment.body.includes('This pull request does not contain a valid label')) {
+      github.rest.issues.deleteComment({
+        issue_number: context.issue.number,
+        owner: context.repo.owner,
+        repo: context.repo.repo,
+        comment_id: comment.id
+      })
+    }
+  }
+  github.rest.issues.createComment({
+    issue_number: context.issue.number,
+    owner: context.repo.owner,
+    repo: context.repo.repo,
+    body: 'This pull request does not contain a valid label. Please add one of the following labels: `[major, minor, patch, skip]`'
+  })
+  core.setFailed('Missing required labels')
+}
diff --git a/.github/workflows/github_scripts/check_size.js b/.github/workflows/github_scripts/check_size.js
new file mode 100644
index 0000000..e8132d0
--- /dev/null
+++ b/.github/workflows/github_scripts/check_size.js
@@ -0,0 +1,99 @@
+module.exports = async ({github, context, core}) => {
+  const additions = context.payload.pull_request.additions || 0
+  const deletions = context.payload.pull_request.deletions || 0
+  let changes = additions + deletions;
+  console.log('additions: ' + additions + ' + deletions: ' + deletions + ' = total changes: ' + changes);
+
+  const {IGNORED_FILES, BRANCH_NAME} = process.env
+  const ignored_files = IGNORED_FILES.trim().split(',').filter(word => word.length > 0);
+  if (ignored_files.length > 0) {
+    var ignored = 0
+    const execSync = require('child_process').execSync;
+    for (const file of IGNORED_FILES.trim().split(',')) {
+
+      const ignored_additions_str = execSync('git --no-pager diff --numstat origin/main..origin/' + BRANCH_NAME + ' | grep ' + file + ' | cut -f 1', {encoding: 'utf-8'})
+      const ignored_deletions_str = execSync('git --no-pager diff --numstat origin/main..origin/' + BRANCH_NAME + ' | grep ' + file + ' | cut -f 2', {encoding: 'utf-8'})
+
+      const ignored_additions = ignored_additions_str.split('\n').map(elem => parseInt(elem || 0)).reduce(
+        (accumulator, currentValue) => accumulator + currentValue,
+        0);
+      const ignored_deletions = ignored_deletions_str.split('\n').map(elem => parseInt(elem || 0)).reduce(
+        (accumulator, currentValue) => accumulator + currentValue,
+        0);
+
+      ignored += ignored_additions + ignored_deletions;
+    }
+    changes -= ignored
+    console.log('ignored lines: ' + ignored + ' , consider changes: ' + changes);
+  }
+
+  var labels = await github.rest.issues.listLabelsOnIssue({
+    issue_number: context.issue.number,
+    owner: context.repo.owner,
+    repo: context.repo.repo
+  });
+
+  if (changes <= 400) {
+    if (labels.data.find(label => label.name === 'size/large')) {
+      github.rest.issues.removeLabel({
+        issue_number: context.issue.number,
+        owner: context.repo.owner,
+        repo: context.repo.repo,
+        name: 'size/large'
+      })
+    }
+  }
+
+  if (changes >= 200) {
+    if (labels.data.find(label => label.name === 'size/small')) {
+      github.rest.issues.removeLabel({
+        issue_number: context.issue.number,
+        owner: context.repo.owner,
+        repo: context.repo.repo,
+        name: 'size/small'
+      })
+    }
+  }
+
+  var comments = await github.rest.issues.listComments({
+    issue_number: context.issue.number,
+    owner: context.repo.owner,
+    repo: context.repo.repo
+  });
+  for (const comment of comments.data) {
+    if (comment.body.includes('This PR exceeds the recommended size')) {
+      github.rest.issues.deleteComment({
+        issue_number: context.issue.number,
+        owner: context.repo.owner,
+        repo: context.repo.repo,
+        comment_id: comment.id
+      })
+    }
+  }
+
+  if (changes < 200) {
+    github.rest.issues.addLabels({
+      issue_number: context.issue.number,
+      owner: context.repo.owner,
+      repo: context.repo.repo,
+      labels: ['size/small']
+    })
+  }
+
+  if (changes > 400) {
+    github.rest.issues.addLabels({
+      issue_number: context.issue.number,
+      owner: context.repo.owner,
+      repo: context.repo.repo,
+      labels: ['size/large']
+    })
+
+    github.rest.issues.createComment({
+      issue_number: context.issue.number,
+      owner: context.repo.owner,
+      repo: context.repo.repo,
+      body: 'This PR exceeds the recommended size of 400 lines. Please make sure you are NOT addressing multiple issues with one PR. _Note this PR might be rejected due to its size._'
+    })
+
+  }
+}
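For reference, the labeling rule implemented above reduces to a pair of thresholds on the net change count (additions plus deletions, minus lines in ignored files): below 200 net changed lines the PR gets `size/small`, above 400 it gets `size/large` plus a warning comment, and in between only stale size labels are removed. A compact restatement in Java — the class and method names are illustrative, not part of this PR:

```java
public final class PrSizeRule {

  private PrSizeRule() {}

  // Mirrors check_size.js: net changes below 200 -> size/small,
  // above 400 -> size/large, otherwise no size label is applied.
  public static String sizeLabel(int additions, int deletions, int ignoredLines) {
    int changes = additions + deletions - ignoredLines;
    if (changes < 200) {
      return "size/small";
    }
    if (changes > 400) {
      return "size/large";
    }
    return null; // between the thresholds: stale labels are only removed
  }

  public static void main(String[] args) {
    System.out.println(sizeLabel(350, 120, 80)); // 390 net changes -> null
    System.out.println(sizeLabel(350, 120, 0));  // 470 net changes -> size/large
  }
}
```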
diff --git a/.identity/00_data.tf b/.identity/00_data.tf
index 640a443..1b0d25a 100644
--- a/.identity/00_data.tf
+++ b/.identity/00_data.tf
@@ -24,7 +24,7 @@ data "azurerm_key_vault" "key_vault" {

 data "azurerm_user_assigned_identity" "identity_cd" {
-  name                = "${local.product}-${local.domain}-01-github-cd-identity"
+  name                = "${local.product}-${local.domain}-job-01-github-cd-identity"
   resource_group_name = "${local.product}-identity-rg"
 }

@@ -58,8 +58,8 @@ data "azurerm_key_vault_secret" "key_vault_cucumber_token" {
   key_vault_id = data.azurerm_key_vault.key_vault.id
 }

-# data "azurerm_user_assigned_identity" "workload_identity_clientid" {
-#   name                = "ebollo-workload-identity"
-#   resource_group_name = "pagopa-${var.env_short}-${var.env}-aks-rg"
-# }
+data "azurerm_user_assigned_identity" "workload_identity_clientid" {
+  name                = "fdr-workload-identity"
+  resource_group_name = "pagopa-${var.env_short}-weu-${var.env}-aks-rg"
+}
diff --git a/.identity/01_github_environment.tf b/.identity/01_github_environment.tf
index 8b2b493..08e975c 100644
--- a/.identity/01_github_environment.tf
+++ b/.identity/01_github_environment.tf
@@ -32,7 +32,7 @@ locals {
     "CLUSTER_NAME" : local.aks_cluster.name,
     "CLUSTER_RESOURCE_GROUP" : local.aks_cluster.resource_group_name,
     "NAMESPACE" : local.domain,
-    # "WORKLOAD_IDENTITY_ID": data.azurerm_user_assigned_identity.workload_identity_clientid.client_id
+    "WORKLOAD_IDENTITY_ID": data.azurerm_user_assigned_identity.workload_identity_clientid.client_id
   }
   repo_secrets = {
     "SONAR_TOKEN" : data.azurerm_key_vault_secret.key_vault_sonar.value,
diff --git a/helm/Chart.lock b/helm/Chart.lock
deleted file mode 100644
index ec10db3..0000000
--- a/helm/Chart.lock
+++ /dev/null
@@ -1,6 +0,0 @@
-dependencies:
-- name: microservice-chart
-  repository: https://pagopa.github.io/aks-microservice-chart-blueprint
-  version: 1.21.0
-digest: sha256:e3deccb7ac0b5d85af0c726f28316ebe7a3795cbf54522330c33474b0bae309a
-generated: "2022-10-06T17:44:35.49088+02:00"
diff --git a/helm/Chart.yaml b/helm/Chart.yaml
index f39b58a..964988c 100644
--- a/helm/Chart.yaml
+++ b/helm/Chart.yaml
@@ -1,10 +1,10 @@
 apiVersion: v2
-name: pagopa-functions-template
-description: Microservice description
+name: pagopa-fdr-to-event-hub
+description: Microservice fdr to event hub
 type: application
-version: 0.27.0
-appVersion: 0.0.2
+version: 0.43.0
+appVersion: 0.0.2-16-PAGOPA-2645-tuning-fdr-to-eventhub
 dependencies:
   - name: microservice-chart
-    version: 1.21.0
+    version: 7.1.1
     repository: "https://pagopa.github.io/aks-microservice-chart-blueprint"
diff --git a/helm/values-dev.yaml b/helm/values-dev.yaml
index 7735d57..ddd09e6 100644
--- a/helm/values-dev.yaml
+++ b/helm/values-dev.yaml
@@ -1,40 +1,65 @@
 microservice-chart:
   namespace: "fdr"
-  nameOverride: ""
-  fullnameOverride: ""
+  nameOverride: "pagopa-fdr-2-event"
+  fullnameOverride: "pagopa-fdr-2-event-hub"
   image:
     repository: ghcr.io/pagopa/pagopa-fdr-2-event-hub
-    tag: "0.0.2"
+    tag: "0.0.2-16-PAGOPA-2645-tuning-fdr-to-eventhub"
     pullPolicy: Always
   # https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs
+  # livenessProbe:
+  #   httpGet:
+  #     path: /info
+  #     port: 80
+  #   initialDelaySeconds: 60
+  #   failureThreshold: 6
+  #   periodSeconds: 10
+  # readinessProbe:
+  #   httpGet:
+  #     path: /info
+  #     port: 80
+  #   initialDelaySeconds: 60
+  #   failureThreshold: 6
+  #   periodSeconds: 10
   livenessProbe:
-    httpGet:
-      path: /info
-      port: 80
-    initialDelaySeconds: 60
-    failureThreshold: 6
-    periodSeconds: 10
+    handlerType: tcpSocket
+    tcpSocket:
+      port: 8080
+    initialDelaySeconds: 30
+    periodSeconds: 30
+    failureThreshold: 10
   readinessProbe:
-    httpGet:
-      path: /info
-      port: 80
-    initialDelaySeconds: 60
-    failureThreshold: 6
-    periodSeconds: 10
+    handlerType: tcpSocket
+    tcpSocket:
+      port: 8080
+    initialDelaySeconds: 30
+    periodSeconds: 30
+    failureThreshold: 10
   deployment:
     create: true
-  service:
+  serviceMonitor:
     create: true
+    endpoints:
+      - interval: 10s #jmx-exporter
+        targetPort: 12345
+        path: /metrics
+  ports:
+    - 12345 #jmx-exporter
+    - 8080
+  service:
     type: ClusterIP
-    port: 80
+    ports:
+      - 8080
+      - 12345 #jmx-exporter
   ingress:
     create: true
     host: "weudev.fdr.internal.dev.platform.pagopa.it"
     path: /pagopa-fdr-to-event-hub-service/(.*)
+    servicePort: 80
   serviceAccount:
-    create: false
-    annotations: {}
-    name: ""
+    name: "fdr-workload-identity"
+  azure:
+    workloadIdentityClientId:
   podAnnotations: {}
   podSecurityContext:
     seccompProfile:
@@ -68,6 +93,16 @@ microservice-chart:
     BLOB_STORAGE_FDR3_CONTAINER: "fdr3-flows"
     EVENT_HUB_FLOWTX_NAME: "fdr-qi-flows"
     EVENT_HUB_REPORTEDIUV_NAME: "fdr-qi-reported-iuv"
+    ASPNETCORE_URLS: "http://*:8080"
+    # AzureFunctionsJobHost__logging__logLevel__default: "Debug"
+    # AzureFunctionsJobHost__logging__logLevel__Host__Results: "Debug"
+    # AzureFunctionsJobHost__logging__logLevel__Host__Aggregator: "Debug"
+    # AzureFunctionsJobHost__logging__logLevel__Function__ProcessFDR1BlobFiles: "Debug"
+    # AzureFunctionsJobHost__logging__logLevel__Function__ProcessFDR3BlobFiles: "Debug"
+    FUNCTIONS_SECRETS_PATH: "/tmp/secrets"
+  envFieldRef:
+    APP_NAME: "metadata.labels['app.kubernetes.io/instance']"
+    APP_VERSION: "metadata.labels['app.kubernetes.io/version']"
   envSecret:
     APPLICATIONINSIGHTS_CONNECTION_STRING: "ai-connection-string"
     FDR_SA_CONNECTION_STRING: "fdr-sa-connection-string"
@@ -78,7 +113,19 @@ microservice-chart:
     name: "pagopa-d-fdr-kv"
     tenantId: "7788edaf-0346-4068-9d79-c868aed15b3d"
   nodeSelector: {}
-  tolerations: []
-  affinity: {}
+  tolerations:
+    - key: dedicated
+      operator: Equal
+      value: "nodo"
+      effect: NoSchedule
+  affinity:
+    nodeAffinity:
+      requiredDuringSchedulingIgnoredDuringExecution:
+        nodeSelectorTerms:
+          - matchExpressions:
+              - key: nodo
+                operator: In
+                values:
+                  - "true"
   canaryDelivery:
     create: false
"metadata.labels['app.kubernetes.io/version']" envSecret: APPLICATIONINSIGHTS_CONNECTION_STRING: "ai-connection-string" FDR_SA_CONNECTION_STRING: "fdr-sa-connection-string" @@ -83,7 +113,19 @@ microservice-chart: name: "pagopa-p-fdr-kv" tenantId: "7788edaf-0346-4068-9d79-c868aed15b3d" nodeSelector: {} - tolerations: [] - affinity: {} + tolerations: + - key: dedicated + operator: Equal + value: "nodo" + effect: NoSchedule + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: nodo + operator: In + values: + - "true" canaryDelivery: create: false diff --git a/helm/values-uat.yaml b/helm/values-uat.yaml index f8b064b..4fe1d7a 100644 --- a/helm/values-uat.yaml +++ b/helm/values-uat.yaml @@ -1,40 +1,65 @@ microservice-chart: namespace: "fdr" - nameOverride: "" - fullnameOverride: "" + nameOverride: "pagopa-fdr-2-event" + fullnameOverride: "pagopa-fdr-2-event-hub" image: repository: ghcr.io/pagopa/pagopa-fdr-2-event-hub - tag: "0.0.2" + tag: "0.0.2-16-PAGOPA-2645-tuning-fdr-to-eventhub" pullPolicy: Always # https://github.com/Azure/azure-functions-host/blob/dev/src/WebJobs.Script.WebHost/Controllers/HostController.cs + # livenessProbe: + # httpGet: + # path: /info + # port: 80 + # initialDelaySeconds: 60 + # failureThreshold: 6 + # periodSeconds: 10 + # readinessProbe: + # httpGet: + # path: /info + # port: 80 + # initialDelaySeconds: 60 + # failureThreshold: 6 + # periodSeconds: 10 livenessProbe: - httpGet: - path: /info - port: 80 - initialDelaySeconds: 60 - failureThreshold: 6 - periodSeconds: 10 + handlerType: tcpSocket + tcpSocket: + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 30 + failureThreshold: 10 readinessProbe: - httpGet: - path: /info - port: 80 - initialDelaySeconds: 60 - failureThreshold: 6 - periodSeconds: 10 + handlerType: tcpSocket + tcpSocket: + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 30 + failureThreshold: 10 deployment: create: true - service: + serviceMonitor: create: true + endpoints: + - interval: 10s #jmx-exporter + targetPort: 12345 + path: /metrics + ports: + - 12345 #jmx-exporter + - 8080 + service: type: ClusterIP - port: 80 + ports: + - 8080 + - 12345 #jmx-exporter ingress: create: true host: "weuuat.fdr.internal.uat.platform.pagopa.it" path: /pagopa-fdr-to-event-hub-service/(.*) + servicePort: 80 serviceAccount: - create: false - annotations: {} - name: "" + name: "fdr-workload-identity" + azure: + workloadIdentityClientId: podAnnotations: {} podSecurityContext: seccompProfile: @@ -43,15 +68,15 @@ microservice-chart: allowPrivilegeEscalation: false resources: requests: - memory: "512Mi" + memory: "768Mi" cpu: "0.5" limits: - memory: "2Gi" - cpu: "1.5" + memory: "4Gi" + cpu: "3" autoscaling: enable: true - minReplica: 2 - maxReplica: 7 + minReplica: 1 + maxReplica: 1 pollingInterval: 10 # seconds cooldownPeriod: 50 # seconds triggers: @@ -66,13 +91,18 @@ microservice-chart: type: Utilization # Allowed types are 'Utilization' or 'AverageValue' value: "75" envConfig: - JAVA_OPTS: "-XX:MaxHeapSize=1024m -XX:MinHeapSize=192m" + JAVA_OPTS: "-XX:MaxHeapSize=2304m -XX:MinHeapSize=256m" WEBSITE_SITE_NAME: "pagopafdrtoeventhub" # required to show cloud role name in application insights FUNCTIONS_WORKER_RUNTIME: "java" BLOB_STORAGE_FDR1_CONTAINER: "fdr1-flows" BLOB_STORAGE_FDR3_CONTAINER: "fdr3-flows" EVENT_HUB_FLOWTX_NAME: "fdr-qi-flows" EVENT_HUB_REPORTEDIUV_NAME: "fdr-qi-reported-iuv" + ASPNETCORE_URLS: "http://*:8080" + FUNCTIONS_SECRETS_PATH: "/tmp/secrets" + envFieldRef: + 
APP_NAME: "metadata.labels['app.kubernetes.io/instance']" + APP_VERSION: "metadata.labels['app.kubernetes.io/version']" envSecret: APPLICATIONINSIGHTS_CONNECTION_STRING: "ai-connection-string" FDR_SA_CONNECTION_STRING: "fdr-sa-connection-string" @@ -83,7 +113,19 @@ microservice-chart: name: "pagopa-u-fdr-kv" tenantId: "7788edaf-0346-4068-9d79-c868aed15b3d" nodeSelector: {} - tolerations: [] - affinity: {} + tolerations: + - key: dedicated + operator: Equal + value: "nodo" + effect: NoSchedule + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: nodo + operator: In + values: + - "true" canaryDelivery: create: false diff --git a/host.json b/host.json index c10bf0d..02736ff 100644 --- a/host.json +++ b/host.json @@ -16,6 +16,12 @@ "maxDequeueCount": 3, "newBatchThreshold": 1 }, + "retry": { + "strategy": "exponentialBackoff", + "maxRetryCount": 3, + "minimumInterval": "00:00:30", + "maximumInterval": "00:02:00" + }, "http": { "routePrefix": "" } diff --git a/pom.xml b/pom.xml index 95f1230..4b8b980 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ it.gov.pagopa.fdr.to.eventhub pagopa-fdr-to-event-hub - 0.0.2 + 0.0.2-16-PAGOPA-2645-tuning-fdr-to-eventhub jar FDR To Event Hub diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java index 280a4eb..44ea9ad 100644 --- a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java +++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java @@ -54,7 +54,8 @@ public BlobProcessingFunction() { System.getenv("EVENT_HUB_FLOWTX_NAME")) .retryOptions( new AmqpRetryOptions() - .setMaxRetries(3) // Maximum number of attempts + .setMaxRetries(3) // Maximum number of + // attempts .setDelay(Duration.ofSeconds(2)) // Delay between attempts .setMode(AmqpRetryMode.EXPONENTIAL)) // Backoff strategy .buildProducerClient(); @@ -81,7 +82,7 @@ public BlobProcessingFunction( } @FunctionName("ProcessFDR1BlobFiles") - public void processFDR1BlobFiles( + public synchronized void processFDR1BlobFiles( @BlobTrigger( name = "Fdr1BlobTrigger", dataType = "binary", @@ -137,7 +138,15 @@ public void processFDR1BlobFiles( content.length)); flusso.setMetadata(blobMetadata); - processXmlBlobAndSendToEventHub(flusso, context); + + // Waits for confirmation of sending the entire flow to the Event Hub + boolean eventBatchSent = processXmlBlobAndSendToEventHub(flusso, context); + if (!eventBatchSent) { + throw new EventHubException( + String.format( + "EventHub has not confirmed sending the entire batch of events for flow ID: %s", + flusso.getIdentificativoFlusso())); + } context .getLogger() @@ -222,9 +231,8 @@ private FlussoRendicontazione parseXml(InputStream xmlStream) return FDR1XmlSAXParser.parseXmlStream(xmlStream); } - private void processXmlBlobAndSendToEventHub( - FlussoRendicontazione flussoRendicontazione, ExecutionContext context) - throws EventHubException { + private boolean processXmlBlobAndSendToEventHub( + FlussoRendicontazione flussoRendicontazione, ExecutionContext context) { try { // Convert FlussoRendicontazione to event models FlowTxEventModel flowEvent = @@ -255,12 +263,19 @@ private void processXmlBlobAndSendToEventHub( flussoRendicontazione.getIdentificativoFlusso(), reportedIUVEventJsonChunks.size())); - sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione); + boolean flowEventSent = + sendEventToHub(flowEventJson, 
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java
index 280a4eb..44ea9ad 100644
--- a/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/BlobProcessingFunction.java
@@ -54,7 +54,8 @@ public BlobProcessingFunction() {
                 System.getenv("EVENT_HUB_FLOWTX_NAME"))
             .retryOptions(
                 new AmqpRetryOptions()
-                    .setMaxRetries(3) // Maximum number of attempts
+                    .setMaxRetries(3) // Maximum number of
+                    // attempts
                     .setDelay(Duration.ofSeconds(2)) // Delay between attempts
                     .setMode(AmqpRetryMode.EXPONENTIAL)) // Backoff strategy
             .buildProducerClient();
@@ -81,7 +82,7 @@ public BlobProcessingFunction(
   }

   @FunctionName("ProcessFDR1BlobFiles")
-  public void processFDR1BlobFiles(
+  public synchronized void processFDR1BlobFiles(
       @BlobTrigger(
           name = "Fdr1BlobTrigger",
           dataType = "binary",
@@ -137,7 +138,15 @@ public void processFDR1BlobFiles(
               content.length));

       flusso.setMetadata(blobMetadata);
-      processXmlBlobAndSendToEventHub(flusso, context);
+
+      // Waits for confirmation of sending the entire flow to the Event Hub
+      boolean eventBatchSent = processXmlBlobAndSendToEventHub(flusso, context);
+      if (!eventBatchSent) {
+        throw new EventHubException(
+            String.format(
+                "EventHub has not confirmed sending the entire batch of events for flow ID: %s",
+                flusso.getIdentificativoFlusso()));
+      }

       context
           .getLogger()
@@ -222,9 +231,8 @@ private FlussoRendicontazione parseXml(InputStream xmlStream)
     return FDR1XmlSAXParser.parseXmlStream(xmlStream);
   }

-  private void processXmlBlobAndSendToEventHub(
-      FlussoRendicontazione flussoRendicontazione, ExecutionContext context)
-      throws EventHubException {
+  private boolean processXmlBlobAndSendToEventHub(
+      FlussoRendicontazione flussoRendicontazione, ExecutionContext context) {
     try {
       // Convert FlussoRendicontazione to event models
       FlowTxEventModel flowEvent =
@@ -255,12 +263,19 @@ private void processXmlBlobAndSendToEventHub(
                   flussoRendicontazione.getIdentificativoFlusso(),
                   reportedIUVEventJsonChunks.size()));

-      sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione);
+      boolean flowEventSent =
+          sendEventToHub(flowEventJson, eventHubClientFlowTx, flussoRendicontazione, context);

+      boolean allEventChunksSent = true;
       for (String chunk : reportedIUVEventJsonChunks) {
-        sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione);
+        if (!sendEventToHub(chunk, eventHubClientReportedIUV, flussoRendicontazione, context)) {
+          allEventChunksSent = false;
+          break;
+        }
       }

+      return flowEventSent && allEventChunksSent;
+
     } catch (Exception e) {
       // Log the exception with context
       String errorMessage =
@@ -269,8 +284,7 @@ private void processXmlBlobAndSendToEventHub(
               flussoRendicontazione.getIdentificativoFlusso(),
               e.getMessage());
       context.getLogger().severe(() -> errorMessage);
-      // Rethrow custom exception with context
-      throw new EventHubException(errorMessage, e);
+      return false;
     }
   }

@@ -315,20 +329,40 @@ private List<String> splitIntoChunks(
   }

   /** Send a message to the Event Hub */
-  private void sendEventToHub(
-      String jsonPayload, EventHubProducerClient eventHubClient, FlussoRendicontazione flusso) {
+  private boolean sendEventToHub(
+      String jsonPayload,
+      EventHubProducerClient eventHubClient,
+      FlussoRendicontazione flusso,
+      ExecutionContext context) {
     EventData eventData = new EventData(jsonPayload);
     eventData
         .getProperties()
-        .put(
-            SERVICE_IDENTIFIER,
-            flusso.getMetadata().get(SERVICE_IDENTIFIER) != null
-                ? flusso.getMetadata().get(SERVICE_IDENTIFIER)
-                : "NA");
+        .put(SERVICE_IDENTIFIER, flusso.getMetadata().getOrDefault(SERVICE_IDENTIFIER, "NA"));

     EventDataBatch eventBatch = eventHubClient.createBatch();
-    eventBatch.tryAdd(eventData);
+    if (!eventBatch.tryAdd(eventData)) {
+      context
+          .getLogger()
+          .warning(
+              () ->
+                  String.format(
+                      "Failed to add event to batch for flow ID: %s",
+                      flusso.getIdentificativoFlusso()));
+      return false;
+    }

-    eventHubClient.send(eventBatch);
+    try {
+      eventHubClient.send(eventBatch);
+      return true;
+    } catch (Exception e) {
+      context
+          .getLogger()
+          .warning(
+              () ->
+                  String.format(
+                      "Failed to send event batch for flow ID: %s. Details: %s",
+                      flusso.getIdentificativoFlusso(), e.getMessage()));
+      return false;
+    }
   }
 }
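`sendEventToHub` above builds one batch per payload, so `tryAdd` returning false means the single event exceeds the Event Hub batch size limit and can never be sent — exactly the failure the new boolean plumbing surfaces instead of silently dropping it. For contrast, the usual multi-event pattern with the same `com.azure.messaging.eventhubs` API flushes a batch whenever it fills up. This is a sketch of that general technique, not code from this PR:

```java
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import java.util.List;

public class BatchSendSketch {

  // Sends all payloads, packing as many events per batch as will fit.
  static boolean sendAll(EventHubProducerClient producer, List<String> payloads) {
    EventDataBatch batch = producer.createBatch();
    for (String payload : payloads) {
      EventData event = new EventData(payload);
      if (!batch.tryAdd(event)) {
        if (batch.getCount() == 0) {
          return false; // this event alone exceeds the maximum batch size
        }
        producer.send(batch);           // flush the full batch
        batch = producer.createBatch(); // and start a new one
        if (!batch.tryAdd(event)) {
          return false;
        }
      }
    }
    if (batch.getCount() > 0) {
      producer.send(batch);
    }
    return true;
  }
}
```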
diff --git a/src/main/java/it/gov/pagopa/fdr/to/eventhub/mapper/FlussoRendicontazioneMapper.java b/src/main/java/it/gov/pagopa/fdr/to/eventhub/mapper/FlussoRendicontazioneMapper.java
index 5d7cbec..1405568 100644
--- a/src/main/java/it/gov/pagopa/fdr/to/eventhub/mapper/FlussoRendicontazioneMapper.java
+++ b/src/main/java/it/gov/pagopa/fdr/to/eventhub/mapper/FlussoRendicontazioneMapper.java
@@ -9,6 +9,7 @@
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.ChronoField;
+import java.util.ArrayList;
 import java.util.List;
 import lombok.experimental.UtilityClass;
 import org.modelmapper.ModelMapper;
@@ -66,10 +67,17 @@ public static FlowTxEventModel toFlowTxEventList(FlussoRendicontazione flusso) {
             .insertedTimestamp(parseDate(flusso.getMetadata().get("insertedTimestamp")))
             .psp(flusso.getIdentificativoPSP())
             .causal(flusso.getFlussoRiversamento().getIdentificativoUnivocoRegolamento())
-            .allDates(
-                flusso.getFlussoRiversamento().getDatiSingoliPagamenti().stream()
-                    .map(dsp -> dsp.getDataEsitoSingoloPagamento())
-                    .toList())
+            // This field should include all payment dates from the Flusso
+            // Riversamento. For very large files (>100,000 payments), the
+            // resulting JSON exceeds the 1024KB limit allowed for an Event Hub
+            // event. Pending further guidance, an empty list is used instead.
+            .allDates(new ArrayList<>())
+            // .allDates(
+            //     flusso.getFlussoRiversamento().getDatiSingoliPagamenti().stream()
+            //         .map(dsp -> dsp.getDataEsitoSingoloPagamento())
+            //         .toList())
             .build();
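The comment above pins down the constraint behind both the disabled `allDates` field and the existing `splitIntoChunks`: a single Event Hub event may not exceed roughly 1 MB, so any payload that grows with the number of payments must be either dropped or split by serialized size. The PR's actual `splitIntoChunks` body is not part of this diff; the sketch below shows one plausible shape of that technique under a byte budget, as an assumption rather than the real implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {

  // Groups serialized items into chunks whose total UTF-8 size stays under
  // maxBytes (e.g. ~1 MiB minus JSON envelope overhead per event).
  static List<List<String>> splitBySize(List<String> items, int maxBytes) {
    List<List<String>> chunks = new ArrayList<>();
    List<String> current = new ArrayList<>();
    int size = 0;
    for (String item : items) {
      int itemSize = item.getBytes(StandardCharsets.UTF_8).length;
      // Close the current chunk before it would overflow the budget.
      if (!current.isEmpty() && size + itemSize > maxBytes) {
        chunks.add(current);
        current = new ArrayList<>();
        size = 0;
      }
      current.add(item);
      size += itemSize;
    }
    if (!current.isEmpty()) {
      chunks.add(current);
    }
    return chunks;
  }
}
```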
"2025-01-30T10:15:30"); + metadata.put("elaborate", "true"); + + function.processFDR1BlobFiles(compressedData, "sampleBlob", metadata, context); + + ArgumentCaptor> logCaptor = ArgumentCaptor.forClass(Supplier.class); + verify(mockLogger, atLeastOnce()).severe(logCaptor.capture()); + + logCaptor.getAllValues().stream() + .map(Supplier::get) + .anyMatch(log -> log.contains("Error processing Blob")); + + verify(eventHubClientFlowTx, never()).send(any(EventDataBatch.class)); + verify(eventHubClientReportedIUV, never()).send(any(EventDataBatch.class)); + + // precondition for send fail + when(mockEventDataBatch.tryAdd(any(com.azure.messaging.eventhubs.EventData.class))) + .thenReturn(Boolean.TRUE); + doThrow(NullPointerException.class).when(eventHubClientFlowTx).send(any(EventDataBatch.class)); + + function.processFDR1BlobFiles(compressedData, "sampleBlob", metadata, context); + + logCaptor = ArgumentCaptor.forClass(Supplier.class); + verify(mockLogger, atLeastOnce()).severe(logCaptor.capture()); + + logCaptor.getAllValues().stream() + .map(Supplier::get) + .anyMatch(log -> log.contains("Error processing Blob")); + + verify(eventHubClientFlowTx, atLeastOnce()).send(any(EventDataBatch.class)); + verify(eventHubClientReportedIUV, never()).send(any(EventDataBatch.class)); + } + @Test void testFDR3BlobTriggerProcessing() throws Exception { when(context.getLogger()).thenReturn(mockLogger);