diff --git a/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveConcurrentUserWebSocketHandler.kt b/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveConcurrentUserWebSocketHandler.kt index cb59337..23b479e 100644 --- a/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveConcurrentUserWebSocketHandler.kt +++ b/src/main/kotlin/com/hero/alignlab/ws/handler/ReactiveConcurrentUserWebSocketHandler.kt @@ -15,10 +15,7 @@ import org.springframework.stereotype.Component import org.springframework.web.reactive.socket.WebSocketHandler import org.springframework.web.reactive.socket.WebSocketMessage import org.springframework.web.reactive.socket.WebSocketSession -import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.core.publisher.SynchronousSink -import java.time.Duration import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap @@ -39,17 +36,6 @@ class ReactiveConcurrentUserWebSocketHandler( */ private val concurrentUserByMap: ConcurrentMap> = ConcurrentHashMap() - /** ping-pong check */ - private val eventFlux: Flux = Flux.generate { sink: SynchronousSink -> - runCatching { - mapper.writeValueAsString("ping pong") - }.onSuccess { json -> - sink.next(json) - }.onFailure { e -> - sink.error(e) - } - } - override fun handle(session: WebSocketSession): Mono { val authUserToken = session.handshakeInfo.headers.resolve() @@ -86,11 +72,9 @@ class ReactiveConcurrentUserWebSocketHandler( } } - return session.send( - Flux.interval(Duration.ofMillis(1000L)) - .zipWith(eventFlux) { _, event -> event } - .map(session::textMessage) - ).and(session.receive().map(WebSocketMessage::getPayloadAsText).log()) + return session.receive() + .map(WebSocketMessage::getPayloadAsText) + .log() .doFinally { handleSessionTermination(session, user.uid) } .then() }