Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#31] 추가된 기본정보 저장 consumer 추가 #32

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.healthmap.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.healthmap.config.KafkaProperties;
import org.healthmap.db.medicalfacility.MedicalFacilityEntity;
import org.healthmap.db.medicalfacility.MedicalFacilityRepository;
import org.healthmap.dto.BasicInfoDto;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
@RequiredArgsConstructor
public class BasicInfoSaveConsumer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final TransactionTemplate transactionTemplate;
private final KafkaProperties kafkaProperties;
private final MedicalFacilityRepository medicalFacilityRepository;
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
private AtomicInteger count = new AtomicInteger(0); // 동작 확인용

@KafkaListener(topics = "${kafka-config.consumer.update-topic}",
groupId = "${kafka-config.consumer.save-groupId}",
containerFactory = "saveBasicInfoContainerFactory")
public void saveBasicInfo(BasicInfoDto dto, Acknowledgment ack) {
CompletableFuture.supplyAsync(() -> {
Boolean transaction = transactionTemplate.execute(status -> {
try {
MedicalFacilityEntity findEntity = medicalFacilityRepository.findById(dto.getCode()).orElse(null);
saveMedicalFacility(dto, findEntity);
return true;
} catch (Exception e) {
log.error("Save new medical facility error: {}", e.getMessage(), e);
status.setRollbackOnly();
return false;
}
});
if(transaction != null && transaction) {
return dto;
} else {
ack.nack(Duration.ofMillis(500));
return null;
}
}, executorService)
.thenAccept(basicInfoDto -> {
if (basicInfoDto != null) {
kafkaTemplate.send(kafkaProperties.getDetailTopic(), dto.getCode());
ack.acknowledge();
}
});

}

private void saveMedicalFacility(BasicInfoDto dto, MedicalFacilityEntity entity) {
if (entity == null) {
MedicalFacilityEntity saveEntity = convertDtoToEntity(dto);
medicalFacilityRepository.save(saveEntity);
count.incrementAndGet();
log.info("save count : {}", count.get());
}
}

private MedicalFacilityEntity convertDtoToEntity(BasicInfoDto dto) {
return MedicalFacilityEntity.of(dto.getCode(), dto.getName(), dto.getAddress(), dto.getPhoneNumber(), dto.getPageUrl(),
dto.getType(), dto.getState(), dto.getCity(), dto.getTown(), dto.getPostNumber(), dto.getCoordinate(), null,
null, null, null, null, null, null, null,
null, null, null, null, null, null, null,
null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ public class BasicInfoUpdateConsumer {
private final KafkaTemplate<String, BasicInfoDto> kafkaTemplate;
private final KafkaProperties kafkaProperties;
private final TransactionTemplate transactionTemplate;
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
private AtomicInteger count = new AtomicInteger(0); // 동작 확인용
private AtomicInteger realCount = new AtomicInteger(0); // 동작 확인용

@KafkaListener(topics = "${kafka-config.consumer.basic-topic}",
groupId = "${kafka-config.consumer.groupId}",
containerFactory = "basicInfoKafkaListenerContainerFactory")
public void listen(BasicInfoDto dto) {
public void updateBasicInfo(BasicInfoDto dto) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
transactionTemplate.execute(status -> {
try {
updateMedicalFacility(dto);
realCount.incrementAndGet();
if(realCount.get() % 5000 == 0) {
log.info("updated now : {}", realCount.get());
if (realCount.get() % 5000 == 0) {
log.info("updated count : {}", realCount.get());
}
} catch (Exception e) {
log.error("Updating medical facility error: {}", e.getMessage(), e);
Expand All @@ -56,15 +56,15 @@ public void listen(BasicInfoDto dto) {
future.join();
}

public void updateMedicalFacility(BasicInfoDto dto) {
private void updateMedicalFacility(BasicInfoDto dto) {
MedicalFacilityEntity entity = medicalFacilityRepository.findById(dto.getCode()).orElse(null);
if (entity != null) {
medicalFacilityRepository.updateBasicInfo(entity.getId(), entity.getName(), entity.getAddress(), entity.getPhoneNumber(),
entity.getUrl(), entity.getType(), entity.getState(), entity.getCity(), entity.getTown(), entity.getPostNumber(),
entity.getCoordinate());
} else {
count.incrementAndGet();
log.info("update 완료 : {}", count.get());
log.info("update 개수 : {}", count.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ConsumerFactory<String, BasicInfoDto> basicInfoConsumerConfig() {
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerConfig());
factory.setConcurrency(10); //TODO: 차후 변경
factory.setConcurrency(10); //TODO: partition 개수만큼 차후 변경
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor());

Expand All @@ -73,10 +73,20 @@ public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaLi
public ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> basicInfoKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(basicInfoConsumerConfig());
factory.setConcurrency(10); //TODO: partition 개수만큼?
factory.setConcurrency(10); //TODO: partition 개수만큼
factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor());
//TODO: ACK를 수동? 자동?

return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> saveBasicInfoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(basicInfoConsumerConfig());
factory.setConcurrency(10);
factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.saveExecutor());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ public class KafkaProperties {
private final String detailTopic;
private final String basicTopic;
private final String updateTopic;
private final String deleteTopic;
private final String groupId;
private final String saveGroupId;

public KafkaProperties(String detailTopic, String basicTopic, String updateTopic, String groupId) {
public KafkaProperties(String detailTopic, String basicTopic, String updateTopic, String deleteTopic, String groupId, String saveGroupId) {
this.detailTopic = detailTopic;
this.basicTopic = basicTopic;
this.updateTopic = updateTopic;
this.deleteTopic = deleteTopic;
this.groupId = groupId;
this.saveGroupId = saveGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ public class TaskExecutorConfig {
@Bean
public AsyncTaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setCorePoolSize(15);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("KafkaExecutor-");
executor.initialize();
return executor;
}
@Bean
public AsyncTaskExecutor saveExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(15);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("KafkaExecutorForSave-");
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class MedicalFacilityApi {
private final int rowSize; //한 페이지 결과 수
private final String serviceKey;
private final String numOfRows;
private final ExecutorService executorService;
private final ExecutorService openApiExecutorService;
private final ExecutorService localExecutorService;
private final HttpClient client;
private final ObjectMapper xmlMapper;

Expand All @@ -45,7 +46,8 @@ public MedicalFacilityApi(KeyProperties keyInfo) {
this.rowSize = 5000;
this.serviceKey = "?serviceKey=" + keyInfo.getServerKey();
this.numOfRows = "&numOfRows=" + rowSize;
this.executorService = Executors.newFixedThreadPool(10);
this.localExecutorService = Executors.newFixedThreadPool(50);
this.openApiExecutorService = Executors.newFixedThreadPool(15);
this.client = HttpClient.newBuilder().build();
this.xmlMapper = new XmlMapper();
}
Expand All @@ -68,15 +70,15 @@ public CompletableFuture<List<MedicalFacilityXmlDto>> getMedicalFacilityInfoList
log.error(e.getMessage());
throw new OpenApiProblemException(OpenApiErrorCode.SERVER_ERROR);
}
}, executorService)
.thenApply(response -> {
}, openApiExecutorService)
.thenApplyAsync(response -> {
int statusCode = response.statusCode();
if (statusCode == HttpURLConnection.HTTP_OK) {
return response.body();
} else {
throw new OpenApiProblemException(OpenApiErrorCode.OPEN_API_REQUEST_ERROR);
}
})
}, localExecutorService)
.thenApply(this::getMedicalFacilityXmlDtoList);
}

Expand Down
Loading