Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multithread #25

Merged
merged 4 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import java.io.Serializable;
import java.util.Date;

@AllArgsConstructor
@Entity
@Table(name = "PROVIDER_PACKAGE")
@Getter @Setter
@AllArgsConstructor
@Getter
@Setter
@NoArgsConstructor
public class ProviderPackage implements Serializable, Comparable<ProviderPackage> {
@Id
Expand Down
43 changes: 29 additions & 14 deletions src/main/java/fr/abes/kafkatosudoc/kafka/KbartListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public class KbartListener {

private String filename = "";

private int currentNbLines = 0;
private int nbLinesTotal = -1;

public KbartListener(UtilsMapper mapper, SudocService service, BaconService baconService, EmailService emailService) {
this.mapper = mapper;
this.service = service;
Expand All @@ -57,24 +60,36 @@ public KbartListener(UtilsMapper mapper, SudocService service, BaconService baco
* @param lignesKbart : ligne trouvée dans kafka
*/
@KafkaListener(topics = {"${topic.name.source.kbart.toload}"}, groupId = "${topic.groupid.source.withppn}", containerFactory = "kafkaKbartListenerContainerFactory")
public void listenKbartToCreateFromKafka(ConsumerRecord<String, LigneKbartConnect> lignesKbart) throws IllegalDateException {
this.filename = getFileNameFromHeader(lignesKbart.headers());
String provider = CheckFiles.getProviderFromFilename(this.filename);
String packageName = CheckFiles.getPackageFromFilename(this.filename);
Date dateFromFile = CheckFiles.extractDate(this.filename);
PackageKbartDto packageKbartDto = new PackageKbartDto(packageName, dateFromFile, provider);
public void listenKbartToCreateFromKafka(ConsumerRecord<String, LigneKbartConnect> lignesKbart) {
try {
this.filename = getFileNameFromHeader(lignesKbart.headers());
String provider = CheckFiles.getProviderFromFilename(this.filename);
String packageName = CheckFiles.getPackageFromFilename(this.filename);
Date dateFromFile = CheckFiles.extractDate(this.filename);
PackageKbartDto packageKbartDto = new PackageKbartDto(packageName, dateFromFile, provider);

if (lignesKbart.value().getBESTPPN() != null && !lignesKbart.value().getBESTPPN().isEmpty()) {
//on alimente la liste des notices d'un package qui sera traitée intégralement
this.listeNotices.add(lignesKbart.value());
}
for (Header header : lignesKbart.headers().toArray()) {
if (header.key().equals("OK") && new String(header.value()).equals("true")) {
if (lignesKbart.value().getBESTPPN() != null && !lignesKbart.value().getBESTPPN().isEmpty()) {
//on alimente la liste des notices d'un package qui sera traitée intégralement
this.listeNotices.add(lignesKbart.value());
}
this.currentNbLines += 1;
for (Header header : lignesKbart.headers().toArray()) {
if (header.key().equals("nbLinesTotal")) { //Si on est à la dernière ligne du fichier
log.info("nombre total de lignes du fichier :" + new String(header.value()));
this.nbLinesTotal = Integer.parseInt(new String(header.value())); //on indique le nb total de lignes du fichier
}
}
//Si le nombre de lignes traitées est égal au nombre de lignes total du fichier, on est arrivé en fin de fichier, on traite dans le sudoc
if(this.currentNbLines == this.nbLinesTotal){
log.debug("Traitement des notices existantes dans le Sudoc");
traiterPackageDansSudoc(listeNotices, packageKbartDto);
this.listeNotices.clear();
this.filename = "";
break;
this.nbLinesTotal = -1;
this.currentNbLines = 0;
}
} catch (IllegalDateException ex) {
log.error("Erreur dans le format de date sur le fichier " + filename);
}
}

Expand Down Expand Up @@ -283,7 +298,7 @@ public void listenKbartFromKafkaImprime(ConsumerRecord<String, LigneKbartImprime
//Ajout provider display name en 214 $c 2è occurrence
String providerDisplay = baconService.getProviderDisplayName(provider);
if (providerDisplay != null) {
List<Zone> zones214 = noticeElec.getNoticeBiblio().findZones("214").stream().filter(zone -> Arrays.toString(zone.getIndicateurs()).toString().equals("[#, 2]")).toList();
List<Zone> zones214 = noticeElec.getNoticeBiblio().findZones("214").stream().filter(zone -> Arrays.toString(zone.getIndicateurs()).equals("[#, 2]")).toList();
for (Zone zone : zones214)
zone.addSubLabel("c", providerDisplay);
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/fr/abes/kafkatosudoc/service/EmailService.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public void sendErrorMailConnect(String filename, LigneKbartConnect kbart, Excep
}

public void sendErrorMailAuthentification(String filename, PackageKbartDto packageKbartDto, CBSException e) {
String body = "Une erreur s'est produite lors de l'authentification sur le CBS" + "Provider : " + packageKbartDto.getProvider() +
"Package : " + packageKbartDto.getPackageName() +
"Date : " + packageKbartDto.getDatePackage();
String body = "Une erreur s'est produite lors de l'authentification sur le CBS " + "Provider : " + packageKbartDto.getProvider() +
" Package : " + packageKbartDto.getPackageName() +
" Date : " + packageKbartDto.getDatePackage();
// Création du mail
String requestJson = mailToJSON(this.recipient, "[CONVERGENCE]["+env.toUpperCase()+"] Erreur lors du traitement sur le fichier " + filename, body);
// Envoi du message par mail
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/fr/abes/kafkatosudoc/utils/NoticeMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand All @@ -35,10 +34,12 @@ public NoticeConcrete convert(MappingContext<LigneKbartConnect, NoticeConcrete>
//ajout zone type de document
noticeBiblio.addZone("008", "$a", "Oax3");
//ajout ISBN
if ((Utils.extractOnlineIdentifier(kbart.getONLINEIDENTIFIER().toString()).length() == 10)) {
noticeBiblio.addZone("010", "$a", kbart.getONLINEIDENTIFIER().toString());
} else {
noticeBiblio.addZone("010", "$A", kbart.getONLINEIDENTIFIER().toString());
if (!kbart.getONLINEIDENTIFIER().isEmpty()) {
if ((Utils.extractOnlineIdentifier(kbart.getONLINEIDENTIFIER().toString()).length() == 10)) {
noticeBiblio.addZone("010", "$a", kbart.getONLINEIDENTIFIER().toString());
} else {
noticeBiblio.addZone("010", "$A", kbart.getONLINEIDENTIFIER().toString());
}
}

//DOI
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ logging.config=classpath:log4j2-all.xml
auto.create.topics.enable=false
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.properties.isolation.level=read_committed

topic.name.source.kbart.toload=bacon.kbart.withppn.toload
topic.name.source.kbart.todelete=bacon.kbart.todelete.PROVIDER_PACKAGE_DELETED
Expand Down
1 change: 0 additions & 1 deletion src/main/resources/log4j2-all.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
<Property name="logFileName">${bundle:application:application.name}</Property>
<Property name="errorFileName">${bundle:application:application.name}_error</Property>
<Property name="debugFileName">${bundle:application:application.name}_debug</Property>
<Property name="kafkaServer">${env:SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS}</Property>

<!-- Politique de gestion des fichiers -->
<Property name="sizeTriggerPolicy">100 MB</Property>
Expand Down
Loading