diff --git a/consumer/src/main/java/org/healthmap/service/BasicInfoSaveConsumer.java b/consumer/src/main/java/org/healthmap/service/BasicInfoSaveConsumer.java new file mode 100644 index 0000000..ed6f9f2 --- /dev/null +++ b/consumer/src/main/java/org/healthmap/service/BasicInfoSaveConsumer.java @@ -0,0 +1,80 @@ +package org.healthmap.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.healthmap.config.KafkaProperties; +import org.healthmap.db.medicalfacility.MedicalFacilityEntity; +import org.healthmap.db.medicalfacility.MedicalFacilityRepository; +import org.healthmap.dto.BasicInfoDto; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionTemplate; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@Service +@RequiredArgsConstructor +public class BasicInfoSaveConsumer { + private final KafkaTemplate kafkaTemplate; + private final TransactionTemplate transactionTemplate; + private final KafkaProperties kafkaProperties; + private final MedicalFacilityRepository medicalFacilityRepository; + private final ExecutorService executorService = Executors.newFixedThreadPool(20); + private AtomicInteger count = new AtomicInteger(0); // 동작 확인용 + + @KafkaListener(topics = "${kafka-config.consumer.update-topic}", + groupId = "${kafka-config.consumer.save-groupId}", + containerFactory = "saveBasicInfoContainerFactory") + public void saveBasicInfo(BasicInfoDto dto, Acknowledgment ack) { + CompletableFuture.supplyAsync(() -> { + Boolean transaction = transactionTemplate.execute(status -> { + try { + MedicalFacilityEntity findEntity = medicalFacilityRepository.findById(dto.getCode()).orElse(null); + saveMedicalFacility(dto, findEntity); + return true; + } catch (Exception e) { + log.error("Save new medical facility error: {}", e.getMessage(), e); + status.setRollbackOnly(); + return false; + } + }); + if(transaction != null && transaction) { + return dto; + } else { + ack.nack(Duration.ofMillis(500)); + return null; + } + }, executorService) + .thenAccept(basicInfoDto -> { + if (basicInfoDto != null) { + kafkaTemplate.send(kafkaProperties.getDetailTopic(), dto.getCode()); + ack.acknowledge(); + } + }); + + } + + private void saveMedicalFacility(BasicInfoDto dto, MedicalFacilityEntity entity) { + if (entity == null) { + MedicalFacilityEntity saveEntity = convertDtoToEntity(dto); + medicalFacilityRepository.save(saveEntity); + count.incrementAndGet(); + log.info("save count : {}", count.get()); + } + } + + private MedicalFacilityEntity convertDtoToEntity(BasicInfoDto dto) { + return MedicalFacilityEntity.of(dto.getCode(), dto.getName(), dto.getAddress(), dto.getPhoneNumber(), dto.getPageUrl(), + dto.getType(), dto.getState(), dto.getCity(), dto.getTown(), dto.getPostNumber(), dto.getCoordinate(), null, + null, null, null, null, null, null, null, + null, null, null, null, null, null, null, + null, null); + } +} diff --git a/consumer/src/main/java/org/healthmap/service/BasicInfoUpdateConsumer.java b/consumer/src/main/java/org/healthmap/service/BasicInfoUpdateConsumer.java index fa4f94d..8ae62cb 100644 --- a/consumer/src/main/java/org/healthmap/service/BasicInfoUpdateConsumer.java +++ b/consumer/src/main/java/org/healthmap/service/BasicInfoUpdateConsumer.java @@ -24,21 +24,21 @@ public class BasicInfoUpdateConsumer { private final KafkaTemplate kafkaTemplate; private final KafkaProperties kafkaProperties; private final TransactionTemplate transactionTemplate; - private final ExecutorService executorService = Executors.newFixedThreadPool(10); + private final ExecutorService executorService = Executors.newFixedThreadPool(20); private AtomicInteger count = new AtomicInteger(0); // 동작 확인용 private AtomicInteger realCount = new AtomicInteger(0); // 동작 확인용 @KafkaListener(topics = "${kafka-config.consumer.basic-topic}", groupId = "${kafka-config.consumer.groupId}", containerFactory = "basicInfoKafkaListenerContainerFactory") - public void listen(BasicInfoDto dto) { + public void updateBasicInfo(BasicInfoDto dto) { CompletableFuture future = CompletableFuture.supplyAsync(() -> { transactionTemplate.execute(status -> { try { updateMedicalFacility(dto); realCount.incrementAndGet(); - if(realCount.get() % 5000 == 0) { - log.info("updated now : {}", realCount.get()); + if (realCount.get() % 5000 == 0) { + log.info("updated count : {}", realCount.get()); } } catch (Exception e) { log.error("Updating medical facility error: {}", e.getMessage(), e); @@ -56,7 +56,7 @@ public void listen(BasicInfoDto dto) { future.join(); } - public void updateMedicalFacility(BasicInfoDto dto) { + private void updateMedicalFacility(BasicInfoDto dto) { MedicalFacilityEntity entity = medicalFacilityRepository.findById(dto.getCode()).orElse(null); if (entity != null) { medicalFacilityRepository.updateBasicInfo(entity.getId(), entity.getName(), entity.getAddress(), entity.getPhoneNumber(), @@ -64,7 +64,7 @@ public void updateMedicalFacility(BasicInfoDto dto) { entity.getCoordinate()); } else { count.incrementAndGet(); - log.info("update 완료 : {}", count.get()); + log.info("update 개수 : {}", count.get()); } } } diff --git a/messaging/src/main/java/org/healthmap/config/KafkaConsumerConfig.java b/messaging/src/main/java/org/healthmap/config/KafkaConsumerConfig.java index 8e93af4..b8e57c2 100644 --- a/messaging/src/main/java/org/healthmap/config/KafkaConsumerConfig.java +++ b/messaging/src/main/java/org/healthmap/config/KafkaConsumerConfig.java @@ -62,7 +62,7 @@ public ConsumerFactory basicInfoConsumerConfig() { public ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerConfig()); - factory.setConcurrency(10); //TODO: 차후 변경 + factory.setConcurrency(10); //TODO: partition 개수만큼 차후 변경 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor()); @@ -73,10 +73,20 @@ public ConcurrentKafkaListenerContainerFactory concurrentKafkaLi public ConcurrentKafkaListenerContainerFactory basicInfoKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(basicInfoConsumerConfig()); - factory.setConcurrency(10); //TODO: partition 개수만큼? + factory.setConcurrency(10); //TODO: partition 개수만큼 factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor()); //TODO: ACK를 수동? 자동? + return factory; + } + @Bean + public ConcurrentKafkaListenerContainerFactory saveBasicInfoContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(basicInfoConsumerConfig()); + factory.setConcurrency(10); + factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.saveExecutor()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + return factory; } } diff --git a/messaging/src/main/java/org/healthmap/config/KafkaProperties.java b/messaging/src/main/java/org/healthmap/config/KafkaProperties.java index b12aa80..9533e10 100644 --- a/messaging/src/main/java/org/healthmap/config/KafkaProperties.java +++ b/messaging/src/main/java/org/healthmap/config/KafkaProperties.java @@ -9,12 +9,16 @@ public class KafkaProperties { private final String detailTopic; private final String basicTopic; private final String updateTopic; + private final String deleteTopic; private final String groupId; + private final String saveGroupId; - public KafkaProperties(String detailTopic, String basicTopic, String updateTopic, String groupId) { + public KafkaProperties(String detailTopic, String basicTopic, String updateTopic, String deleteTopic, String groupId, String saveGroupId) { this.detailTopic = detailTopic; this.basicTopic = basicTopic; this.updateTopic = updateTopic; + this.deleteTopic = deleteTopic; this.groupId = groupId; + this.saveGroupId = saveGroupId; } } diff --git a/messaging/src/main/java/org/healthmap/config/TaskExecutorConfig.java b/messaging/src/main/java/org/healthmap/config/TaskExecutorConfig.java index 9b3d329..6452178 100644 --- a/messaging/src/main/java/org/healthmap/config/TaskExecutorConfig.java +++ b/messaging/src/main/java/org/healthmap/config/TaskExecutorConfig.java @@ -10,11 +10,21 @@ public class TaskExecutorConfig { @Bean public AsyncTaskExecutor executor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(5); - executor.setMaxPoolSize(10); + executor.setCorePoolSize(15); + executor.setMaxPoolSize(20); executor.setQueueCapacity(100); executor.setThreadNamePrefix("KafkaExecutor-"); executor.initialize(); return executor; } + @Bean + public AsyncTaskExecutor saveExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(15); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("KafkaExecutorForSave-"); + executor.initialize(); + return executor; + } } diff --git a/openapi/src/main/java/org/healthmap/openapi/api/MedicalFacilityApi.java b/openapi/src/main/java/org/healthmap/openapi/api/MedicalFacilityApi.java index 149d28d..71ce262 100644 --- a/openapi/src/main/java/org/healthmap/openapi/api/MedicalFacilityApi.java +++ b/openapi/src/main/java/org/healthmap/openapi/api/MedicalFacilityApi.java @@ -35,7 +35,8 @@ public class MedicalFacilityApi { private final int rowSize; //한 페이지 결과 수 private final String serviceKey; private final String numOfRows; - private final ExecutorService executorService; + private final ExecutorService openApiExecutorService; + private final ExecutorService localExecutorService; private final HttpClient client; private final ObjectMapper xmlMapper; @@ -45,7 +46,8 @@ public MedicalFacilityApi(KeyProperties keyInfo) { this.rowSize = 5000; this.serviceKey = "?serviceKey=" + keyInfo.getServerKey(); this.numOfRows = "&numOfRows=" + rowSize; - this.executorService = Executors.newFixedThreadPool(10); + this.localExecutorService = Executors.newFixedThreadPool(50); + this.openApiExecutorService = Executors.newFixedThreadPool(15); this.client = HttpClient.newBuilder().build(); this.xmlMapper = new XmlMapper(); } @@ -68,15 +70,15 @@ public CompletableFuture> getMedicalFacilityInfoList log.error(e.getMessage()); throw new OpenApiProblemException(OpenApiErrorCode.SERVER_ERROR); } - }, executorService) - .thenApply(response -> { + }, openApiExecutorService) + .thenApplyAsync(response -> { int statusCode = response.statusCode(); if (statusCode == HttpURLConnection.HTTP_OK) { return response.body(); } else { throw new OpenApiProblemException(OpenApiErrorCode.OPEN_API_REQUEST_ERROR); } - }) + }, localExecutorService) .thenApply(this::getMedicalFacilityXmlDtoList); }