diff --git a/src/main/kotlin/com/hero/alignlab/config/web/ReactiveConcurrentUserWebSocketHandler.kt b/src/main/kotlin/com/hero/alignlab/config/web/ReactiveConcurrentUserWebSocketHandler.kt index f855bf5..9fa9ef3 100644 --- a/src/main/kotlin/com/hero/alignlab/config/web/ReactiveConcurrentUserWebSocketHandler.kt +++ b/src/main/kotlin/com/hero/alignlab/config/web/ReactiveConcurrentUserWebSocketHandler.kt @@ -19,6 +19,7 @@ import reactor.core.publisher.Mono import java.time.Duration import java.time.LocalDateTime +// rsocket의 경우, client의 requester 처리에 있어 부가적인 설정이 필요. 그러므로 reactive websocket으로 진행 // poc 용도의 handler, 리펙토링 예정 @Component class ReactiveConcurrentUserWebSocketHandler( @@ -104,7 +105,11 @@ class ReactiveConcurrentUserWebSocketHandler( Flux.interval(Duration.ofMillis(1000L)) .zipWith(eventFlux) { _, event -> event } .map(session::textMessage) - ).and(session.receive().map(WebSocketMessage::getPayloadAsText).log()).then() + ).and(session.receive().map(WebSocketMessage::getPayloadAsText).log()) + .doFinally { + handleSessionTermination(session, user.uid) + } + .then() } private fun isTokenHeader(headerKey: String): Boolean { @@ -116,6 +121,66 @@ class ReactiveConcurrentUserWebSocketHandler( val regex = Regex("/ws/v1/groups/(\\w+)/concurrent-users") return regex.find(uri)?.groupValues?.get(1) } + + private fun removeSessionFromMap(session: WebSocketSession) { + concurrentUserMap.forEach { (groupId, uidBySession) -> + val userToRemove = uidBySession.filterValues { it == session }.keys.firstOrNull() + if (userToRemove != null) { + uidBySession.remove(userToRemove) + logger.info { "Removed session for user $userToRemove from group $groupId" } + if (uidBySession.isEmpty()) { + concurrentUserMap.remove(groupId) + logger.info { "Removed group $groupId as it has no more users." } + } + } + } + } + + private fun handleSessionTermination(session: WebSocketSession, uid: Long) { + concurrentUserMap.forEach { (groupId, uidBySession) -> + val removedUser = uidBySession.remove(uid) + + if (removedUser != null) { + logger.info { "Removed session for user $uid from group $groupId" } + + if (uidBySession.isEmpty()) { + concurrentUserMap.remove(groupId) + logger.info { "Removed group $groupId as it has no more users." } + } else { + // Send the updated group status to remaining users + sendUpdatedGroupStatus(groupId, uidBySession) + } + } + } + } + + private fun sendUpdatedGroupStatus(groupId: Long, uidBySession: MutableMap) { + val uids = uidBySession.keys + val userInfoByUid = userInfoService.findAllByIds(uids.toList()).associateBy { it.id } + + val groupUsers = groupUserService.findAllByGroupIdAndUids(groupId, userInfoByUid.keys) + .associateBy { it.uid } + + val message = ConcurrentMessage( + groupId = groupId, + groupUsers = userInfoByUid.mapNotNull { (uid, info) -> + val groupUser = groupUsers[uid] ?: return@mapNotNull null + + ConcurrentMessage.ConcurrentUser( + groupUserId = groupUser.id, + uid = uid, + nickname = info.nickname + ) + } + ) + + uidBySession.forEach { (_, websocketSession) -> + websocketSession + .send(Mono.just(websocketSession.textMessage(mapper.writeValueAsString(message)))) + .subscribe() + } + } + } data class ConcurrentMessage(