From 4ab546629d7a286fd6eceb3a2255521f00fd252c Mon Sep 17 00:00:00 2001 From: pierre-maraval Date: Thu, 23 May 2024 10:46:57 +0200 Subject: [PATCH] =?UTF-8?q?Fix=20:=20Ajout=20param=C3=A8tre=20nbThread=20C?= =?UTF-8?q?orrection=20sur=20incrementAndGet?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fr/abes/kafkatosudoc/kafka/KbartListener.java | 13 +++++-------- .../fr/abes/kafkatosudoc/kafka/WorkInProgress.java | 3 +++ src/main/resources/application-dev.properties | 1 + src/main/resources/application-prod.properties | 1 + src/main/resources/application-test.properties | 1 + 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/java/fr/abes/kafkatosudoc/kafka/KbartListener.java b/src/main/java/fr/abes/kafkatosudoc/kafka/KbartListener.java index 0163a50..6316a72 100644 --- a/src/main/java/fr/abes/kafkatosudoc/kafka/KbartListener.java +++ b/src/main/java/fr/abes/kafkatosudoc/kafka/KbartListener.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; @Service @Slf4j @@ -78,7 +77,7 @@ public KbartListener(UtilsMapper mapper, BaconService baconService, EmailService * * @param lignesKbart : ligne trouvée dans kafka */ - @KafkaListener(topics = {"${topic.name.source.kbart.toload}"}, groupId = "${topic.groupid.source.withppn}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8") + @KafkaListener(topics = {"${topic.name.source.kbart.toload}"}, groupId = "${topic.groupid.source.withppn}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}") public void listenKbartToCreateFromKafka(ConsumerRecord lignesKbart) throws IOException { String filename = lignesKbart.key(); if (!this.workInProgressMap.containsKey(filename)) { @@ -95,10 +94,8 @@ public void listenKbartToCreateFromKafka(ConsumerRecord providerPackageDeleted) { String provider = providerPackageDeleted.value().get("PROVIDER").toString(); String packageName = providerPackageDeleted.value().get("PACKAGE").toString(); @@ -357,7 +354,7 @@ private void suppressionLien469(SudocService service, List listError, in /** * @param ligneKbart : enregistrement dans Kafka */ - @KafkaListener(topics = {"${topic.name.source.kbart.exnihilo}"}, groupId = "${topic.groupid.source.exnihilo}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8") + @KafkaListener(topics = {"${topic.name.source.kbart.exnihilo}"}, groupId = "${topic.groupid.source.exnihilo}", containerFactory = "kafkaKbartListenerContainerFactory") public void listenKbartFromKafkaExNihilo(ConsumerRecord ligneKbart) { log.debug("Entrée dans création ex nihilo"); String filename = ligneKbart.key(); @@ -438,7 +435,7 @@ private void creerNoticeExNihilo(LigneKbartConnect ligneKbartConnect, String pro * * @param lignesKbart : ligne kbart + ppn de la notice imprimée */ - @KafkaListener(topics = {"${topic.name.source.kbart.imprime}"}, groupId = "${topic.groupid.source.imprime}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8") + @KafkaListener(topics = {"${topic.name.source.kbart.imprime}"}, groupId = "${topic.groupid.source.imprime}", containerFactory = "kafkaKbartListenerContainerFactory") public void listenKbartFromKafkaImprime(ConsumerRecord lignesKbart) { String filename = lignesKbart.key(); diff --git a/src/main/java/fr/abes/kafkatosudoc/kafka/WorkInProgress.java b/src/main/java/fr/abes/kafkatosudoc/kafka/WorkInProgress.java index cdcb046..1e34960 100644 --- a/src/main/java/fr/abes/kafkatosudoc/kafka/WorkInProgress.java +++ b/src/main/java/fr/abes/kafkatosudoc/kafka/WorkInProgress.java @@ -7,6 +7,7 @@ import jakarta.json.JsonObject; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; @@ -14,6 +15,7 @@ @Getter @Setter +@Slf4j public class WorkInProgress { private final List listeNotices; @@ -35,6 +37,7 @@ public void addNotice(T notice) { } public Integer incrementCurrentNbLignes() { + log.debug("Thread : " + Thread.currentThread().getName() + " / Current line : " + (this.currentNbLines.get() + 1) + " / total lines : " + this.getNbLinesTotal()); return this.currentNbLines.incrementAndGet(); } diff --git a/src/main/resources/application-dev.properties b/src/main/resources/application-dev.properties index 70dfbcd..af7ae00 100644 --- a/src/main/resources/application-dev.properties +++ b/src/main/resources/application-dev.properties @@ -1,6 +1,7 @@ # Consumer properties spring.kafka.consumer.bootstrap-servers= spring.kafka.registry.url= +spring.kafka.concurrency.nbThread= #Connexion au sudoc sudoc.serveur= diff --git a/src/main/resources/application-prod.properties b/src/main/resources/application-prod.properties index 70dfbcd..af7ae00 100644 --- a/src/main/resources/application-prod.properties +++ b/src/main/resources/application-prod.properties @@ -1,6 +1,7 @@ # Consumer properties spring.kafka.consumer.bootstrap-servers= spring.kafka.registry.url= +spring.kafka.concurrency.nbThread= #Connexion au sudoc sudoc.serveur= diff --git a/src/main/resources/application-test.properties b/src/main/resources/application-test.properties index d1a5fbc..6a2f201 100644 --- a/src/main/resources/application-test.properties +++ b/src/main/resources/application-test.properties @@ -1,6 +1,7 @@ # Consumer properties spring.kafka.consumer.bootstrap-servers= spring.kafka.registry.url= +spring.kafka.concurrency.nbThread= #Connexion au sudoc sudoc.serveur=