diff --git a/build.gradle.kts b/build.gradle.kts index ace4482..1aca7fa 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { implementation("io.quarkus:quarkus-config-yaml") implementation("io.quarkus:quarkus-scheduler") implementation("io.quarkus:quarkus-resteasy-reactive-jackson") + implementation("io.quarkus:quarkus-smallrye-reactive-messaging-rabbitmq") implementation("io.quarkus:quarkus-container-image-docker") implementation("io.quarkus:quarkus-hibernate-orm-panache-kotlin") implementation("io.quarkus:quarkus-jdbc-mariadb") diff --git a/docker-compose.yaml b/docker-compose.yaml index 12b565a..2ccc9fd 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -8,3 +8,12 @@ services: MARIADB_PASSWORD: banana ports: - "3306:3306" + + rabbitmq: + image: rabbitmq:3.12-management + environment: + RABBITMQ_DEFAULT_VHOST: /faf-core + RABBITMQ_DEFAULT_USER: faf-icebreaker + RABBITMQ_DEFAULT_PASS: banana + ports: + - "5672:5672" diff --git a/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt b/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt index f2bbd08..db9bc01 100644 --- a/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt +++ b/src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt @@ -5,6 +5,8 @@ import com.faforever.icebreaker.persistence.IceSessionEntity import com.faforever.icebreaker.persistence.IceSessionRepository import com.faforever.icebreaker.security.CurrentUserService import com.faforever.icebreaker.util.AsyncRunner +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.convertValue import io.quarkus.scheduler.Scheduled import io.quarkus.security.ForbiddenException import io.quarkus.security.UnauthorizedException @@ -12,10 +14,14 @@ import io.quarkus.security.identity.SecurityIdentity import io.smallrye.jwt.build.Jwt import io.smallrye.mutiny.Multi import io.smallrye.mutiny.helpers.MultiEmitterProcessor +import io.vertx.core.json.JsonObject import jakarta.enterprise.inject.Instance import jakarta.inject.Singleton import jakarta.transaction.Transactional import org.eclipse.microprofile.jwt.JsonWebToken +import org.eclipse.microprofile.reactive.messaging.Channel +import org.eclipse.microprofile.reactive.messaging.Emitter +import org.eclipse.microprofile.reactive.messaging.Incoming import org.slf4j.Logger import org.slf4j.LoggerFactory import java.time.Instant @@ -30,10 +36,13 @@ class SessionService( private val iceSessionRepository: IceSessionRepository, private val securityIdentity: SecurityIdentity, private val currentUserService: CurrentUserService, + private val objectMapper: ObjectMapper, + @Channel("events-out") + private val rabbitmqEventEmitter: Emitter, ) { private val activeSessionHandlers = sessionHandlers.filter { it.active } - private val eventEmitter = MultiEmitterProcessor.create() - private val eventBroadcast: Multi = eventEmitter.toMulti().broadcast().toAllSubscribers() + private val localEventEmitter = MultiEmitterProcessor.create() + private val localEventBroadcast: Multi = localEventEmitter.toMulti().broadcast().toAllSubscribers() fun buildToken(gameId: Long): String { val userId = @@ -125,9 +134,9 @@ class SessionService( fun listenForEventMessages(gameId: Long): Multi { val userId = currentUserService.getCurrentUserId() - eventEmitter.emit(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!)) + rabbitmqEventEmitter.send(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!)) - return eventBroadcast.filter { + return localEventBroadcast.filter { it.gameId == gameId && (it.recipientId == userId || (it.recipientId == null && it.senderId != userId)) } } @@ -143,6 +152,12 @@ class SessionService( "current user id $currentUserId from endpoint does not match sourceId ${candidatesMessage.senderId} in candidateMessage" } - eventEmitter.emit(candidatesMessage) + rabbitmqEventEmitter.send(candidatesMessage) + } + + @Incoming("events-in") + fun onEventMessage(eventMessage: JsonObject) { + val parsedMessage = objectMapper.convertValue(eventMessage.map) + localEventEmitter.emit(parsedMessage) } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 37d2f38..06efa12 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -19,7 +19,6 @@ quarkus: # (also requires the CustomTenantResolver) self-tenant: # There is no .well-known/openid-configuration - auth-server-url: ${SELF_URL:https://ice.faforever.com} discovery-enabled: false token: # Hard coded JWT settings, as there is no JWKS @@ -61,6 +60,30 @@ smallrye: jwt: sign: key: ${JWT_PRIVATE_KEY_PATH} +mp: + messaging: + incoming: + events-in: + connector: smallrye-rabbitmq + virtual-host: ${RABBITMQ_VHOST:/faf-core} + queue: + name: events.${HOSTNAME:local} + auto-delete: true + exclusive: true + exchange: + name: ice + outgoing: + events-out: + connector: smallrye-rabbitmq + virtual-host: ${RABBITMQ_VHOST:/faf-core} + exchange: + name: ice + +rabbitmq-host: ${RABBITMQ_HOST:localhost} +rabbitmq-port: ${RABBITMQ_PORT:5672} +rabbitmq-username: ${RABBITMQ_USER:faf-icebreaker} +rabbitmq-password: ${RABBITMQ_PASSWORD:banana} + "%dev": xirsys: enabled: ${XIRSYS_ENABLED:false}