Skip to content

Commit

Permalink
Fix : Ajout paramètre nbThread
Browse files Browse the repository at this point in the history
Correction sur incrementAndGet
  • Loading branch information
pierre-maraval committed May 23, 2024
1 parent 308e1bb commit 4ab5466
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 8 deletions.
13 changes: 5 additions & 8 deletions src/main/java/fr/abes/kafkatosudoc/kafka/KbartListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

@Service
@Slf4j
Expand Down Expand Up @@ -78,7 +77,7 @@ public KbartListener(UtilsMapper mapper, BaconService baconService, EmailService
*
* @param lignesKbart : ligne trouvée dans kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart.toload}"}, groupId = "${topic.groupid.source.withppn}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8")
@KafkaListener(topics = {"${topic.name.source.kbart.toload}"}, groupId = "${topic.groupid.source.withppn}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "${spring.kafka.concurrency.nbThread}")
public void listenKbartToCreateFromKafka(ConsumerRecord<String, LigneKbartConnect> lignesKbart) throws IOException {
String filename = lignesKbart.key();
if (!this.workInProgressMap.containsKey(filename)) {
Expand All @@ -95,10 +94,8 @@ public void listenKbartToCreateFromKafka(ConsumerRecord<String, LigneKbartConnec
//on alimente la liste des notices d'un package qui sera traitée intégralement
this.workInProgressMap.get(filename).addNotice(lignesKbart.value());
}

//Si le nombre de lignes traitées est égal au nombre de lignes total du fichier, on est arrivé en fin de fichier, on traite dans le sudoc
log.debug("Current line : " + this.workInProgressMap.get(filename).incrementCurrentNbLignes() + " / total lines : " + this.workInProgressMap.get(filename).getNbLinesTotal());
if (this.workInProgressMap.get(filename).getCurrentNbLines().get() == (this.workInProgressMap.get(filename).getNbLinesTotal())) {
if (Objects.equals(this.workInProgressMap.get(filename).incrementCurrentNbLignes(), this.workInProgressMap.get(filename).getNbLinesTotal())) {
log.debug("Traitement des notices existantes dans le Sudoc à partir du kbart");
traiterPackageDansSudoc(this.workInProgressMap.get(filename).getListeNotices(), filename);
if (!this.workInProgressMap.get(filename).getErrorMessages().isEmpty())
Expand Down Expand Up @@ -266,7 +263,7 @@ private void suppression469(String ppnNoticeBouquet, String ppn, LigneKbartConne
*
* @param providerPackageDeleted enregistrement dans kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart.todelete}"}, groupId = "${topic.groupid.source.delete}", containerFactory = "kafkaDeletePackageListenerContainerFactory", concurrency = "8")
@KafkaListener(topics = {"${topic.name.source.kbart.todelete}"}, groupId = "${topic.groupid.source.delete}", containerFactory = "kafkaDeletePackageListenerContainerFactory")
public void listenKbartToDeleteFromKafka(ConsumerRecord<String, GenericRecord> providerPackageDeleted) {
String provider = providerPackageDeleted.value().get("PROVIDER").toString();
String packageName = providerPackageDeleted.value().get("PACKAGE").toString();
Expand Down Expand Up @@ -357,7 +354,7 @@ private void suppressionLien469(SudocService service, List<String> listError, in
/**
* @param ligneKbart : enregistrement dans Kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart.exnihilo}"}, groupId = "${topic.groupid.source.exnihilo}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8")
@KafkaListener(topics = {"${topic.name.source.kbart.exnihilo}"}, groupId = "${topic.groupid.source.exnihilo}", containerFactory = "kafkaKbartListenerContainerFactory")
public void listenKbartFromKafkaExNihilo(ConsumerRecord<String, LigneKbartConnect> ligneKbart) {
log.debug("Entrée dans création ex nihilo");
String filename = ligneKbart.key();
Expand Down Expand Up @@ -438,7 +435,7 @@ private void creerNoticeExNihilo(LigneKbartConnect ligneKbartConnect, String pro
*
* @param lignesKbart : ligne kbart + ppn de la notice imprimée
*/
@KafkaListener(topics = {"${topic.name.source.kbart.imprime}"}, groupId = "${topic.groupid.source.imprime}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8")
@KafkaListener(topics = {"${topic.name.source.kbart.imprime}"}, groupId = "${topic.groupid.source.imprime}", containerFactory = "kafkaKbartListenerContainerFactory")
public void listenKbartFromKafkaImprime(ConsumerRecord<String, LigneKbartImprime> lignesKbart) {
String filename = lignesKbart.key();

Expand Down
3 changes: 3 additions & 0 deletions src/main/java/fr/abes/kafkatosudoc/kafka/WorkInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import jakarta.json.JsonObject;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Getter
@Setter
@Slf4j
public class WorkInProgress<T> {
private final List<T> listeNotices;

Expand All @@ -35,6 +37,7 @@ public void addNotice(T notice) {
}

public Integer incrementCurrentNbLignes() {
log.debug("Thread : " + Thread.currentThread().getName() + " / Current line : " + (this.currentNbLines.get() + 1) + " / total lines : " + this.getNbLinesTotal());
return this.currentNbLines.incrementAndGet();
}

Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Consumer properties
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=
spring.kafka.concurrency.nbThread=

#Connexion au sudoc
sudoc.serveur=
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application-prod.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Consumer properties
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=
spring.kafka.concurrency.nbThread=

#Connexion au sudoc
sudoc.serveur=
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application-test.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Consumer properties
spring.kafka.consumer.bootstrap-servers=
spring.kafka.registry.url=
spring.kafka.concurrency.nbThread=

#Connexion au sudoc
sudoc.serveur=
Expand Down

0 comments on commit 4ab5466

Please sign in to comment.