Skip to content

Commit

Permalink
Route ice event messages over RabbitMQ to allow running as multi-inst…
Browse files Browse the repository at this point in the history
…ance
  • Loading branch information
Brutus5000 committed Nov 25, 2024
1 parent dc09ab1 commit 617feb8
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation("io.quarkus:quarkus-config-yaml")
implementation("io.quarkus:quarkus-scheduler")
implementation("io.quarkus:quarkus-resteasy-reactive-jackson")
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-rabbitmq")
implementation("io.quarkus:quarkus-container-image-docker")
implementation("io.quarkus:quarkus-hibernate-orm-panache-kotlin")
implementation("io.quarkus:quarkus-jdbc-mariadb")
Expand Down
25 changes: 20 additions & 5 deletions src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@ import com.faforever.icebreaker.persistence.IceSessionEntity
import com.faforever.icebreaker.persistence.IceSessionRepository
import com.faforever.icebreaker.security.CurrentUserService
import com.faforever.icebreaker.util.AsyncRunner
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.convertValue
import io.quarkus.scheduler.Scheduled
import io.quarkus.security.ForbiddenException
import io.quarkus.security.UnauthorizedException
import io.quarkus.security.identity.SecurityIdentity
import io.smallrye.jwt.build.Jwt
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.helpers.MultiEmitterProcessor
import io.vertx.core.json.JsonObject
import jakarta.enterprise.inject.Instance
import jakarta.inject.Singleton
import jakarta.transaction.Transactional
import org.eclipse.microprofile.jwt.JsonWebToken
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Instant
Expand All @@ -30,10 +36,13 @@ class SessionService(
private val iceSessionRepository: IceSessionRepository,
private val securityIdentity: SecurityIdentity,
private val currentUserService: CurrentUserService,
private val objectMapper: ObjectMapper,
@Channel("events-out")
private val rabbitmqEventEmitter: Emitter<EventMessage>,
) {
private val activeSessionHandlers = sessionHandlers.filter { it.active }
private val eventEmitter = MultiEmitterProcessor.create<EventMessage>()
private val eventBroadcast: Multi<EventMessage> = eventEmitter.toMulti().broadcast().toAllSubscribers()
private val localEventEmitter = MultiEmitterProcessor.create<EventMessage>()
private val localEventBroadcast: Multi<EventMessage> = localEventEmitter.toMulti().broadcast().toAllSubscribers()

fun buildToken(gameId: Long): String {
val userId =
Expand Down Expand Up @@ -125,9 +134,9 @@ class SessionService(

fun listenForEventMessages(gameId: Long): Multi<EventMessage> {
val userId = currentUserService.getCurrentUserId()
eventEmitter.emit(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!))
rabbitmqEventEmitter.send(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!))

return eventBroadcast.filter {
return localEventBroadcast.filter {
it.gameId == gameId && (it.recipientId == userId || (it.recipientId == null && it.senderId != userId))
}
}
Expand All @@ -143,6 +152,12 @@ class SessionService(
"current user id $currentUserId from endpoint does not match sourceId ${candidatesMessage.senderId} in candidateMessage"
}

eventEmitter.emit(candidatesMessage)
rabbitmqEventEmitter.send(candidatesMessage)
}

@Incoming("events-in")
fun onEventMessage(eventMessage: JsonObject) {
val parsedMessage = objectMapper.convertValue<EventMessage>(eventMessage.map)
localEventEmitter.emit(parsedMessage)
}
}
24 changes: 24 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,30 @@ smallrye:
jwt:
sign:
key: ${JWT_PRIVATE_KEY_PATH}
mp:
messaging:
incoming:
events-in:
connector: smallrye-rabbitmq
virtual-host: ${RABBITMQ_VHOST:/faf-core}
queue:
name: events.${HOSTNAME:local}
auto-delete: true
exclusive: true
exchange:
name: ice
outgoing:
events-out:
connector: smallrye-rabbitmq
virtual-host: ${RABBITMQ_VHOST:/faf-core}
exchange:
name: ice

rabbitmq-host: ${RABBITMQ_HOST:localhost}
rabbitmq-port: ${RABBITMQ_PORT:5672}
rabbitmq-username: ${RABBITMQ_USER:admin}
rabbitmq-password: ${RABBITMQ_PASSWORD:banana}

"%dev":
smallrye:
jwt:
Expand Down

0 comments on commit 617feb8

Please sign in to comment.