Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Mission] Ne pas empêcher la création/modification d'une mission s'il y a un souci pour publier l'événement de mise à jour #2039

Merged
merged 5 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ class CreateOrUpdateMission(

val normalizedMission =
mission.geom?.let { nonNullGeom ->
mission.copy(geom = postgisFunctionRepository.normalizeMultipolygon(nonNullGeom))
} ?: mission
mission.copy(
geom = postgisFunctionRepository.normalizeMultipolygon(nonNullGeom),
)
}
?: mission

val facade =
normalizedMission.geom?.let { nonNullGeom ->
Expand All @@ -41,15 +44,17 @@ class CreateOrUpdateMission(
)
val savedMission = missionRepository.save(missionToSave)

logger.info("Mission ${savedMission.mission.id} created or updated")
if (savedMission.mission.id == null) {
throw IllegalArgumentException("Mission id is null")
}

logger.info("Sending CREATE/UPDATE event for mission id ${savedMission.mission.id}.")
eventPublisher.publishEvent(
UpdateMissionEvent(savedMission.mission),
)
if (mission.id != null) {
logger.info("Sending CREATE/UPDATE event for mission id ${savedMission.mission.id}.")
eventPublisher.publishEvent(
UpdateMissionEvent(savedMission.mission),
)
}
logger.info("Mission ${savedMission.mission.id} created or updated")

return savedMission.mission
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ class CreateOrUpdateReporting(
)
logger.info("Reporting ${savedReporting.reporting.id} created or updated")

logger.info("Sending CREATE/UPDATE event for reporting id ${savedReporting.reporting.id}.")
eventPublisher.publishEvent(
UpdateReportingEvent(savedReporting),
)
if (reporting.id != null) {
logger.info("Sending CREATE/UPDATE event for reporting id ${savedReporting.reporting.id}.")
eventPublisher.publishEvent(
UpdateReportingEvent(savedReporting),
)
}

return savedReporting
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package fr.gouv.cacem.monitorenv.infrastructure.api.endpoints

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import java.util.concurrent.Executor

@Configuration
@EnableAsync
class AsyncConfig {
@Bean
fun taskExecutor(): Executor {
val executor = ThreadPoolTaskExecutor()
executor.setCorePoolSize(5)
executor.setMaxPoolSize(10)
executor.setQueueCapacity(25)
executor.setThreadNamePrefix("AsyncExecutor-")
executor.initialize()
return executor
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import fr.gouv.cacem.monitorenv.domain.use_cases.reportings.events.UpdateReporti
import fr.gouv.cacem.monitorenv.infrastructure.api.adapters.bff.outputs.reportings.ReportingDataOutput
import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.event
Expand Down Expand Up @@ -42,6 +43,7 @@ class SSEReporting {
}

/** This method listen for `ReportingEvent` to send to the frontend listeners */
@Async
@EventListener(UpdateReportingEvent::class)
fun handleUpdateReportingEvent(event: UpdateReportingEvent) {
logger.info("SSE: Received reporting event for reporting ${event.reporting.reporting.id}.")
Expand All @@ -65,6 +67,7 @@ class SSEReporting {

return@map sseEmitter
} catch (e: Exception) {
logger.info("Error when sending reporting event with id ${event.reporting.reporting.id} : $e")
sseEmitter.completeWithError(e)

return@map sseEmitter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import fr.gouv.cacem.monitorenv.domain.use_cases.missions.events.UpdateMissionEv
import fr.gouv.cacem.monitorenv.infrastructure.api.adapters.publicapi.outputs.MissionWithRapportNavActionsDataOutput
import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.event
import java.time.ZonedDateTime

@Component
class SSEMission {
class SSEMission(private val sseEmitterFactory: () -> SseEmitter = { SseEmitter(TWENTY_FOUR_HOURS) }) {
private val logger = LoggerFactory.getLogger(SSEMission::class.java)
val mutexLock = Any()

private val MISSION_UPDATE_EVENT_NAME = "MISSION_UPDATE"
private val TWENTY_FOUR_HOURS = (24 * 60 * 60 * 1000).toLong()

companion object {
private const val TWENTY_FOUR_HOURS = (24 * 60 * 60 * 1000).toLong()

/**
* This is used to store the SSE listeners
*/
Expand All @@ -29,7 +31,7 @@ class SSEMission {
*/
fun registerListener(): SseEmitter {
logger.info("Adding new SSE listener of mission updates at ${ZonedDateTime.now()}.")
val sseEmitter = SseEmitter(TWENTY_FOUR_HOURS)
val sseEmitter = sseEmitterFactory()

synchronized(mutexLock) {
sseStore.add(sseEmitter)
Expand All @@ -52,6 +54,7 @@ class SSEMission {
/**
* This method listen for `MissionEvent` to send to the frontend listeners
*/
@Async
@EventListener(UpdateMissionEvent::class)
fun handleUpdateMissionEvent(event: UpdateMissionEvent) {
logger.info("SSE: Received mission event for mission ${event.mission.id}.")
Expand All @@ -74,6 +77,7 @@ class SSEMission {

return@map sseEmitter
} catch (e: Exception) {
logger.info("Error when sending mission event with id ${event.mission.id} : $e")
sseEmitter.completeWithError(e)

return@map sseEmitter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,7 @@ import fr.gouv.cacem.monitorenv.domain.entities.reporting.ReportingSourceEntity
import fr.gouv.cacem.monitorenv.domain.entities.reporting.SourceTypeEnum
import fr.gouv.cacem.monitorenv.domain.entities.semaphore.SemaphoreEntity
import fr.gouv.cacem.monitorenv.domain.exceptions.ReportingAlreadyAttachedException
import fr.gouv.cacem.monitorenv.domain.repositories.IControlUnitRepository
import fr.gouv.cacem.monitorenv.domain.repositories.IFacadeAreasRepository
import fr.gouv.cacem.monitorenv.domain.repositories.IMissionRepository
import fr.gouv.cacem.monitorenv.domain.repositories.IPostgisFunctionRepository
import fr.gouv.cacem.monitorenv.domain.repositories.IReportingRepository
import fr.gouv.cacem.monitorenv.domain.repositories.ISemaphoreRepository
import fr.gouv.cacem.monitorenv.domain.repositories.*
import fr.gouv.cacem.monitorenv.domain.use_cases.controlUnit.dtos.FullControlUnitDTO
import fr.gouv.cacem.monitorenv.domain.use_cases.reportings.dtos.ReportingDetailsDTO
import fr.gouv.cacem.monitorenv.domain.use_cases.reportings.fixtures.ReportingFixture.Companion.aReporting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package fr.gouv.cacem.monitorenv.infrastructure.api.endpoints.publicapi.v1.missions

import com.nhaarman.mockitokotlin2.*
import fr.gouv.cacem.monitorenv.domain.use_cases.missions.events.UpdateMissionEvent
import fr.gouv.cacem.monitorenv.domain.use_cases.missions.fixtures.MissionFixture
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mockito.mock
import org.springframework.boot.test.system.CapturedOutput
import org.springframework.boot.test.system.OutputCaptureExtension
import org.springframework.test.context.junit.jupiter.SpringExtension
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter

@ExtendWith(SpringExtension::class)
@ExtendWith(OutputCaptureExtension::class)
class SSEMissionUTests {
private lateinit var sseMission: SSEMission
private lateinit var sseEmitterMock: SseEmitter

@BeforeEach
fun setUp() {
sseEmitterMock = mock(SseEmitter::class.java)
sseMission =
SSEMission {
sseEmitterMock
}
}

@Test
fun `handleUpdateMissionEvent should handle exception when send() throws`(log: CapturedOutput) {
// Given
val updateMissionEvent =
UpdateMissionEvent(
mission = MissionFixture.aMissionEntity(),
)
whenever(
sseEmitterMock.send(any<Set<ResponseBodyEmitter.DataWithMediaType>>()),
).thenThrow(RuntimeException("Simulated exception"))
sseMission.registerListener()

// When
sseMission.handleUpdateMissionEvent(updateMissionEvent)

// Then
assertThat(log.out).contains("Error when sending mission event with id")
verify(sseEmitterMock).completeWithError(any())
verify(sseEmitterMock, never()).complete()
}
}
Loading