Skip to content

Commit

Permalink
[#31] 추가된 기본정보 저장 consumer 추가 (#32)
Browse files Browse the repository at this point in the history
- 전체 BasicInfo 사용
- updateConsumer에도 수동 커밋을 할지 차후 결정
  • Loading branch information
JHwan96 authored Oct 21, 2024
1 parent c14c804 commit 0a557d1
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.healthmap.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.healthmap.config.KafkaProperties;
import org.healthmap.db.medicalfacility.MedicalFacilityEntity;
import org.healthmap.db.medicalfacility.MedicalFacilityRepository;
import org.healthmap.dto.BasicInfoDto;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionTemplate;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
@RequiredArgsConstructor
public class BasicInfoSaveConsumer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final TransactionTemplate transactionTemplate;
private final KafkaProperties kafkaProperties;
private final MedicalFacilityRepository medicalFacilityRepository;
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
private AtomicInteger count = new AtomicInteger(0); // 동작 확인용

@KafkaListener(topics = "${kafka-config.consumer.update-topic}",
groupId = "${kafka-config.consumer.save-groupId}",
containerFactory = "saveBasicInfoContainerFactory")
public void saveBasicInfo(BasicInfoDto dto, Acknowledgment ack) {
CompletableFuture.supplyAsync(() -> {
Boolean transaction = transactionTemplate.execute(status -> {
try {
MedicalFacilityEntity findEntity = medicalFacilityRepository.findById(dto.getCode()).orElse(null);
saveMedicalFacility(dto, findEntity);
return true;
} catch (Exception e) {
log.error("Save new medical facility error: {}", e.getMessage(), e);
status.setRollbackOnly();
return false;
}
});
if(transaction != null && transaction) {
return dto;
} else {
ack.nack(Duration.ofMillis(500));
return null;
}
}, executorService)
.thenAccept(basicInfoDto -> {
if (basicInfoDto != null) {
kafkaTemplate.send(kafkaProperties.getDetailTopic(), dto.getCode());
ack.acknowledge();
}
});

}

private void saveMedicalFacility(BasicInfoDto dto, MedicalFacilityEntity entity) {
if (entity == null) {
MedicalFacilityEntity saveEntity = convertDtoToEntity(dto);
medicalFacilityRepository.save(saveEntity);
count.incrementAndGet();
log.info("save count : {}", count.get());
}
}

private MedicalFacilityEntity convertDtoToEntity(BasicInfoDto dto) {
return MedicalFacilityEntity.of(dto.getCode(), dto.getName(), dto.getAddress(), dto.getPhoneNumber(), dto.getPageUrl(),
dto.getType(), dto.getState(), dto.getCity(), dto.getTown(), dto.getPostNumber(), dto.getCoordinate(), null,
null, null, null, null, null, null, null,
null, null, null, null, null, null, null,
null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@ public class BasicInfoUpdateConsumer {
private final KafkaTemplate<String, BasicInfoDto> kafkaTemplate;
private final KafkaProperties kafkaProperties;
private final TransactionTemplate transactionTemplate;
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ExecutorService executorService = Executors.newFixedThreadPool(20);
private AtomicInteger count = new AtomicInteger(0); // 동작 확인용
private AtomicInteger realCount = new AtomicInteger(0); // 동작 확인용

@KafkaListener(topics = "${kafka-config.consumer.basic-topic}",
groupId = "${kafka-config.consumer.groupId}",
containerFactory = "basicInfoKafkaListenerContainerFactory")
public void listen(BasicInfoDto dto) {
public void updateBasicInfo(BasicInfoDto dto) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
transactionTemplate.execute(status -> {
try {
updateMedicalFacility(dto);
realCount.incrementAndGet();
if(realCount.get() % 5000 == 0) {
log.info("updated now : {}", realCount.get());
if (realCount.get() % 5000 == 0) {
log.info("updated count : {}", realCount.get());
}
} catch (Exception e) {
log.error("Updating medical facility error: {}", e.getMessage(), e);
Expand All @@ -56,15 +56,15 @@ public void listen(BasicInfoDto dto) {
future.join();
}

public void updateMedicalFacility(BasicInfoDto dto) {
private void updateMedicalFacility(BasicInfoDto dto) {
MedicalFacilityEntity entity = medicalFacilityRepository.findById(dto.getCode()).orElse(null);
if (entity != null) {
medicalFacilityRepository.updateBasicInfo(entity.getId(), entity.getName(), entity.getAddress(), entity.getPhoneNumber(),
entity.getUrl(), entity.getType(), entity.getState(), entity.getCity(), entity.getTown(), entity.getPostNumber(),
entity.getCoordinate());
} else {
count.incrementAndGet();
log.info("update 완료 : {}", count.get());
log.info("update 개수 : {}", count.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ConsumerFactory<String, BasicInfoDto> basicInfoConsumerConfig() {
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerConfig());
factory.setConcurrency(10); //TODO: 차후 변경
factory.setConcurrency(10); //TODO: partition 개수만큼 차후 변경
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor());

Expand All @@ -73,10 +73,20 @@ public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaLi
public ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> basicInfoKafkaListenerContainerFactory() {
    // Container factory for the basic-info update consumer.
    ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(basicInfoConsumerConfig());
    //TODO: partition 개수만큼 차후 변경
    listenerFactory.setConcurrency(10);
    listenerFactory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.executor());
    //TODO: ACK를 수동? 자동?
    return listenerFactory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> saveBasicInfoContainerFactory() {
    // Container factory for the basic-info save consumer: manual-immediate acks,
    // records processed on the dedicated save executor.
    ConcurrentKafkaListenerContainerFactory<String, BasicInfoDto> saveFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    saveFactory.setConsumerFactory(basicInfoConsumerConfig());
    saveFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    saveFactory.getContainerProperties().setListenerTaskExecutor(taskExecutorConfig.saveExecutor());
    saveFactory.setConcurrency(10);
    return saveFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ public class KafkaProperties {
private final String detailTopic;
private final String basicTopic;
private final String updateTopic;
private final String deleteTopic;
private final String groupId;
private final String saveGroupId;

/**
 * Binds Kafka topic names and consumer group ids from configuration.
 *
 * @param detailTopic topic for the detail-info pipeline
 * @param basicTopic  topic carrying basic facility info
 * @param updateTopic topic carrying facilities to insert/update
 * @param deleteTopic topic carrying facilities to remove
 * @param groupId     group id of the update consumer
 * @param saveGroupId group id of the save consumer
 */
public KafkaProperties(String detailTopic, String basicTopic, String updateTopic, String deleteTopic, String groupId, String saveGroupId) {
    this.detailTopic = detailTopic;
    this.basicTopic = basicTopic;
    this.updateTopic = updateTopic;
    this.deleteTopic = deleteTopic;
    this.groupId = groupId;
    this.saveGroupId = saveGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ public class TaskExecutorConfig {
@Bean
public AsyncTaskExecutor executor() {
    // Shared listener task executor for the basic-info consumers.
    // Sizes follow this commit's new values (core 15 / max 20); the superseded
    // 5/10 calls from the old revision are removed.
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(15);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("KafkaExecutor-");
    executor.initialize();
    return executor;
}
@Bean
public AsyncTaskExecutor saveExecutor() {
    // Dedicated task executor for the save-consumer listener container.
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setThreadNamePrefix("KafkaExecutorForSave-");
    pool.setQueueCapacity(100);
    pool.setCorePoolSize(10);
    pool.setMaxPoolSize(15);
    pool.initialize();
    return pool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class MedicalFacilityApi {
private final int rowSize; //한 페이지 결과 수
private final String serviceKey;
private final String numOfRows;
private final ExecutorService executorService;
private final ExecutorService openApiExecutorService;
private final ExecutorService localExecutorService;
private final HttpClient client;
private final ObjectMapper xmlMapper;

Expand All @@ -45,7 +46,8 @@ public MedicalFacilityApi(KeyProperties keyInfo) {
this.rowSize = 5000;
this.serviceKey = "?serviceKey=" + keyInfo.getServerKey();
this.numOfRows = "&numOfRows=" + rowSize;
this.executorService = Executors.newFixedThreadPool(10);
this.localExecutorService = Executors.newFixedThreadPool(50);
this.openApiExecutorService = Executors.newFixedThreadPool(15);
this.client = HttpClient.newBuilder().build();
this.xmlMapper = new XmlMapper();
}
Expand All @@ -68,15 +70,15 @@ public CompletableFuture<List<MedicalFacilityXmlDto>> getMedicalFacilityInfoList
log.error(e.getMessage());
throw new OpenApiProblemException(OpenApiErrorCode.SERVER_ERROR);
}
}, executorService)
.thenApply(response -> {
}, openApiExecutorService)
.thenApplyAsync(response -> {
int statusCode = response.statusCode();
if (statusCode == HttpURLConnection.HTTP_OK) {
return response.body();
} else {
throw new OpenApiProblemException(OpenApiErrorCode.OPEN_API_REQUEST_ERROR);
}
})
}, localExecutorService)
.thenApply(this::getMedicalFacilityXmlDtoList);
}

Expand Down

0 comments on commit 0a557d1

Please sign in to comment.