Skip to content
This repository has been archived by the owner on Oct 24, 2024. It is now read-only.

[#46] 비동기적인 Push 알람 서비스 구현 #48

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ build/
### VS Code ###
.vscode/

application.properties
application.properties
firebase
firebase/make-delivery-firebase-adminsdk-8jura-1d0b64450e.json
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

<dependency>
<groupId>com.google.firebase</groupId>
<artifactId>firebase-admin</artifactId>
<version>6.8.1</version>
</dependency>

</dependencies>

<build>
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/com/flab/makedel/config/SpringAsyncConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.flab.makedel.config;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/*
스프링의 @Async를 사용할 때 비동기처리를 새로운 스레드 풀에서 해주기 위한 설정입니다.
이 설정이 없다면 SimpleAsyncTaskExecutor를 사용하는데 이는 새로운 비동기 작업을
스레드 풀에서 처리하는 것이 아니라 새로운 스레드를 매번 생성하여 작업을 수행시킵니다.
또한 스레드 관리를 직접 할 수 없어 위험할 수 있습니다.
따라서 밑에 설정에서 스레드 풀을 빈으로 설정해서 @Async 로직이 수행될 때
이 스레드 풀을 이용하도록 설정해줍니다.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

*/

@Configuration
@EnableAsync
public class SpringAsyncConfig {

private static final int CORE_POOL_SIZE = 3;
private static final int MAX_POOL_SIZE = 100;
private static final int QUEUE_CAPACITY = 3;
private static final String NAME_PREFIX = "springAsyncTask-";

@Bean(name = "springAsyncTask")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

자바에서 기본으로 제공하는 factory 메소드가 있습니다. 혹시 커스터마이징을 의도하셨다면 이유가 있으실까요?

Copy link
Collaborator Author

@tjdrnr0557 tjdrnr0557 Nov 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newFixedThreadPool은 스레드에 할당된 시간 제한이 무제한으로 설정되어 있으므로
아무런 작업 없이 대기하고 있는 스레드도 아무작업하지 않는데도 메모리를 잡아먹고 있으므로 적용할 수 없다고 생각했습니다.
시간 제한을 두어야 풀의 스레드 개수가 코어크기를 넘어설 때 제거할 수 있습니다. 물론 제거하고나면 나중에 다시 스레드를 생성해야하는 단점은 있습니다. 또한 newFixedThreadPool은 크기가 제한되지 않은 LinkedBlockingQueue를 사용하므로 푸쉬알람이 많이오면 큐에 쌓아두기만 하여 병목이 생길 수 있습니다.

또한 newCachedThreadPool은 스레드 풀의 최대 크기가 Integer.MAX_VALUE로 지정되어있어 적용하기 알맞지 않다고 생각했습니다. 왜냐하면 푸쉬알람이 아무리 많이와도 스레드의 개수를 무작정 많이 늘려도 모두 처리할 수 없기 때문에 적당한 크기의 최대 스레드 개수를 지정하기 위해 커스터마이징을 하게 되었습니다. 하지만 synchronousQueue를 이용하여 스레드를 큐에 넣지 않고 바로 스레드로 작업을 넘겨주기 때문에 푸쉬알람이 수십개가 한번에 온다면 max_pool_size를 늘려놓고 이 방식을 쓰는 것도 대안이 될 수 있습니다.

푸쉬 알람 보내는 것을 비동기 작업으로 처리할 것이기 때문에 newCachedThreadPool이 알맞긴 하지만 시간제한을 30초로 두고 직접 코어 사이즈나 큐개수등을 제한하기 위해 커스터마이징을 하게되었습니다.

taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

core pool size와 max pool size를 다르게 해주신 이유는 어떤 상황을 가정해서 다르게 주신 것일까요?

Copy link
Collaborator Author

@tjdrnr0557 tjdrnr0557 Nov 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

core pool size는 스레드 풀 안에 유지되는 스레드 개수이고
max pool size는 작업 큐에 QUEUE_CAPACITY만큼 꽉찼을 시, 작업을 처리하려면 스레드의 개수가 늘어나야하는데 스레드의 개수가 최대로 몇개까지 늘어날 수 있는 사이즈입니다. 즉 스레드 풀에서 관리하는 최대 스레드의 개수입니다.

만약 core pool size를 높게 둔다면 스레드를 여러개만들어 두니 바로 바로 처리할 수 있지만 스레드를 많이 만들어 두는 것 자체가 메모리 낭비일 수 있으므로 작게 두었습니다.
max pool size는 푸쉬알람이 한번에 몇십개를 보내야 한다면 스레드의 개수가 많이 늘어날 수 있으니 높게 설정해두었습니다.

이 둘을 다르게 설정해준 이유는 한번에 갑자기 많은 푸쉬알람을 보내야 할 때 core pool size의 스레드 개수가 알람을 보내고 response가 올떄 까지 blocking이 되면 작업 큐에 있는 작업들이 처리되지 못하여 core pool size의 스레드를 할당받지 못하고 기다리게 될 것이므로 max pool size만큼 스레드를 늘려 푸쉬 알람을 보내기 위해서 입니다.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

스레드를 생성하고 소멸시키면서 생기는 오버헤드보다는 메모리를 적게 사용하는 것을 선택하신 것 같네요~
그러면 메모리에 얼마나 차이가 있을지는 혹시 계산해보셨을까요?

Copy link
Collaborator Author

@tjdrnr0557 tjdrnr0557 Nov 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

푸쉬메세지를 보낼 때 Runtime 클래스를 이용해 메모리 사용량을 계산해보았습니다.
Max Pool을 500으로 지정해놓았을때랑 5로 지정해 놓았을 때 푸쉬메세지를 보내보면 스레드를 495개 더 만드는데에 메모리 차이는 70MB정도의 차이가 있었습니다. (매번 다르게 나오는데 이 부분은 정확히 왜 계속 다르게 나오는지 모르겠습니다)
푸쉬메세지가 한번에 폭증하는 것을 병목없이 보내주는데 70MB를 더 쓰는 것이 더 효율적이라고 생각했습니다.
만약 큐에 쌓아놓고 푸쉬메세지요청이 계속 더 들어온다면 병목이 계속 생길 수 있다고 생각하기 때문입니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

매번 다르게 나와서 정확한 수치 측정은 불가능하나

Maxpool을 500으로 잡았을때와 5로 잡았을때 메모리 차이는 50~100MB정도의 차이는 꾸준히 있었습니다.

taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. QUEUE_CAPACITY 가 상당히 낮은데 어떤 이유로 이렇게 산정하셨나요?
  2. QueueCapacity는 크기에 따른 각 어떤 장단점이 있을까요?

Copy link
Collaborator Author

@tjdrnr0557 tjdrnr0557 Nov 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


core_pool_size만큼 스레드 풀은 스레드를 만들어놓고 스레드를 처리하다 더 많은 작업량이 들어오면 작업 큐에 넣어놓고 작업 큐는 대기하고 있고 core_pool_size의 스레드들이 작업은 완료하면 작업 큐에서 한개씩 스레드를 할당해서 스레드를 재사용합니다.

만약 푸쉬알람이 폭증해서 동시에 몇십개를 보내야 하는 상황에서 QUEUE_CAPACITY가 20이고 core_pool_size로 정해놓은 스레드들이 3이라면 3개의 스레드가 푸쉬알람을 보내고 response를 받고 이 일 처리를 끝내고 이 3개의 스레드로 20개의 작업큐의 작업을 해야 하므로 작업큐에서도 blocking이 있을것이고 core_pool_size만큼의 스레드들도 푸쉬알람을 보내고 response까지 받을때까진 blocking이 있을것이므로 작업큐에서 대기하는 작업들은 굉장히 늦게 처리될 것입니다.

따라서 queue_capactiy를 아예 낮게 두어 core_pool_size의 스레드가 꽉차고 queue_capacity도 꽉찬다면 작업 큐에서 계속 작업들이 기다리는 것을 방지하고자 아예 max_pool_size를 높게 두어 한번에 몇십개씩 푸쉬알람을 보내야 할 때 스레드의 개수를 늘려 푸쉬알람 병목을 대비하였습니다.


기존 스레드풀의 core_pool_size만큼의 스레드가 모두 일을하고있고 작업큐에 작업이 쌓였다면
기존 core_pool_size만큼의 스레드가 제할일을 끝내고 작업큐에 남아있는 작업들에 할일을 끝낸 스레드를 할당합니다.
기존 스레드들은 계속 재활용되며 작업큐의 작업들에 스레드를 할당합니다.
따라서 queue_capacity가 높다면 시간은 조금 오래걸리더라도 새로운 스레드를 생성하지 않고 재활용 하기 때문에 새로운 스레드를 만드는데 메모리를 더 쓰지 않고 재활용하므로 메모리면에서 효율적입니다.

하지만 queue_capacity가 낮다면 작업큐까지 다 꽉차버리면 max_pool_size까지 스레드를 필요한만큼 늘려버리기 때문에 바로바로 일처리를 할 수 있고 병목이 생길 확률이 줄어듭니다. 시간은 적게 걸리지만 새로운 스레드들을 생성하기 때문에 새로운 스레드를 만드는데 메모리를 더 쓰게 될 것입니다.

taskExecutor.setThreadNamePrefix(NAME_PREFIX);
taskExecutor.setWaitForTasksToCompleteOnShutdown(false);
taskExecutor.setRejectedExecutionHandler(new AbortPolicy());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AbortPolicy로 설정해주신 이유가 있으실까요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abort policy는 최대 스레드 크기까지 쓰고 작업큐까지 꽉차면 RejectedExecutionException 예외를 던집니다. 이 예외를 잡아서 직접 대응해줘야합니다. 만약 푸쉬 알람을 보내다 실패한다면 이 예외를 푸쉬 알람을 보내려고 한 사용자에게 throw하여 알려야 한다고 생각하여 abortpolicy로 설정하였습니다.

return taskExecutor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import com.flab.makedel.annotation.LoginCheck;
import com.flab.makedel.annotation.LoginCheck.UserLevel;
import com.flab.makedel.dto.PushMessageDTO;
import com.flab.makedel.dto.RiderDTO;
import com.flab.makedel.service.PushService;
import com.flab.makedel.service.RiderService;
import com.google.firebase.messaging.FirebaseMessagingException;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.DeleteMapping;
Expand Down Expand Up @@ -45,5 +50,5 @@ public void acceptStandbyOrder(@PathVariable long orderId,
public void finishDeliveringOrder(@PathVariable long orderId, RiderDTO rider) {
riderService.finishDeliveringOrder(orderId, rider);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.flab.makedel.annotation.LoginCheck.UserLevel;
import com.flab.makedel.dto.StoreDTO;
import com.flab.makedel.service.StoreService;
import com.google.firebase.messaging.FirebaseMessagingException;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -82,7 +83,7 @@ public ResponseEntity<Void> openMyStore(@PathVariable long storeId,
@PostMapping("/{storeId}/orders/{orderId}/approve")
@LoginCheck(userLevel = UserLevel.OWNER)
public void approveOrder(@PathVariable long orderId, @PathVariable long storeId,
@CurrentUserId String ownerId) {
@CurrentUserId String ownerId) throws FirebaseMessagingException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 프로젝트에서 파이어베이스를 사용한다는 것은 프로젝트 내부의 관심사이기에 외부로 파이어베이스 관련 예외를 전파하는게 아닌 다른 방식으로 처리해주시면 좋을 것 같습니다~

storeService.validateMyStore(storeId, ownerId);
storeService.approveOrder(orderId);
}
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/com/flab/makedel/dao/CartItemDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
exec 후에 오류가 나지 않은 부분은 실행된다.
exec 이전에 command queue에 적재하는 도중 실패하는 경우 (command 문법오류,메모리 부족오류등,
다른 클라이언트에서 command날려 atomic보장이 안되는 경우) 에는 exec하면 전부 discard된다.
(실험해보니 multi 후 트랜잭션중 다른 스레드에서 command를 날리면 discard된다.
오히려 다른스레드에서 그 키에 command 날린 것만 반영이 되고 원래트랜잭션은 discard된다. 아마도
원래 트랜잭션은 어차피 처리가 안되고 discard되니 다른 스레드에서 날린 command는 유효하게 처리하는거같다.)
(실험해보니 multi 후 트랜잭션중 다른 스레드에서 command를 날리면 discard된다.)
(레디스 2.6.5이후로 트랜잭션시작 후 오류가 있으면 exec될 때 전부 discard된다.)
트랜잭션 명령어들은 exec되기 위해 큐에서 기다리는데 discard를 이용해 실행을 하지 않을 수 있다.
트랜잭션의 locking은 watch를 이용한 optimistic locking이다. watch로 어떠한 키를 감시하고
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/flab/makedel/dto/PushMessageDTO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.flab.makedel.dto;

import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
public class PushMessageDTO {

private final String title;

private final String content;

private final OrderReceiptDTO orderReceipt;

private final String createdAt;

public static final String RIDER_MESSAGE_TITLE = "배차 요청";
public static final String RIDER_MESSAGE_CONTENT = "근처 가게에서 주문이 승인된 후 배차 요청이 도착했습니다. 승인하시겠습니까?";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

만약 이 어플리케이션을 베트남에서도 서비스하게 된다면 이 상수가 큰 걸림돌이 되지 않을까요? 이럴 땐 어떻게 해야할까요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resources 디렉토리 안에 베트남의 언어로 출력할 메세지를 작성하고 resource bundle로 묶습니다. 그 다음 ResourceBundleMessageSoure빈이 리소스 번들을 메세지 소스로 읽어와서 이를 Local.VIETNAM 이런식으로 출력해줄 수 있습니다.


}
65 changes: 65 additions & 0 deletions src/main/java/com/flab/makedel/service/PushService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.flab.makedel.service;

import com.flab.makedel.dao.DeliveryDAO;
import com.flab.makedel.dto.PushMessageDTO;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
import com.google.firebase.messaging.BatchResponse;
import com.google.firebase.messaging.FirebaseMessaging;
import com.google.firebase.messaging.FirebaseMessagingException;
import com.google.firebase.messaging.Message;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.stereotype.Service;

@Service
@Log4j2
public class PushService {

@Value("${firebase.config.path}")
private final String firebaseConfigPath;
private final DeliveryDAO deliveryDAO;

public PushService(@Value("${firebase.config.path}") String firebaseConfigPath,
DeliveryDAO deliveryDAO) {
this.firebaseConfigPath = firebaseConfigPath;
this.deliveryDAO = deliveryDAO;
}

@PostConstruct
public void init() throws IOException {
FirebaseOptions options = new FirebaseOptions.Builder()
.setCredentials(GoogleCredentials
.fromStream(new ClassPathResource(firebaseConfigPath).getInputStream()))
.build();
if (FirebaseApp.getApps().isEmpty()) {
FirebaseApp.initializeApp(options);
}
}

public void sendMessageToStandbyRidersInSameArea(String address, PushMessageDTO pushMessage)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

다시 보니 이 클래스의 이름은 PushService인데 메소드 이름에 StandbyRidersInSameArea가 붙어있습니다. 순수하게 푸시 로직만 들어있는게 아니라 라이더 관련 책임도 들어있네요. 그러면 두 책임간의 결합도가 높은게 아닐까요?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

도메인 관계를 잘못 설정하고 있었네요. RiderService가 Pushservice에 의존하여 메세지를 보내게 변경하였습니다.

throws FirebaseMessagingException {
Set<String> tokenSet = deliveryDAO.selectStandbyRiderTokenList(address);
List<Message> messages = tokenSet.stream().map(token -> Message.builder()
.putData("title", pushMessage.getTitle())
.putData("content", pushMessage.getContent())
.putData("orderReceipt", pushMessage.getOrderReceipt().toString())
.putData("createdAt", pushMessage.getCreatedAt())
.setToken(token)
.build())
.collect(Collectors.toList());

FirebaseMessaging.getInstance().sendAll(messages);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이거를 비동기로 처리해준다면 @Async는 붙이지 않아도 되지 않을까요?

}

}
18 changes: 17 additions & 1 deletion src/main/java/com/flab/makedel/service/StoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
import com.flab.makedel.dto.OrderDTO.OrderStatus;
import com.flab.makedel.dto.OrderDetailDTO;
import com.flab.makedel.dto.OrderReceiptDTO;
import com.flab.makedel.dto.PushMessageDTO;
import com.flab.makedel.dto.StoreDTO;
import com.flab.makedel.mapper.OrderMapper;
import com.flab.makedel.mapper.StoreMapper;
import com.google.firebase.messaging.FirebaseMessagingException;
import java.time.LocalDateTime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
Expand All @@ -20,6 +23,7 @@ public class StoreService {
private final StoreMapper storeMapper;
private final OrderMapper orderMapper;
private final DeliveryService deliveryService;
private final PushService pushService;

public void insertStore(StoreDTO store) {
storeMapper.insertStore(store);
Expand Down Expand Up @@ -65,10 +69,22 @@ public void validateMyStore(long storeId, String ownerId) throws HttpClientError
}

@Transactional
public void approveOrder(long orderId) {
public void approveOrder(long orderId) throws FirebaseMessagingException {
orderMapper.approveOrder(orderId, OrderStatus.APPROVED_ORDER);
OrderReceiptDTO orderReceipt = orderMapper.selectOrderReceipt(orderId);
deliveryService.registerStandbyOrderWhenOrderApprove(orderId, orderReceipt);
pushService.sendMessageToStandbyRidersInSameArea(orderReceipt.getStoreInfo().getAddress(),
getPushMessage(orderReceipt));
}

private PushMessageDTO getPushMessage(OrderReceiptDTO orderReceipt) {
return PushMessageDTO.builder()
.title(PushMessageDTO.RIDER_MESSAGE_TITLE)
.content(PushMessageDTO.RIDER_MESSAGE_TITLE)
.createdAt(LocalDateTime.now().toString())
.orderReceipt(orderReceipt)
.build();

}

}