diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/db/TranslationDbo.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/db/TranslationDbo.kt deleted file mode 100644 index d35ba5d9..00000000 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/db/TranslationDbo.kt +++ /dev/null @@ -1,11 +0,0 @@ -package no.nav.arena_tiltak_aktivitet_acl.domain.db - -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import java.util.* - -data class TranslationDbo( - val aktivitetId: UUID, - val arenaId: DeltakelseId, - val aktivitetKategori: AktivitetKategori -) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt index 2b316ea4..a832d78a 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt @@ -39,6 +39,7 @@ open class DeltakerProcessor( private val tiltakService: TiltakService, private val personsporingService: PersonsporingService, private val oppfolgingsperiodeService: OppfolgingsperiodeService, + private val aktivitetskortIdService: AktivitetskortIdService ) : ArenaMessageProcessor { companion object { @@ -114,7 +115,7 @@ open class DeltakerProcessor( oppfolgingsperiode = periodeMatch.oppfolgingsperiode.uuid, oppfolgingsSluttDato = periodeMatch.oppfolgingsperiode.sluttDato ) - aktivitetService.upsert(aktivitet, aktivitetskortHeaders) + aktivitetService.upsert(aktivitet, aktivitetskortHeaders, deltakelse.tiltakdeltakelseId) if (endring.skalIgnoreres) { log.info("Deltakeren har status=${arenaDeltaker.DELTAKERSTATUSKODE} og administrasjonskode=${tiltak.administrasjonskode} som ikke skal håndteres") @@ -178,7 +179,7 @@ open class DeltakerProcessor( // Har tidligere deltakelse på samme oppfolgingsperiode eksisterendeAktivitetsId != null -> EndringsType.OppdaterAktivitet(eksisterendeAktivitetsId, skalIgnoreres) // Har ingen tidligere aktivitetskort - oppfolgingsperiodeTilAktivitetskortId.isEmpty() -> EndringsType.NyttAktivitetskort(periodeMatch.oppfolgingsperiode, skalIgnoreres) + oppfolgingsperiodeTilAktivitetskortId.isEmpty() -> EndringsType.NyttAktivitetskort(getAkivitetskortId(deltakelseId), periodeMatch.oppfolgingsperiode, skalIgnoreres) // Har tidligere deltakelse men ikke på samme oppfølgingsperiode else -> { EndringsType.NyttAktivitetskortByttPeriode(periodeMatch.oppfolgingsperiode, skalIgnoreres) @@ -186,6 +187,10 @@ open class DeltakerProcessor( } } + fun getAkivitetskortId(deltakelseId: DeltakelseId): UUID { + return aktivitetskortIdService.getOrCreate(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) + } + fun syncOppfolgingsperioder(deltakelseId: DeltakelseId, oppfolginsperioder: List) { aktivitetService.closeClosedPerioder(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET, oppfolginsperioder) } @@ -193,7 +198,7 @@ open class DeltakerProcessor( sealed class EndringsType(val aktivitetskortId: UUID, val skalIgnoreres: Boolean) { class OppdaterAktivitet(aktivitetskortId: UUID, skalIgnoreres: Boolean): EndringsType(aktivitetskortId, skalIgnoreres) - class NyttAktivitetskort(val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(UUID.randomUUID(), skalIgnoreres) + class NyttAktivitetskort(aktivitetskortId:UUID, val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(aktivitetskortId, skalIgnoreres) class NyttAktivitetskortByttPeriode(val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(UUID.randomUUID(), skalIgnoreres) } diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AdvisoryLockRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AdvisoryLockRepository.kt new file mode 100644 index 00000000..62379656 --- /dev/null +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AdvisoryLockRepository.kt @@ -0,0 +1,51 @@ +package no.nav.arena_tiltak_aktivitet_acl.repositories + +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import org.intellij.lang.annotations.Language +import org.slf4j.LoggerFactory +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional +import java.io.Closeable + +@Component +open class AdvisoryLockRepository( + private val template: NamedParameterJdbcTemplate +) { + private val log = LoggerFactory.getLogger(javaClass) + + /** + * Acquire a transactional advisory lock on a lockId. The duration of the lock is that of the containing transaction, + * whence the methode requires a transaction already to be present before the call. + */ + @Transactional(propagation = Propagation.MANDATORY) + open fun aquireTransactionalAdvisoryLock(lockId: Long) { + @Language("postgresql") + val sql = """ + SELECT pg_advisory_xact_lock(:lockId); + """.trimIndent() + template.query(sql, mapOf("lockId" to lockId)) {} + } + + @Transactional(propagation = Propagation.MANDATORY) + open fun lockDeltakelse(deltakelseId: DeltakelseId) { + aquireTransactionalAdvisoryLock(deltakelseId.value) + } + + @Transactional(propagation = Propagation.MANDATORY) + open fun safeDeltakelse(deltakelseId: DeltakelseId): SafeDeltakelse { + lockDeltakelse(deltakelseId) + log.info("Acquired lock on deltakelseId: $deltakelseId") + return SafeDeltakelse(deltakelseId) + } + + class SafeDeltakelse(val deltakelseId: DeltakelseId): Closeable { + private val log = LoggerFactory.getLogger(javaClass) + override fun close() { + log.info("Safe to release lock on deltakelseId: $deltakelseId") + } + } +} + + diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt index 9179dc4d..2d2cb17d 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt @@ -3,7 +3,8 @@ package no.nav.arena_tiltak_aktivitet_acl.repositories import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.AvsluttetOppfolgingsperiode import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import no.nav.arena_tiltak_aktivitet_acl.utils.* +import no.nav.arena_tiltak_aktivitet_acl.utils.getNullableZonedDateTime +import no.nav.arena_tiltak_aktivitet_acl.utils.getUUID import org.intellij.lang.annotations.Language import org.slf4j.LoggerFactory import org.springframework.dao.IncorrectResultSizeDataAccessException @@ -15,11 +16,12 @@ import java.sql.ResultSet import java.util.* @Component -open class AktivitetRepository( +class AktivitetRepository( private val template: NamedParameterJdbcTemplate ) { private val log = LoggerFactory.getLogger(javaClass) fun upsert(aktivitet: AktivitetDbo) { + log.info("In repo ${aktivitet.id} ${aktivitet.oppfolgingsperiodeUUID} ${aktivitet.arenaId} ${aktivitet.oppfolgingsSluttTidspunkt}") @Language("PostgreSQL") val sql = """ INSERT INTO aktivitet(id, person_ident, kategori_type, data, arena_id, tiltak_kode, oppfolgingsperiode_uuid, oppfolgingsperiode_slutt_tidspunkt) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt new file mode 100644 index 00000000..680733a3 --- /dev/null +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt @@ -0,0 +1,57 @@ +package no.nav.arena_tiltak_aktivitet_acl.repositories + +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.utils.getUUID +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Component +import java.util.UUID + +@Component +class AktivitetskortIdRepository( + private val template: NamedParameterJdbcTemplate +) { + + fun deleteDeltakelseId(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): Int { + val sql = """ + DELETE FROM forelopig_aktivitet_id WHERE deltakelse_id = :deltakelseId and kategori = :kategori + """.trimIndent() + return template.update(sql, + mapOf( + "kategori" to aktivitetKategori.name, + "deltakelseId" to deltakelseId.value, + )) + } + + fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { + val currentId = getCurrentId(deltakelseId, aktivitetKategori) + if (currentId != null) return currentId + + val generatedId = UUID.randomUUID() + val insertNewId = """ + INSERT INTO forelopig_aktivitet_id(id, kategori, deltakelse_id) VALUES (:id, :kategori, :deltakelseId) + """.trimIndent() + template.update(insertNewId, + mapOf( + "id" to generatedId, + "kategori" to aktivitetKategori.name, + "deltakelseId" to deltakelseId.value, + )) + return generatedId + } + + private fun getCurrentId(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID? { + val getCurrentId = """ + SELECT id FROM forelopig_aktivitet_id WHERE deltakelse_id = :deltakelseId and kategori = :aktivitetKategori + """.trimIndent() + return template.query( + getCurrentId, + mapOf( + "deltakelseId" to deltakelseId.value, + "aktivitetKategori" to aktivitetKategori.name + ) + ) { row, _ -> row.getUUID("id") } + .firstOrNull() + } + +} diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt index f387ac68..10ac2860 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt @@ -10,15 +10,13 @@ import no.nav.arena_tiltak_aktivitet_acl.auth.AuthService import no.nav.arena_tiltak_aktivitet_acl.auth.Issuer import no.nav.arena_tiltak_aktivitet_acl.domain.dto.TranslationQuery import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.services.AktivitetskortIdService import no.nav.security.token.support.core.api.Protected import no.nav.security.token.support.core.api.ProtectedWithClaims -import org.springframework.http.HttpStatus import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.RequestBody import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.RestController -import org.springframework.web.server.ResponseStatusException import java.util.* @@ -28,7 +26,7 @@ import java.util.* @RequestMapping("/api/translation") class TranslationController( private val authService: AuthService, - private val aktivitetRepository: AktivitetRepository, + private val aktivitetskortIdService: AktivitetskortIdService ) { @ProtectedWithClaims(issuer = Issuer.AZURE_AD) @@ -43,8 +41,7 @@ class TranslationController( @RequestBody query: TranslationQuery ): UUID { authService.validerErM2MToken() - return aktivitetRepository.getCurrentAktivitetsId(DeltakelseId(query.arenaId), query.aktivitetKategori) - ?: throw ResponseStatusException(HttpStatus.NOT_FOUND, "No mapping found") + return aktivitetskortIdService.getOrCreate(DeltakelseId(query.arenaId), query.aktivitetKategori) } } diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt index 5875b70d..2a970791 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt @@ -6,20 +6,37 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategor import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Aktivitetskort import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHeaders import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository import org.springframework.stereotype.Service -import java.util.UUID +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional +import java.util.* @Service -class AktivitetService( - val aktivitetRepository: AktivitetRepository +open class AktivitetService( + val aktivitetRepository: AktivitetRepository, + val aktivitetskortIdRepository: AktivitetskortIdRepository, + val deltakerLockRepository: AdvisoryLockRepository ) { - fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders) = aktivitetRepository.upsert(aktivitet.toDbo(headers)) - fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId) - fun getAllBy(aktivitetId: DeltakelseId, aktivitetsKategori: AktivitetKategori) = + /** + * SafeDeltakelse will make sure no other transaction is processing the same deltakelse for the duration of the ongoing transaction. + * If another transaction is processing the same deltakelse (i.e. TranslationController, AktivitetskortIdService) this transaction will wait its turn until the other transaction is complete. + * @see no.nav.arena_tiltak_aktivitet_acl.services.AktivitetskortIdService.getOrCreate + */ + @Transactional(propagation = Propagation.MANDATORY) + open fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders, deltakelseId: DeltakelseId) { + deltakerLockRepository.safeDeltakelse(deltakelseId).use { + aktivitetRepository.upsert(aktivitet.toDbo(headers)) + aktivitetskortIdRepository.deleteDeltakelseId(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) + } + } + open fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId) + open fun getAllBy(aktivitetId: DeltakelseId, aktivitetsKategori: AktivitetKategori) = aktivitetRepository.getAllBy(aktivitetId, aktivitetsKategori) - fun closeClosedPerioder(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori, oppfolgingsperioder: List) { + open fun closeClosedPerioder(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori, oppfolgingsperioder: List) { val avsluttedePerioder = oppfolgingsperioder .mapNotNull { it.sluttDato diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt new file mode 100644 index 00000000..bfad6ad7 --- /dev/null +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt @@ -0,0 +1,33 @@ +package no.nav.arena_tiltak_aktivitet_acl.services + +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional +import java.util.* + +@Service +open class AktivitetskortIdService( + val aktivitetRepository: AktivitetRepository, + val aktivitetskortIdRepository: AktivitetskortIdRepository, + val advisoryLockRepository: AdvisoryLockRepository +) { + /** + * SafeDeltakelse will make sure no other transaction is processing the same deltakelse for the duration of the ongoing transaction. + * If another transaction is processing the same deltakelse (i.e. AktivitetService) this transaction will wait its turn until the other transaction is complete. + * @see no.nav.arena_tiltak_aktivitet_acl.services.AktivitetService.upsert + */ + @Transactional + open fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { + // Lock on deltakelseId. Gjelder så lenge den pågående transaksjonen er aktiv. + advisoryLockRepository.safeDeltakelse(deltakelseId).use { + val currentId = aktivitetRepository.getCurrentAktivitetsId(deltakelseId, aktivitetKategori) + if (currentId != null) return currentId + // Opprett i ny tabell + return aktivitetskortIdRepository.getOrCreate(deltakelseId, aktivitetKategori) + } + } +} diff --git a/src/main/resources/db/migration/V15__forelopig_aktivitet_id.sql b/src/main/resources/db/migration/V15__forelopig_aktivitet_id.sql new file mode 100644 index 00000000..5f60d03c --- /dev/null +++ b/src/main/resources/db/migration/V15__forelopig_aktivitet_id.sql @@ -0,0 +1,7 @@ + +CREATE TABLE forelopig_aktivitet_id +( + id UUID PRIMARY KEY NOT NULL, + deltakelse_id BIGINT UNIQUE NOT NULL, + kategori VARCHAR NOT NULL +); \ No newline at end of file diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt index 637b7a66..904c9842 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt @@ -66,7 +66,7 @@ object SingletonPostgresContainer { config.jdbcUrl = container.jdbcUrl config.username = container.username config.password = container.password - config.maximumPoolSize = 3 + config.maximumPoolSize = 10 config.minimumIdle = 1 return HikariDataSource(config) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt index 26876089..98a1959d 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt @@ -6,10 +6,13 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.kotest.matchers.string.shouldContain import io.kotest.matchers.string.shouldMatch +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import no.nav.arena_tiltak_aktivitet_acl.clients.IdMappingClient import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.Oppfolgingsperiode import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus -import no.nav.arena_tiltak_aktivitet_acl.domain.db.TranslationDbo import no.nav.arena_tiltak_aktivitet_acl.domain.dto.TranslationQuery import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.* import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId @@ -109,11 +112,11 @@ class DeltakerIntegrationTests : IntegrationTestBase() { } @Test - fun `skal gi 404 når id-mapping ikke finnes`() { + fun `skal gi 200 når id-mapping ikke finnes (og lage mapping)`() { val token = issueAzureAdM2MToken() val client = IdMappingClient(port!!) { token } val (response, _) = client.hentMapping(TranslationQuery(123123, AktivitetKategori.TILTAKSAKTIVITET)) - response.code shouldBe HttpStatus.NOT_FOUND.value() + response.code shouldBe HttpStatus.OK.value() } @Test @@ -763,10 +766,11 @@ class DeltakerIntegrationTests : IntegrationTestBase() { @Test fun `skal ikke opprettet aktivitetId (i mappingtabell) men ingeststatus oppdatereshvis sending av kafkamelding feiler`() { - doThrow(IllegalStateException("LOL")).`when`(kafkaProducerService).sendTilAktivitetskortTopic(this.any(UUID::class.java), any(KafkaMessageDto::class.java), any(AktivitetskortHeaders::class.java)) - val (gjennomforingId, deltakerId) = setup() + doThrow(IllegalStateException("LOL")).`when`(kafkaProducerService) + .sendTilAktivitetskortTopic(this.any(UUID::class.java), any(KafkaMessageDto::class.java), any(AktivitetskortHeaders::class.java)) + val (gjennomforingId, deltakelseId) = setup() val deltakerInput = DeltakerInput( - tiltakDeltakelseId = deltakerId, + tiltakDeltakelseId = deltakelseId, tiltakgjennomforingId = gjennomforingId, innsokBegrunnelse = "innsøkbegrunnelse", datoFra = LocalDate.now().minusDays(1), @@ -777,7 +781,61 @@ class DeltakerIntegrationTests : IntegrationTestBase() { arenaData.ingestStatus shouldBe IngestStatus.RETRY arenaData.note shouldBe "LOL" } - idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second shouldBe null + aktivitetRepository.getCurrentAktivitetsId(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) shouldBe null + } + + @Test + fun `skal opprette mapping selvom aktivitet ikke finnes enda`() { + val (gjennomforingId, deltakerId) = setup() + val generertId = idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)) + .second + generertId shouldNotBe null + val deltakerInput = DeltakerInput( + tiltakDeltakelseId = deltakerId, + tiltakgjennomforingId = gjennomforingId, + innsokBegrunnelse = "innsøkbegrunnelse", + datoFra = LocalDate.now().minusDays(1), + endretAv = Ident(ident = "SIG123"), + ) + val deltakerCommand = NyDeltakerCommand(deltakerInput) + deltakerExecutor.execute(deltakerCommand).expectHandled { arenaData -> + arenaData.output.aktivitetskort.id shouldBe generertId + } + idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)) + .second shouldBe generertId + } + + @Test + fun `skal ha riktig mapping selv om translationcontroller og deltakerprocessor kjoerer samtidig`() { + val (gjennomforingId, deltakerId) = setup() + val generertId = idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)) + .second + generertId shouldNotBe null + val deltakerInput = DeltakerInput( + tiltakDeltakelseId = deltakerId, + tiltakgjennomforingId = gjennomforingId, + innsokBegrunnelse = "innsøkbegrunnelse", + datoFra = LocalDate.now().minusDays(1), + endretAv = Ident(ident = "SIG123"), + ) + val deltakerCommand = NyDeltakerCommand(deltakerInput) + var generertId1: UUID? = null + var generertId2: UUID? = null + runBlocking { + async(Dispatchers.IO) {// NB Ikke bruk default-dispatcher for async. Den håndterer ikke blokkerende kall + deltakerExecutor.execute(deltakerCommand).expectHandled { arenaData -> + generertId1 = arenaData.output.aktivitetskort.id + } + } + async(Dispatchers.IO) { + var delayTime = Random.nextLong(50, 200) + delay(delayTime) + generertId2 = idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second + } + } + generertId1 shouldNotBe null + generertId1 shouldBe generertId2 + } private val idMappingClient: IdMappingClient by lazy { diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt index e5997bb2..0bf6ced1 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt @@ -4,7 +4,6 @@ import ArenaOrdsProxyClient import io.kotest.assertions.throwables.shouldNotThrowAny import io.kotest.assertions.throwables.shouldThrowExactly import io.kotest.core.spec.style.FunSpec -import io.kotest.matchers.reflection.beLateInit import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.mockk.* @@ -30,7 +29,6 @@ import java.time.ZonedDateTime import java.util.* class DeltakerProcessorTest : FunSpec({ - val dataSource = SingletonPostgresContainer.getDataSource() val ordsClient by lazy { @@ -57,6 +55,8 @@ class DeltakerProcessorTest : FunSpec({ lateinit var arenaDataRepository: ArenaDataRepository lateinit var personSporingRepository: PersonSporingRepository lateinit var aktivitetRepository: AktivitetRepository + lateinit var aktivitetskortIdRespository: AktivitetskortIdRepository + lateinit var advisoryLockRepository: AdvisoryLockRepository // Se SQL inserted før hver test val nonIgnoredGjennomforingArenaId = 1L @@ -67,6 +67,8 @@ class DeltakerProcessorTest : FunSpec({ arenaDataRepository = ArenaDataRepository(template) personSporingRepository = PersonSporingRepository(template) aktivitetRepository = AktivitetRepository(template) + aktivitetskortIdRespository = AktivitetskortIdRepository(template) + advisoryLockRepository = AdvisoryLockRepository(template) clearMocks(kafkaProducerService) DatabaseTestUtils.cleanAndInitDatabase(dataSource, "/deltaker-processor_test-data.sql") @@ -81,11 +83,12 @@ class DeltakerProcessorTest : FunSpec({ return DeltakerProcessor( arenaDataRepository = arenaDataRepository, kafkaProducerService = kafkaProducerService, - aktivitetService = AktivitetService(AktivitetRepository(template)), + aktivitetService = AktivitetService(AktivitetRepository(template), AktivitetskortIdRepository(template), AdvisoryLockRepository(template)), gjennomforingRepository = GjennomforingRepository(template), tiltakService = TiltakService(TiltakRepository(template)), oppfolgingsperiodeService = OppfolgingsperiodeService(oppfolgingClient), personsporingService = PersonsporingService(personSporingRepository, ordsClient), + aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, advisoryLockRepository) ) } @@ -248,5 +251,5 @@ class DeltakerProcessorTest : FunSpec({ createDeltakerProcessor(oppfolgingsperioder).handleArenaMessage(newDeltaker) } } -}) +}) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt new file mode 100644 index 00000000..0641b3bb --- /dev/null +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt @@ -0,0 +1,206 @@ +package no.nav.arena_tiltak_aktivitet_acl.services + +import io.kotest.common.runBlocking +import io.kotest.matchers.collections.shouldContainInOrder +import io.kotest.matchers.shouldBe +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.* +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.integration.IntegrationTestBase +import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetDbo +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository +import org.junit.jupiter.api.Test +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.transaction.support.TransactionTemplate +import java.time.LocalDateTime +import java.time.ZonedDateTime +import java.util.* +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.random.Random + +class AktivitetServiceTest : IntegrationTestBase() { + val log = LoggerFactory.getLogger(javaClass) + + @Autowired + lateinit var advisoryLockRepository: AdvisoryLockRepository + + @Autowired + lateinit var aktivitetRepository: AktivitetRepository + + @Autowired + lateinit var aktivitetskortIdRepository: AktivitetskortIdRepository + + @Autowired + lateinit var transactionTemplate: TransactionTemplate + + fun slowAktivitetRepository(): AktivitetRepository { + val mock = mockk() + val isSlow = AtomicBoolean(true) + + every { mock.upsert(any()) } answers { + // Make sure first call is slow + val isFirst = isSlow.getAndSet(false) + val aktivitet = firstArg() + log.info("isFirst $isFirst arenaId: ${aktivitet.arenaId} ${aktivitet}") + if (isFirst) { + runBlocking { delay(100) } + aktivitetRepository.upsert(aktivitet) + } else { + aktivitetRepository.upsert(aktivitet) + } + } + return mock + } + + val aktivitetskort = Aktivitetskort( + id = UUID.randomUUID(), + personIdent = "01234567890", + tittel = "Tittel", + aktivitetStatus = AktivitetStatus.GJENNOMFORES, + etiketter = emptyList(), + startDato = null, + sluttDato = null, + beskrivelse = null, + endretAv = Ident("IdentType", "ident"), + endretTidspunkt = LocalDateTime.now(), + avtaltMedNav = true, + detaljer = emptyList() + ) + + fun headers(deltakelseId: DeltakelseId): AktivitetskortHeaders { + return AktivitetskortHeaders( + "${AktivitetKategori.TILTAKSAKTIVITET.prefix}${deltakelseId.value}", + "ASDAS", + UUID.randomUUID(), + null + ) + } + + fun aktivitetskort(): Aktivitetskort { + return aktivitetskort + .copy(id = UUID.randomUUID()) + } + + fun firstSlowAktivitetsService() = AktivitetService( + slowAktivitetRepository(), + aktivitetskortIdRepository, + advisoryLockRepository + ) + + @Test + fun `skal blokkere prossessring på samme deltakelse-id`() { + val firstSlowAktivitetsService = firstSlowAktivitetsService() + val deltakelseId = DeltakelseId(1123) + val aktivitetskort = aktivitetskort() + val headers = headers(deltakelseId) + + val startOrder = mutableListOf() + val excutionOrder = mutableListOf() + kotlinx.coroutines.runBlocking { + val first = async(Dispatchers.IO) { + transactionTemplate.executeWithoutResult { + startOrder.add(1) + firstSlowAktivitetsService.upsert( + aktivitetskort, + headers, + deltakelseId, + ) + excutionOrder.add(1) + } + } + val second = async(Dispatchers.IO) { + delay(100) + transactionTemplate.executeWithoutResult { + startOrder.add(2) + firstSlowAktivitetsService.upsert( + aktivitetskort, + headers.copy(oppfolgingsSluttDato = ZonedDateTime.now()), + deltakelseId, + ) + excutionOrder.add(2) + } + } + listOf(first, second).awaitAll() + } + startOrder shouldBe listOf(1, 2) + excutionOrder shouldBe listOf(1, 2) + } + + @Test + fun `skal ikke blokke andre deltakelse-id-er`() { + val firstSlowAktivitetsService = firstSlowAktivitetsService() + val deltakelseId = DeltakelseId(2123) + val deltakelseId2 = DeltakelseId(2321) + + val excutionOrder = mutableListOf() + kotlinx.coroutines.runBlocking { + val first = async(Dispatchers.IO) { + transactionTemplate.executeWithoutResult { + firstSlowAktivitetsService.upsert( + aktivitetskort(), + headers(deltakelseId), + deltakelseId, + ) + excutionOrder.add(deltakelseId.value) + } + } + val second = async(Dispatchers.IO) { + delay(10) + transactionTemplate.executeWithoutResult { + firstSlowAktivitetsService.upsert( + aktivitetskort(), + headers(deltakelseId2), + deltakelseId2, + ) + excutionOrder.add(deltakelseId2.value) + } + } + listOf(first, second).awaitAll() + } + excutionOrder shouldBe listOf(deltakelseId2.value, deltakelseId.value) + } + + @Test + fun `test advisory locking`() { + val OFFSET = 99999L + val startOrder = mutableListOf() + val finishOrder = mutableListOf() + + val lockId = Random.nextLong(0, 1000) + OFFSET + kotlinx.coroutines.runBlocking { + val first = async(Dispatchers.IO) { + startOrder.add(1) + transactionTemplate.executeWithoutResult { + advisoryLockRepository.aquireTransactionalAdvisoryLock(lockId) + log.info("Lock acquired first") + log.info("waiting in first") + Thread.sleep(100) + log.info("Releasing first lock") + } + finishOrder.add(1) + } + val second = async(Dispatchers.IO) { + delay(20) + startOrder.add(2) + transactionTemplate.executeWithoutResult { + advisoryLockRepository.aquireTransactionalAdvisoryLock(lockId) + log.info("Lock acquired second") + } + finishOrder.add(2) + } + listOf(first, second).awaitAll() + } + log.info("Lock automatically released on transaction commit or rollback") + startOrder shouldContainInOrder listOf(1, 2) + finishOrder shouldContainInOrder listOf(1, 2) + + } +}