Skip to content

Commit

Permalink
tentative_fix_perte_message
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelQuetin committed May 22, 2024
1 parent 3c9fddf commit 7fffbb4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
21 changes: 11 additions & 10 deletions src/main/java/fr/abes/kafkatosudoc/kafka/KbartListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

@Service
@Slf4j
Expand All @@ -54,7 +55,7 @@ public class KbartListener {

private final UtilsMapper mapper;


private AtomicInteger nb = new AtomicInteger(0);
private final BaconService baconService;

private final EmailService emailService;
Expand All @@ -79,28 +80,28 @@ public KbartListener(UtilsMapper mapper, BaconService baconService, EmailService
*
* @param lignesKbart : ligne trouvée dans kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart.toload}"}, groupId = "${topic.groupid.source.withppn}", containerFactory = "kafkaKbartListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.kbart.toload}"}, groupId = "${topic.groupid.source.withppn}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8")
public void listenKbartToCreateFromKafka(ConsumerRecord<String, LigneKbartConnect> lignesKbart) throws IOException {
log.debug("Entrée dans création à partir du kbart");
// log.debug("Entrée dans création à partir du kbart " + nb.incrementAndGet());
String filename = lignesKbart.key();
if (!this.workInProgressMap.containsKey(filename)) {
this.workInProgressMap.put(lignesKbart.key(), new WorkInProgress<>());
this.workInProgressMap.put(filename, new WorkInProgress<>());
lignesKbart.headers().forEach(header -> {
if (header.key().equals("nbLinesTotal")) { //Si on est à la dernière ligne du fichier
this.workInProgressMap.get(filename).setNbLinesTotal(Integer.parseInt(new String(header.value()))); //on indique le nb total de lignes du fichier
}
});
}

this.workInProgressMap.get(filename).incrementCurrentNbLignes();

log.debug("(" + nb.incrementAndGet() + ") Current line : " + this.workInProgressMap.get(filename).incrementCurrentNbLignes() + " / total lines : " + this.workInProgressMap.get(filename).getNbLinesTotal());
if (lignesKbart.value().getBESTPPN() != null && !lignesKbart.value().getBESTPPN().isEmpty()) {
//on alimente la liste des notices d'un package qui sera traitée intégralement
this.workInProgressMap.get(filename).addNotice(lignesKbart.value());
}

log.debug("Current line : " + this.workInProgressMap.get(filename).getCurrentNbLines() + " / total lines : " + this.workInProgressMap.get(filename).getNbLinesTotal());
//Si le nombre de lignes traitées est égal au nombre de lignes total du fichier, on est arrivé en fin de fichier, on traite dans le sudoc
if (this.workInProgressMap.get(filename).getCurrentNbLines().equals(this.workInProgressMap.get(filename).getNbLinesTotal())) {
if (this.workInProgressMap.get(filename).getCurrentNbLines().get() == (this.workInProgressMap.get(filename).getNbLinesTotal())) {
log.debug("Traitement des notices existantes dans le Sudoc à partir du kbart");
traiterPackageDansSudoc(this.workInProgressMap.get(filename).getListeNotices(), filename);
if (!this.workInProgressMap.get(filename).getErrorMessages().isEmpty())
Expand Down Expand Up @@ -268,7 +269,7 @@ private void suppression469(String ppnNoticeBouquet, String ppn, LigneKbartConne
*
* @param providerPackageDeleted enregistrement dans kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart.todelete}"}, groupId = "${topic.groupid.source.delete}", containerFactory = "kafkaDeletePackageListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.kbart.todelete}"}, groupId = "${topic.groupid.source.delete}", containerFactory = "kafkaDeletePackageListenerContainerFactory", concurrency = "8")
public void listenKbartToDeleteFromKafka(ConsumerRecord<String, GenericRecord> providerPackageDeleted) {
String provider = providerPackageDeleted.value().get("PROVIDER").toString();
String packageName = providerPackageDeleted.value().get("PACKAGE").toString();
Expand Down Expand Up @@ -359,7 +360,7 @@ private void suppressionLien469(SudocService service, List<String> listError, in
/**
* @param ligneKbart : enregistrement dans Kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart.exnihilo}"}, groupId = "${topic.groupid.source.exnihilo}", containerFactory = "kafkaKbartListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.kbart.exnihilo}"}, groupId = "${topic.groupid.source.exnihilo}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8")
public void listenKbartFromKafkaExNihilo(ConsumerRecord<String, LigneKbartConnect> ligneKbart) {
log.debug("Entrée dans création ex nihilo");
String filename = ligneKbart.key();
Expand Down Expand Up @@ -440,7 +441,7 @@ private void creerNoticeExNihilo(LigneKbartConnect ligneKbartConnect, String pro
*
* @param lignesKbart : ligne kbart + ppn de la notice imprimée
*/
@KafkaListener(topics = {"${topic.name.source.kbart.imprime}"}, groupId = "${topic.groupid.source.imprime}", containerFactory = "kafkaKbartListenerContainerFactory")
@KafkaListener(topics = {"${topic.name.source.kbart.imprime}"}, groupId = "${topic.groupid.source.imprime}", containerFactory = "kafkaKbartListenerContainerFactory", concurrency = "8")
public void listenKbartFromKafkaImprime(ConsumerRecord<String, LigneKbartImprime> lignesKbart) {
String filename = lignesKbart.key();

Expand Down
9 changes: 5 additions & 4 deletions src/main/java/fr/abes/kafkatosudoc/kafka/WorkInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,22 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Getter
@Setter
public class WorkInProgress<T> {
private final List<T> listeNotices;

private Integer currentNbLines;
private AtomicInteger currentNbLines;

private Integer nbLinesTotal;

private List<ErrorMessage> errorMessages;

public WorkInProgress() {
this.listeNotices = new ArrayList<>();
this.currentNbLines = 0;
this.currentNbLines = new AtomicInteger(0);
this.nbLinesTotal = -1;
this.errorMessages = new ArrayList<>();
}
Expand All @@ -33,8 +34,8 @@ public void addNotice(T notice) {
this.listeNotices.add(notice);
}

public void incrementCurrentNbLignes() {
this.currentNbLines++;
public Integer incrementCurrentNbLignes() {
return this.currentNbLines.incrementAndGet();
}

public void addErrorMessagesConnectionCbs(String message) {
Expand Down

0 comments on commit 7fffbb4

Please sign in to comment.