From 390b036ed2397b8df2fb587949e8171446145497 Mon Sep 17 00:00:00 2001 From: sigurdgroneng Date: Mon, 30 Oct 2023 16:14:00 +0100 Subject: [PATCH 01/11] =?UTF-8?q?WIP:=20Alltid=20svar=20med=20ID=20p=C3=A5?= =?UTF-8?q?=20mapping=20endepunkt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/db/TranslationDbo.kt | 11 ------ .../processors/DeltakerProcessor.kt | 10 ++++- .../AktivitetskortIdRepository.kt | 39 +++++++++++++++++++ .../rest/TranslationController.kt | 7 +++- .../services/AktivitetskortIdService.kt | 19 +++++++++ .../integration/DeltakerIntegrationTests.kt | 1 - 6 files changed, 71 insertions(+), 16 deletions(-) delete mode 100644 src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/db/TranslationDbo.kt create mode 100644 src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt create mode 100644 src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/db/TranslationDbo.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/db/TranslationDbo.kt deleted file mode 100644 index d35ba5d9..00000000 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/db/TranslationDbo.kt +++ /dev/null @@ -1,11 +0,0 @@ -package no.nav.arena_tiltak_aktivitet_acl.domain.db - -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import java.util.* - -data class TranslationDbo( - val aktivitetId: UUID, - val arenaId: DeltakelseId, - val aktivitetKategori: AktivitetKategori -) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt index 2b316ea4..5dcdee01 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt @@ -39,6 +39,7 @@ open class DeltakerProcessor( private val tiltakService: TiltakService, private val personsporingService: PersonsporingService, private val oppfolgingsperiodeService: OppfolgingsperiodeService, + private val aktivitetskortIdService: AktivitetskortIdService ) : ArenaMessageProcessor { companion object { @@ -178,7 +179,7 @@ open class DeltakerProcessor( // Har tidligere deltakelse på samme oppfolgingsperiode eksisterendeAktivitetsId != null -> EndringsType.OppdaterAktivitet(eksisterendeAktivitetsId, skalIgnoreres) // Har ingen tidligere aktivitetskort - oppfolgingsperiodeTilAktivitetskortId.isEmpty() -> EndringsType.NyttAktivitetskort(periodeMatch.oppfolgingsperiode, skalIgnoreres) + oppfolgingsperiodeTilAktivitetskortId.isEmpty() -> EndringsType.NyttAktivitetskort(getAkivitetskortId(deltakelseId), periodeMatch.oppfolgingsperiode, skalIgnoreres) // Har tidligere deltakelse men ikke på samme oppfølgingsperiode else -> { EndringsType.NyttAktivitetskortByttPeriode(periodeMatch.oppfolgingsperiode, skalIgnoreres) @@ -186,6 +187,11 @@ open class DeltakerProcessor( } } + fun getAkivitetskortId(deltakelseId: DeltakelseId): UUID { + // TODO: NOe service som fikser id-er for ikke opprettet + return aktivitetskortIdService.getOrCreate(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) + } + fun syncOppfolgingsperioder(deltakelseId: DeltakelseId, oppfolginsperioder: List) { aktivitetService.closeClosedPerioder(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET, oppfolginsperioder) } @@ -193,7 +199,7 @@ open class DeltakerProcessor( sealed class EndringsType(val aktivitetskortId: UUID, val skalIgnoreres: Boolean) { class OppdaterAktivitet(aktivitetskortId: UUID, skalIgnoreres: Boolean): EndringsType(aktivitetskortId, skalIgnoreres) - class NyttAktivitetskort(val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(UUID.randomUUID(), skalIgnoreres) + class NyttAktivitetskort(aktivitetskortId:UUID, val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(aktivitetskortId, skalIgnoreres) class NyttAktivitetskortByttPeriode(val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(UUID.randomUUID(), skalIgnoreres) } diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt new file mode 100644 index 00000000..dfd5f7f0 --- /dev/null +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt @@ -0,0 +1,39 @@ +package no.nav.arena_tiltak_aktivitet_acl.repositories + +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.utils.getUUID +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import java.util.UUID + +class AktivitetskortIdRepository( + private val template: NamedParameterJdbcTemplate +) { + fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { + val currentId = getCurrentId(deltakelseId, aktivitetKategori) + if (currentId != null) return currentId + + val generatedId = UUID.randomUUID() + val insertNewId = """ + INSERT INTO AKTIVITETSKORT_ID(id, kategori, deltakelse_id) VALUES (:id, :kategori, :deltakelseId) + """.trimIndent() + template.update(insertNewId, + mapOf( + "id" to generatedId, + "kategori" to aktivitetKategori.name, + "deltakelse_id" to deltakelseId, + )) + return generatedId + } + + private fun getCurrentId(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID? { + val getCurrentId = """ + SELECT id FROM AKTIVITETSKORT_ID WHERE deltakelse_id = :deltakelseId and kategori = :aktivitetKategori + """.trimIndent() + return template.query( + getCurrentId, + mapOf("deltakelseId" to deltakelseId.value, "aktivitetKategori" to aktivitetKategori.name)) { row, _ -> row.getUUID("id") } + .firstOrNull() + } + +} diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt index f387ac68..1c4a0836 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt @@ -11,6 +11,7 @@ import no.nav.arena_tiltak_aktivitet_acl.auth.Issuer import no.nav.arena_tiltak_aktivitet_acl.domain.dto.TranslationQuery import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.services.AktivitetskortIdService import no.nav.security.token.support.core.api.Protected import no.nav.security.token.support.core.api.ProtectedWithClaims import org.springframework.http.HttpStatus @@ -29,6 +30,7 @@ import java.util.* class TranslationController( private val authService: AuthService, private val aktivitetRepository: AktivitetRepository, + private val aktivitetskortIdService: AktivitetskortIdService ) { @ProtectedWithClaims(issuer = Issuer.AZURE_AD) @@ -43,8 +45,9 @@ class TranslationController( @RequestBody query: TranslationQuery ): UUID { authService.validerErM2MToken() - return aktivitetRepository.getCurrentAktivitetsId(DeltakelseId(query.arenaId), query.aktivitetKategori) - ?: throw ResponseStatusException(HttpStatus.NOT_FOUND, "No mapping found") + return aktivitetskortIdService.getOrCreate(DeltakelseId(query.arenaId), query.aktivitetKategori) +// return aktivitetRepository.getCurrentAktivitetsId(DeltakelseId(query.arenaId), query.aktivitetKategori) +// ?: throw ResponseStatusException(HttpStatus.NOT_FOUND, "No mapping found") } } diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt new file mode 100644 index 00000000..4d2f22ba --- /dev/null +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt @@ -0,0 +1,19 @@ +package no.nav.arena_tiltak_aktivitet_acl.services + +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository +import java.util.* + +class AktivitetskortIdService( + val aktivitetRepository: AktivitetRepository, + val aktivitetskortIdRepository: AktivitetskortIdRepository +) { + fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { + val currentId = aktivitetRepository.getCurrentAktivitetsId(deltakelseId, aktivitetKategori) + if (currentId != null) return currentId + // Opprett i ny tabell + return aktivitetskortIdRepository.getOrCreate(deltakelseId, aktivitetKategori) + } +} diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt index 26876089..2f066fd7 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt @@ -9,7 +9,6 @@ import io.kotest.matchers.string.shouldMatch import no.nav.arena_tiltak_aktivitet_acl.clients.IdMappingClient import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.Oppfolgingsperiode import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus -import no.nav.arena_tiltak_aktivitet_acl.domain.db.TranslationDbo import no.nav.arena_tiltak_aktivitet_acl.domain.dto.TranslationQuery import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.* import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId From 019787d812851e92a01a3b77191b0ad2fa19e9b1 Mon Sep 17 00:00:00 2001 From: sigurdgroneng Date: Tue, 31 Oct 2023 09:50:40 +0100 Subject: [PATCH 02/11] WIP: test --- .../rest/TranslationController.kt | 3 --- .../integration/DeltakerIntegrationTests.kt | 19 +++++++++++++++++++ .../processors/DeltakerProcessorTest.kt | 2 ++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt index 1c4a0836..3da0c96a 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt @@ -29,7 +29,6 @@ import java.util.* @RequestMapping("/api/translation") class TranslationController( private val authService: AuthService, - private val aktivitetRepository: AktivitetRepository, private val aktivitetskortIdService: AktivitetskortIdService ) { @@ -46,8 +45,6 @@ class TranslationController( ): UUID { authService.validerErM2MToken() return aktivitetskortIdService.getOrCreate(DeltakelseId(query.arenaId), query.aktivitetKategori) -// return aktivitetRepository.getCurrentAktivitetsId(DeltakelseId(query.arenaId), query.aktivitetKategori) -// ?: throw ResponseStatusException(HttpStatus.NOT_FOUND, "No mapping found") } } diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt index 2f066fd7..01ca48e0 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt @@ -779,6 +779,25 @@ class DeltakerIntegrationTests : IntegrationTestBase() { idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second shouldBe null } + @Test + fun `skal opprette mapping selvom aktivitet ikke finnes enda`() { + val (gjennomforingId, deltakerId) = setup() + idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second shouldNotBe null + val deltakerInput = DeltakerInput( + tiltakDeltakelseId = deltakerId, + tiltakgjennomforingId = gjennomforingId, + innsokBegrunnelse = "innsøkbegrunnelse", + datoFra = LocalDate.now().minusDays(1), + endretAv = Ident(ident = "SIG123"), + ) + val deltakerCommand = NyDeltakerCommand(deltakerInput) + deltakerExecutor.execute(deltakerCommand).arenaData { arenaData -> + arenaData.ingestStatus shouldBe IngestStatus.RETRY + arenaData.note shouldBe "LOL" + } + idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second shouldNotBe null + } + private val idMappingClient: IdMappingClient by lazy { val token = issueAzureAdM2MToken() IdMappingClient(port!!) { token } diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt index e5997bb2..2e120907 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt @@ -57,6 +57,7 @@ class DeltakerProcessorTest : FunSpec({ lateinit var arenaDataRepository: ArenaDataRepository lateinit var personSporingRepository: PersonSporingRepository lateinit var aktivitetRepository: AktivitetRepository + lateinit var aktivitetskortIdRespository: AktivitetskortIdRepository // Se SQL inserted før hver test val nonIgnoredGjennomforingArenaId = 1L @@ -86,6 +87,7 @@ class DeltakerProcessorTest : FunSpec({ tiltakService = TiltakService(TiltakRepository(template)), oppfolgingsperiodeService = OppfolgingsperiodeService(oppfolgingClient), personsporingService = PersonsporingService(personSporingRepository, ordsClient), + aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository) ) } From b7a493ccf0b0414e225f9fe19669ba71ff722f46 Mon Sep 17 00:00:00 2001 From: sigurdgroneng Date: Tue, 31 Oct 2023 13:13:40 +0100 Subject: [PATCH 03/11] Update tests --- .../processors/DeltakerProcessor.kt | 3 +- .../AktivitetskortIdRepository.kt | 14 +++++-- .../repositories/DeltakelseLockRepository.kt | 41 +++++++++++++++++++ .../services/AktivitetService.kt | 10 ++++- .../services/AktivitetskortIdService.kt | 17 +++++--- .../migration/V15__forelopig_aktivitet_id.sql | 7 ++++ .../integration/DeltakerIntegrationTests.kt | 19 +++++---- .../processors/DeltakerProcessorTest.kt | 7 +++- 8 files changed, 95 insertions(+), 23 deletions(-) create mode 100644 src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt create mode 100644 src/main/resources/db/migration/V15__forelopig_aktivitet_id.sql diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt index 5dcdee01..a832d78a 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessor.kt @@ -115,7 +115,7 @@ open class DeltakerProcessor( oppfolgingsperiode = periodeMatch.oppfolgingsperiode.uuid, oppfolgingsSluttDato = periodeMatch.oppfolgingsperiode.sluttDato ) - aktivitetService.upsert(aktivitet, aktivitetskortHeaders) + aktivitetService.upsert(aktivitet, aktivitetskortHeaders, deltakelse.tiltakdeltakelseId) if (endring.skalIgnoreres) { log.info("Deltakeren har status=${arenaDeltaker.DELTAKERSTATUSKODE} og administrasjonskode=${tiltak.administrasjonskode} som ikke skal håndteres") @@ -188,7 +188,6 @@ open class DeltakerProcessor( } fun getAkivitetskortId(deltakelseId: DeltakelseId): UUID { - // TODO: NOe service som fikser id-er for ikke opprettet return aktivitetskortIdService.getOrCreate(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) } diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt index dfd5f7f0..b8b047de 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt @@ -4,8 +4,10 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategor import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import no.nav.arena_tiltak_aktivitet_acl.utils.getUUID import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Component import java.util.UUID +@Component class AktivitetskortIdRepository( private val template: NamedParameterJdbcTemplate ) { @@ -15,24 +17,28 @@ class AktivitetskortIdRepository( val generatedId = UUID.randomUUID() val insertNewId = """ - INSERT INTO AKTIVITETSKORT_ID(id, kategori, deltakelse_id) VALUES (:id, :kategori, :deltakelseId) + INSERT INTO forelopig_aktivitet_id(id, kategori, deltakelse_id) VALUES (:id, :kategori, :deltakelseId) """.trimIndent() template.update(insertNewId, mapOf( "id" to generatedId, "kategori" to aktivitetKategori.name, - "deltakelse_id" to deltakelseId, + "deltakelseId" to deltakelseId.value, )) return generatedId } private fun getCurrentId(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID? { val getCurrentId = """ - SELECT id FROM AKTIVITETSKORT_ID WHERE deltakelse_id = :deltakelseId and kategori = :aktivitetKategori + SELECT id FROM forelopig_aktivitet_id WHERE deltakelse_id = :deltakelseId and kategori = :aktivitetKategori """.trimIndent() return template.query( getCurrentId, - mapOf("deltakelseId" to deltakelseId.value, "aktivitetKategori" to aktivitetKategori.name)) { row, _ -> row.getUUID("id") } + mapOf( + "deltakelseId" to deltakelseId.value, + "aktivitetKategori" to aktivitetKategori.name + ) + ) { row, _ -> row.getUUID("id") } .firstOrNull() } diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt new file mode 100644 index 00000000..511f3389 --- /dev/null +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt @@ -0,0 +1,41 @@ +package no.nav.arena_tiltak_aktivitet_acl.repositories + +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import org.intellij.lang.annotations.Language +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Component +import java.io.Closeable + +@Component +class DeltakelseLockRepository( + private val template: NamedParameterJdbcTemplate +) { + private fun lockDeltakelse(deltakelseId: DeltakelseId) { + @Language("postgresql") + val sql = """ + SELECT pg_advisory_lock(:deltakelseId); + """.trimIndent() + template.query(sql, mapOf("deltakelseId" to deltakelseId.value)) {} + } + + private fun unlockDeltakelse(deltakelseId: DeltakelseId) { + @Language("postgresql") + val sql = """ + SELECT pg_advisory_unlock(:deltakelseId); + """.trimIndent() + template.query(sql, mapOf("deltakelseId" to deltakelseId.value)) {} + } + + fun safeDeltakelse(deltakelseId: DeltakelseId): SafeDeltakelse { + lockDeltakelse(deltakelseId) + return SafeDeltakelse(this, deltakelseId) + } + + class SafeDeltakelse(val deltakelseLockRepository: DeltakelseLockRepository, val deltakelseId: DeltakelseId): Closeable { + override fun close() { + deltakelseLockRepository.unlockDeltakelse(deltakelseId) + } + } +} + + diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt index 5875b70d..9370eea6 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt @@ -7,14 +7,20 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Aktivitetskort import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHeaders import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.DeltakelseLockRepository import org.springframework.stereotype.Service import java.util.UUID @Service class AktivitetService( - val aktivitetRepository: AktivitetRepository + val aktivitetRepository: AktivitetRepository, + val deltakerLockRepository: DeltakelseLockRepository ) { - fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders) = aktivitetRepository.upsert(aktivitet.toDbo(headers)) + fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders, deltakelseId: DeltakelseId) { + deltakerLockRepository.safeDeltakelse(deltakelseId).use { + aktivitetRepository.upsert(aktivitet.toDbo(headers)) + } + } fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId) fun getAllBy(aktivitetId: DeltakelseId, aktivitetsKategori: AktivitetKategori) = aktivitetRepository.getAllBy(aktivitetId, aktivitetsKategori) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt index 4d2f22ba..be003797 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt @@ -4,16 +4,23 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategor import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.DeltakelseLockRepository +import org.springframework.stereotype.Service import java.util.* +@Service class AktivitetskortIdService( val aktivitetRepository: AktivitetRepository, - val aktivitetskortIdRepository: AktivitetskortIdRepository + val aktivitetskortIdRepository: AktivitetskortIdRepository, + val deltakelseLockRepository: DeltakelseLockRepository ) { fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { - val currentId = aktivitetRepository.getCurrentAktivitetsId(deltakelseId, aktivitetKategori) - if (currentId != null) return currentId - // Opprett i ny tabell - return aktivitetskortIdRepository.getOrCreate(deltakelseId, aktivitetKategori) + // Lock on deltakelseId and force unlock + deltakelseLockRepository.safeDeltakelse(deltakelseId).use { + val currentId = aktivitetRepository.getCurrentAktivitetsId(deltakelseId, aktivitetKategori) + if (currentId != null) return currentId + // Opprett i ny tabell + return aktivitetskortIdRepository.getOrCreate(deltakelseId, aktivitetKategori) + } } } diff --git a/src/main/resources/db/migration/V15__forelopig_aktivitet_id.sql b/src/main/resources/db/migration/V15__forelopig_aktivitet_id.sql new file mode 100644 index 00000000..5f60d03c --- /dev/null +++ b/src/main/resources/db/migration/V15__forelopig_aktivitet_id.sql @@ -0,0 +1,7 @@ + +CREATE TABLE forelopig_aktivitet_id +( + id UUID PRIMARY KEY NOT NULL, + deltakelse_id BIGINT UNIQUE NOT NULL, + kategori VARCHAR NOT NULL +); \ No newline at end of file diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt index 01ca48e0..36262639 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt @@ -108,11 +108,11 @@ class DeltakerIntegrationTests : IntegrationTestBase() { } @Test - fun `skal gi 404 når id-mapping ikke finnes`() { + fun `skal gi 200 når id-mapping ikke finnes (og lage mapping)`() { val token = issueAzureAdM2MToken() val client = IdMappingClient(port!!) { token } val (response, _) = client.hentMapping(TranslationQuery(123123, AktivitetKategori.TILTAKSAKTIVITET)) - response.code shouldBe HttpStatus.NOT_FOUND.value() + response.code shouldBe HttpStatus.OK.value() } @Test @@ -762,7 +762,8 @@ class DeltakerIntegrationTests : IntegrationTestBase() { @Test fun `skal ikke opprettet aktivitetId (i mappingtabell) men ingeststatus oppdatereshvis sending av kafkamelding feiler`() { - doThrow(IllegalStateException("LOL")).`when`(kafkaProducerService).sendTilAktivitetskortTopic(this.any(UUID::class.java), any(KafkaMessageDto::class.java), any(AktivitetskortHeaders::class.java)) + doThrow(IllegalStateException("LOL")).`when`(kafkaProducerService) + .sendTilAktivitetskortTopic(this.any(UUID::class.java), any(KafkaMessageDto::class.java), any(AktivitetskortHeaders::class.java)) val (gjennomforingId, deltakerId) = setup() val deltakerInput = DeltakerInput( tiltakDeltakelseId = deltakerId, @@ -782,7 +783,9 @@ class DeltakerIntegrationTests : IntegrationTestBase() { @Test fun `skal opprette mapping selvom aktivitet ikke finnes enda`() { val (gjennomforingId, deltakerId) = setup() - idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second shouldNotBe null + val generertId = idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)) + .second + generertId shouldNotBe null val deltakerInput = DeltakerInput( tiltakDeltakelseId = deltakerId, tiltakgjennomforingId = gjennomforingId, @@ -791,11 +794,11 @@ class DeltakerIntegrationTests : IntegrationTestBase() { endretAv = Ident(ident = "SIG123"), ) val deltakerCommand = NyDeltakerCommand(deltakerInput) - deltakerExecutor.execute(deltakerCommand).arenaData { arenaData -> - arenaData.ingestStatus shouldBe IngestStatus.RETRY - arenaData.note shouldBe "LOL" + deltakerExecutor.execute(deltakerCommand).expectHandled { arenaData -> + arenaData.output.aktivitetskort.id shouldBe generertId } - idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second shouldNotBe null + idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)) + .second shouldBe generertId } private val idMappingClient: IdMappingClient by lazy { diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt index 2e120907..ab5f3fc0 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt @@ -58,6 +58,7 @@ class DeltakerProcessorTest : FunSpec({ lateinit var personSporingRepository: PersonSporingRepository lateinit var aktivitetRepository: AktivitetRepository lateinit var aktivitetskortIdRespository: AktivitetskortIdRepository + lateinit var deltakelseLockRepository: DeltakelseLockRepository // Se SQL inserted før hver test val nonIgnoredGjennomforingArenaId = 1L @@ -68,6 +69,8 @@ class DeltakerProcessorTest : FunSpec({ arenaDataRepository = ArenaDataRepository(template) personSporingRepository = PersonSporingRepository(template) aktivitetRepository = AktivitetRepository(template) + aktivitetskortIdRespository = AktivitetskortIdRepository(template) + deltakelseLockRepository = DeltakelseLockRepository(template) clearMocks(kafkaProducerService) DatabaseTestUtils.cleanAndInitDatabase(dataSource, "/deltaker-processor_test-data.sql") @@ -82,12 +85,12 @@ class DeltakerProcessorTest : FunSpec({ return DeltakerProcessor( arenaDataRepository = arenaDataRepository, kafkaProducerService = kafkaProducerService, - aktivitetService = AktivitetService(AktivitetRepository(template)), + aktivitetService = AktivitetService(AktivitetRepository(template), DeltakelseLockRepository(template)), gjennomforingRepository = GjennomforingRepository(template), tiltakService = TiltakService(TiltakRepository(template)), oppfolgingsperiodeService = OppfolgingsperiodeService(oppfolgingClient), personsporingService = PersonsporingService(personSporingRepository, ordsClient), - aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository) + aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) ) } From 1e4fa35e25de41ae91c941170183d469b579eaa1 Mon Sep 17 00:00:00 2001 From: sigurdgroneng Date: Tue, 31 Oct 2023 13:17:17 +0100 Subject: [PATCH 04/11] Update missing test --- .../integration/DeltakerIntegrationTests.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt index 36262639..0b271fe2 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt @@ -764,9 +764,9 @@ class DeltakerIntegrationTests : IntegrationTestBase() { fun `skal ikke opprettet aktivitetId (i mappingtabell) men ingeststatus oppdatereshvis sending av kafkamelding feiler`() { doThrow(IllegalStateException("LOL")).`when`(kafkaProducerService) .sendTilAktivitetskortTopic(this.any(UUID::class.java), any(KafkaMessageDto::class.java), any(AktivitetskortHeaders::class.java)) - val (gjennomforingId, deltakerId) = setup() + val (gjennomforingId, deltakelseId) = setup() val deltakerInput = DeltakerInput( - tiltakDeltakelseId = deltakerId, + tiltakDeltakelseId = deltakelseId, tiltakgjennomforingId = gjennomforingId, innsokBegrunnelse = "innsøkbegrunnelse", datoFra = LocalDate.now().minusDays(1), @@ -777,7 +777,7 @@ class DeltakerIntegrationTests : IntegrationTestBase() { arenaData.ingestStatus shouldBe IngestStatus.RETRY arenaData.note shouldBe "LOL" } - idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second shouldBe null + aktivitetRepository.getCurrentAktivitetsId(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) shouldBe null } @Test From acc8fbb0a8c742a01c146fbfb1fe911a9afaed30 Mon Sep 17 00:00:00 2001 From: sigurdgroneng Date: Tue, 31 Oct 2023 13:30:47 +0100 Subject: [PATCH 05/11] Fix code smells --- .../arena_tiltak_aktivitet_acl/rest/TranslationController.kt | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt index 3da0c96a..10ac2860 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/rest/TranslationController.kt @@ -10,16 +10,13 @@ import no.nav.arena_tiltak_aktivitet_acl.auth.AuthService import no.nav.arena_tiltak_aktivitet_acl.auth.Issuer import no.nav.arena_tiltak_aktivitet_acl.domain.dto.TranslationQuery import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository import no.nav.arena_tiltak_aktivitet_acl.services.AktivitetskortIdService import no.nav.security.token.support.core.api.Protected import no.nav.security.token.support.core.api.ProtectedWithClaims -import org.springframework.http.HttpStatus import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.RequestBody import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.RestController -import org.springframework.web.server.ResponseStatusException import java.util.* From 3a568ba32312db7ab1dfb260e923242a3a9fa060 Mon Sep 17 00:00:00 2001 From: sigurdgroneng Date: Tue, 31 Oct 2023 14:01:16 +0100 Subject: [PATCH 06/11] Slett fra forelopig_aktivitet_id ved insert i aktivitet --- .../repositories/AktivitetskortIdRepository.kt | 12 ++++++++++++ .../services/AktivitetService.kt | 3 +++ .../processors/DeltakerProcessorTest.kt | 2 +- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt index b8b047de..680733a3 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetskortIdRepository.kt @@ -11,6 +11,18 @@ import java.util.UUID class AktivitetskortIdRepository( private val template: NamedParameterJdbcTemplate ) { + + fun deleteDeltakelseId(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): Int { + val sql = """ + DELETE FROM forelopig_aktivitet_id WHERE deltakelse_id = :deltakelseId and kategori = :kategori + """.trimIndent() + return template.update(sql, + mapOf( + "kategori" to aktivitetKategori.name, + "deltakelseId" to deltakelseId.value, + )) + } + fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { val currentId = getCurrentId(deltakelseId, aktivitetKategori) if (currentId != null) return currentId diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt index 9370eea6..22f4cf12 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt @@ -7,6 +7,7 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Aktivitetskort import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHeaders import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.DeltakelseLockRepository import org.springframework.stereotype.Service import java.util.UUID @@ -14,11 +15,13 @@ import java.util.UUID @Service class AktivitetService( val aktivitetRepository: AktivitetRepository, + val aktivitetskortIdRepository: AktivitetskortIdRepository, val deltakerLockRepository: DeltakelseLockRepository ) { fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders, deltakelseId: DeltakelseId) { deltakerLockRepository.safeDeltakelse(deltakelseId).use { aktivitetRepository.upsert(aktivitet.toDbo(headers)) + aktivitetskortIdRepository.deleteDeltakelseId(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) } } fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt index ab5f3fc0..27fe964b 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt @@ -85,7 +85,7 @@ class DeltakerProcessorTest : FunSpec({ return DeltakerProcessor( arenaDataRepository = arenaDataRepository, kafkaProducerService = kafkaProducerService, - aktivitetService = AktivitetService(AktivitetRepository(template), DeltakelseLockRepository(template)), + aktivitetService = AktivitetService(AktivitetRepository(template), AktivitetskortIdRepository(template), DeltakelseLockRepository(template)), gjennomforingRepository = GjennomforingRepository(template), tiltakService = TiltakService(TiltakRepository(template)), oppfolgingsperiodeService = OppfolgingsperiodeService(oppfolgingClient), From adc74fec782500b601482ffdf0ad38a630ea10a9 Mon Sep 17 00:00:00 2001 From: Hans Petter Simonsen Date: Wed, 1 Nov 2023 23:14:41 +0100 Subject: [PATCH 07/11] =?UTF-8?q?Testing=20av=20race=20conditions=20mellom?= =?UTF-8?q?=20deltaker-meldinger=20og=20komet-translation-sp=C3=B8rringer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../integration/DeltakerIntegrationTests.kt | 37 ++++++++++ .../processors/DeltakerProcessorTest.kt | 74 +++++++++++++++++-- 2 files changed, 106 insertions(+), 5 deletions(-) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt index 0b271fe2..98a1959d 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/DeltakerIntegrationTests.kt @@ -6,6 +6,10 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.kotest.matchers.string.shouldContain import io.kotest.matchers.string.shouldMatch +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import no.nav.arena_tiltak_aktivitet_acl.clients.IdMappingClient import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.Oppfolgingsperiode import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus @@ -801,6 +805,39 @@ class DeltakerIntegrationTests : IntegrationTestBase() { .second shouldBe generertId } + @Test + fun `skal ha riktig mapping selv om translationcontroller og deltakerprocessor kjoerer samtidig`() { + val (gjennomforingId, deltakerId) = setup() + val generertId = idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)) + .second + generertId shouldNotBe null + val deltakerInput = DeltakerInput( + tiltakDeltakelseId = deltakerId, + tiltakgjennomforingId = gjennomforingId, + innsokBegrunnelse = "innsøkbegrunnelse", + datoFra = LocalDate.now().minusDays(1), + endretAv = Ident(ident = "SIG123"), + ) + val deltakerCommand = NyDeltakerCommand(deltakerInput) + var generertId1: UUID? = null + var generertId2: UUID? = null + runBlocking { + async(Dispatchers.IO) {// NB Ikke bruk default-dispatcher for async. Den håndterer ikke blokkerende kall + deltakerExecutor.execute(deltakerCommand).expectHandled { arenaData -> + generertId1 = arenaData.output.aktivitetskort.id + } + } + async(Dispatchers.IO) { + var delayTime = Random.nextLong(50, 200) + delay(delayTime) + generertId2 = idMappingClient.hentMapping(TranslationQuery(deltakerId.value, AktivitetKategori.TILTAKSAKTIVITET)).second + } + } + generertId1 shouldNotBe null + generertId1 shouldBe generertId2 + + } + private val idMappingClient: IdMappingClient by lazy { val token = issueAzureAdM2MToken() IdMappingClient(port!!) { token } diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt index 27fe964b..c60accd6 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt @@ -4,18 +4,20 @@ import ArenaOrdsProxyClient import io.kotest.assertions.throwables.shouldNotThrowAny import io.kotest.assertions.throwables.shouldThrowExactly import io.kotest.core.spec.style.FunSpec -import io.kotest.matchers.reflection.beLateInit import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.mockk.* +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.OppfolgingClient import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.Oppfolgingsperiode import no.nav.arena_tiltak_aktivitet_acl.database.DatabaseTestUtils import no.nav.arena_tiltak_aktivitet_acl.database.SingletonPostgresContainer import no.nav.arena_tiltak_aktivitet_acl.domain.db.ArenaDataDbo import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.* import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import no.nav.arena_tiltak_aktivitet_acl.exceptions.DependencyNotIngestedException import no.nav.arena_tiltak_aktivitet_acl.exceptions.IgnoredException @@ -24,13 +26,13 @@ import no.nav.arena_tiltak_aktivitet_acl.mocks.OppfolgingClientMock import no.nav.arena_tiltak_aktivitet_acl.repositories.* import no.nav.arena_tiltak_aktivitet_acl.services.* import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName +import org.slf4j.LoggerFactory import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate import java.time.LocalDateTime import java.time.ZonedDateTime import java.util.* class DeltakerProcessorTest : FunSpec({ - val dataSource = SingletonPostgresContainer.getDataSource() val ordsClient by lazy { @@ -60,6 +62,24 @@ class DeltakerProcessorTest : FunSpec({ lateinit var aktivitetskortIdRespository: AktivitetskortIdRepository lateinit var deltakelseLockRepository: DeltakelseLockRepository + val logger = LoggerFactory.getLogger(javaClass) + + val aktivitetDboSlot = slot() + val slowAktivitetRepository by lazy { + val spyRepo = mockk() + var delayed = true + coEvery {spyRepo.upsert(aktivitet = capture(aktivitetDboSlot))} coAnswers { + logger.info("In mockrepo for deltakelse: ${aktivitetDboSlot.captured.arenaId}") + if (delayed) { + delayed = false + logger.info("Sleeping 50ms for deltakelse ${aktivitetDboSlot.captured.arenaId}") + delay(50) + } + aktivitetRepository.upsert(aktivitetDboSlot.captured) + } + spyRepo + } + // Se SQL inserted før hver test val nonIgnoredGjennomforingArenaId = 1L val ignoredGjennomforingArenaId = 2L @@ -253,5 +273,49 @@ class DeltakerProcessorTest : FunSpec({ createDeltakerProcessor(oppfolgingsperioder).handleArenaMessage(newDeltaker) } } -}) + test("testing race condition in repo - not deltakerprocessor") { + val firstTimeSlowAktivitetskortService = AktivitetService(slowAktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) + val aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) + + val deltakelse = DeltakelseId(12345L) + val aktivitetskort1 = Aktivitetskort( + id = UUID.randomUUID(), + personIdent = "12345678901", + tittel = "Tittel", + aktivitetStatus = AktivitetStatus.GJENNOMFORES, + etiketter = emptyList(), + startDato = null, + sluttDato = null, + beskrivelse = null, + endretAv = Ident("ARENAIDENT","AKS999"), + endretTidspunkt = LocalDateTime.now(), + avtaltMedNav = true, + detaljer = emptyList() + ) + val headers1 = AktivitetskortHeaders( + arenaId = "ARENATA${deltakelse.value}", + tiltakKode = "VASV", + oppfolgingsperiode = UUID.randomUUID(), + oppfolgingsSluttDato = ZonedDateTime.now().minusDays(5) + ) + val aktivitetskort2 = aktivitetskort1.copy(id = UUID.randomUUID()) + val headers2 = headers1.copy( oppfolgingsperiode = UUID.randomUUID(), oppfolgingsSluttDato = null) + + coroutineScope { + async(Dispatchers.IO) {// Do not use available default dispatcher, as it is not meant for blocking calls + logger.info("First upsert start") + firstTimeSlowAktivitetskortService.upsert(aktivitetskort1, headers1, deltakelse) + logger.info("First upsert end") + } + async(Dispatchers.IO) { + logger.info("Second upsert start") + firstTimeSlowAktivitetskortService.upsert(aktivitetskort2, headers2, deltakelse) + logger.info("Second upsert end") + } + }.await() + val gimmeId = aktivitetskortIdService.getOrCreate(deltakelse, AktivitetKategori.TILTAKSAKTIVITET) + gimmeId shouldBe aktivitetskort2.id // den uten oppfølgingsluttdato + } + +}) From 4612f53c1c73b1808925fd0cacd379a41733d9ec Mon Sep 17 00:00:00 2001 From: Hans Petter Simonsen Date: Thu, 2 Nov 2023 11:04:33 +0100 Subject: [PATCH 08/11] Improved concurrency test --- .../processors/DeltakerProcessorTest.kt | 61 ++++++++++++++++++- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt index c60accd6..193d4491 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt @@ -4,6 +4,7 @@ import ArenaOrdsProxyClient import io.kotest.assertions.throwables.shouldNotThrowAny import io.kotest.assertions.throwables.shouldThrowExactly import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldContainInOrder import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.mockk.* @@ -31,6 +32,7 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate import java.time.LocalDateTime import java.time.ZonedDateTime import java.util.* +import kotlin.random.Random class DeltakerProcessorTest : FunSpec({ val dataSource = SingletonPostgresContainer.getDataSource() @@ -72,8 +74,9 @@ class DeltakerProcessorTest : FunSpec({ logger.info("In mockrepo for deltakelse: ${aktivitetDboSlot.captured.arenaId}") if (delayed) { delayed = false - logger.info("Sleeping 50ms for deltakelse ${aktivitetDboSlot.captured.arenaId}") - delay(50) + val delayTime = Random.nextLong(50L, 200L) + logger.info("Sleeping $delayTime ms for deltakelse ${aktivitetDboSlot.captured.arenaId}") + delay(delayTime) } aktivitetRepository.upsert(aktivitetDboSlot.captured) } @@ -274,7 +277,7 @@ class DeltakerProcessorTest : FunSpec({ } } - test("testing race condition in repo - not deltakerprocessor") { + test("Block processing when concurrent calls on same deltakelseId") { val firstTimeSlowAktivitetskortService = AktivitetService(slowAktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) val aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) @@ -302,20 +305,72 @@ class DeltakerProcessorTest : FunSpec({ val aktivitetskort2 = aktivitetskort1.copy(id = UUID.randomUUID()) val headers2 = headers1.copy( oppfolgingsperiode = UUID.randomUUID(), oppfolgingsSluttDato = null) + + val processOrder = mutableListOf() coroutineScope { async(Dispatchers.IO) {// Do not use available default dispatcher, as it is not meant for blocking calls logger.info("First upsert start") firstTimeSlowAktivitetskortService.upsert(aktivitetskort1, headers1, deltakelse) logger.info("First upsert end") + processOrder.add(1L) } async(Dispatchers.IO) { logger.info("Second upsert start") firstTimeSlowAktivitetskortService.upsert(aktivitetskort2, headers2, deltakelse) logger.info("Second upsert end") + processOrder.add(2L) } }.await() val gimmeId = aktivitetskortIdService.getOrCreate(deltakelse, AktivitetKategori.TILTAKSAKTIVITET) gimmeId shouldBe aktivitetskort2.id // den uten oppfølgingsluttdato + processOrder shouldContainInOrder listOf(1L, 2L) + } + + test("Do not block processing when concurrent calls on different deltakelseId") { + val firstTimeSlowAktivitetskortService = AktivitetService(slowAktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) + val aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) + + val deltakelse1 = DeltakelseId(12345L) + val deltakelse2 = DeltakelseId(23456L) + val aktivitetskort1 = Aktivitetskort( + id = UUID.randomUUID(), + personIdent = "12345678901", + tittel = "Tittel", + aktivitetStatus = AktivitetStatus.GJENNOMFORES, + etiketter = emptyList(), + startDato = null, + sluttDato = null, + beskrivelse = null, + endretAv = Ident("ARENAIDENT","AKS999"), + endretTidspunkt = LocalDateTime.now(), + avtaltMedNav = true, + detaljer = emptyList() + ) + val headers1 = AktivitetskortHeaders( + arenaId = "ARENATA${deltakelse1.value}", + tiltakKode = "VASV", + oppfolgingsperiode = UUID.randomUUID(), + oppfolgingsSluttDato = null + ) + val aktivitetskort2 = aktivitetskort1.copy(id = UUID.randomUUID()) + val headers2 = headers1.copy( oppfolgingsperiode = UUID.randomUUID(), arenaId = "ARENATA${deltakelse2.value}") + + val processOrder = mutableListOf() + coroutineScope { + async(Dispatchers.IO) {// Do not use available default dispatcher, as it is not meant for blocking calls + logger.info("First upsert start") + firstTimeSlowAktivitetskortService.upsert(aktivitetskort1, headers1, deltakelse1) + logger.info("First upsert end") + processOrder.add(1L) + } + async(Dispatchers.IO) { + logger.info("Second upsert start") + firstTimeSlowAktivitetskortService.upsert(aktivitetskort2, headers2, deltakelse1) + logger.info("Second upsert end") + processOrder.add(2L) + } + }.await() + processOrder shouldContainInOrder listOf(2L, 1L) } }) From 9da5b366887a1180dba7fc9e7abd4ce5f25534f2 Mon Sep 17 00:00:00 2001 From: sigurdgroneng Date: Thu, 2 Nov 2023 15:53:51 +0100 Subject: [PATCH 09/11] WIP: Noe er galt med pg_avisory_lock --- .../repositories/AktivitetRepository.kt | 1 + .../repositories/DeltakelseLockRepository.kt | 6 + .../database/SingletonPostgresContainer.kt | 2 +- .../services/AktivitetServiceTest.kt | 143 ++++++++++++++++++ 4 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt index 9179dc4d..0dff223a 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt @@ -20,6 +20,7 @@ open class AktivitetRepository( ) { private val log = LoggerFactory.getLogger(javaClass) fun upsert(aktivitet: AktivitetDbo) { + log.info("In repo ${aktivitet.id} ${aktivitet.oppfolgingsperiodeUUID} ${aktivitet.arenaId}") @Language("PostgreSQL") val sql = """ INSERT INTO aktivitet(id, person_ident, kategori_type, data, arena_id, tiltak_kode, oppfolgingsperiode_uuid, oppfolgingsperiode_slutt_tidspunkt) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt index 511f3389..fd85c2b2 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt @@ -2,14 +2,17 @@ package no.nav.arena_tiltak_aktivitet_acl.repositories import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import org.intellij.lang.annotations.Language +import org.slf4j.LoggerFactory import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate import org.springframework.stereotype.Component import java.io.Closeable +import java.sql.ResultSet @Component class DeltakelseLockRepository( private val template: NamedParameterJdbcTemplate ) { + private val log = LoggerFactory.getLogger(javaClass) private fun lockDeltakelse(deltakelseId: DeltakelseId) { @Language("postgresql") val sql = """ @@ -28,12 +31,15 @@ class DeltakelseLockRepository( fun safeDeltakelse(deltakelseId: DeltakelseId): SafeDeltakelse { lockDeltakelse(deltakelseId) + log.info("Lock on $deltakelseId") return SafeDeltakelse(this, deltakelseId) } class SafeDeltakelse(val deltakelseLockRepository: DeltakelseLockRepository, val deltakelseId: DeltakelseId): Closeable { + private val log = LoggerFactory.getLogger(javaClass) override fun close() { deltakelseLockRepository.unlockDeltakelse(deltakelseId) + log.info("Unlock on $deltakelseId") } } } diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt index 637b7a66..904c9842 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/database/SingletonPostgresContainer.kt @@ -66,7 +66,7 @@ object SingletonPostgresContainer { config.jdbcUrl = container.jdbcUrl config.username = container.username config.password = container.password - config.maximumPoolSize = 3 + config.maximumPoolSize = 10 config.minimumIdle = 1 return HikariDataSource(config) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt new file mode 100644 index 00000000..fcc3eb5e --- /dev/null +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt @@ -0,0 +1,143 @@ +package no.nav.arena_tiltak_aktivitet_acl.services + +import io.kotest.common.runBlocking +import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.shouldBe +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.* +import no.nav.arena_tiltak_aktivitet_acl.database.SingletonPostgresContainer +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.* +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetDbo +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository +import no.nav.arena_tiltak_aktivitet_acl.repositories.DeltakelseLockRepository +import org.slf4j.LoggerFactory +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import java.time.LocalDateTime +import java.time.ZonedDateTime +import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean + +class AktivitetServiceTest : StringSpec({ + val log = LoggerFactory.getLogger(javaClass) + + val dataSource = SingletonPostgresContainer.getDataSource() + val template = NamedParameterJdbcTemplate(dataSource) + val aktivitetRepository = AktivitetRepository(template) + + fun slowAktivitetRepository(): AktivitetRepository { + val mock = mockk() + val isSlow = AtomicBoolean(true) + + every { mock.upsert(any()) } answers { + // Make sure first call is slow + val isFirst = isSlow.getAndSet(false) + val aktivitet = firstArg() + log.info("isFirst $isFirst arenaId: ${aktivitet.arenaId} ${aktivitet}") + if (isFirst) { + runBlocking { delay(100) } + aktivitetRepository.upsert(aktivitet) + } else { + aktivitetRepository.upsert(aktivitet) + } + } + return mock + } + val aktivitetskortIdRepository = AktivitetskortIdRepository(template) + val deltakerLockRepository = DeltakelseLockRepository(template) + + val aktivitetskort = Aktivitetskort( + id = UUID.randomUUID(), + personIdent = "01234567890", + tittel = "Tittel", + aktivitetStatus = AktivitetStatus.GJENNOMFORES, + etiketter = emptyList(), + startDato = null, + sluttDato = null, + beskrivelse = null, + endretAv = Ident("IdentType", "ident"), + endretTidspunkt = LocalDateTime.now(), + avtaltMedNav = true, + detaljer = emptyList() + ) + + fun headers(deltakelseId: DeltakelseId): AktivitetskortHeaders { + return AktivitetskortHeaders( + "${AktivitetKategori.TILTAKSAKTIVITET.prefix}${deltakelseId.value}", + "ASDAS", + UUID.randomUUID(), + null + ) + } + fun aktivitetskort(): Aktivitetskort { + return aktivitetskort + .copy(id = UUID.randomUUID()) + } + fun firstSlowAktivitetsService() = AktivitetService( + slowAktivitetRepository(), + aktivitetskortIdRepository, + deltakerLockRepository + ) + + "skal blokkere prossessring på samme deltakelse-id" { + val firstSlowAktivitetsService = firstSlowAktivitetsService() + val deltakelseId = DeltakelseId(1123) + val aktivitetskort = aktivitetskort() + val headers = headers(deltakelseId) + + val startOrder = mutableListOf() + val excutionOrder = mutableListOf() + val first = async(Dispatchers.IO) { + startOrder.add(1) + firstSlowAktivitetsService.upsert( + aktivitetskort, + headers, + deltakelseId, + ) + excutionOrder.add(1) + } + val second = async(Dispatchers.IO) { + delay(10) + startOrder.add(2) + firstSlowAktivitetsService.upsert( + aktivitetskort, + headers.copy(oppfolgingsSluttDato = ZonedDateTime.now()), + deltakelseId, + ) + excutionOrder.add(2) + } + listOf(first, second).awaitAll() + startOrder shouldBe listOf(1,2) + excutionOrder shouldBe listOf(1,2) + } + + "skal ikke blokke andre deltakelse-id-er" { + val firstSlowAktivitetsService = firstSlowAktivitetsService() + val deltakelseId = DeltakelseId(2123) + val deltakelseId2 = DeltakelseId(2321) + + val excutionOrder = mutableListOf() + val first = async(Dispatchers.IO) { + firstSlowAktivitetsService.upsert( + aktivitetskort(), + headers(deltakelseId), + deltakelseId, + ) + excutionOrder.add(deltakelseId.value) + } + val second = async(Dispatchers.IO) { + delay(10) + firstSlowAktivitetsService.upsert( + aktivitetskort(), + headers(deltakelseId2), + deltakelseId2, + ) + excutionOrder.add(deltakelseId2.value) + } + listOf(first, second).awaitAll() + excutionOrder shouldBe listOf(deltakelseId2.value, deltakelseId.value) + } + +}) From 27443ea944c6187bdb36d9d1e6ee6d61fbf9e282 Mon Sep 17 00:00:00 2001 From: Hans Petter Simonsen Date: Fri, 3 Nov 2023 22:40:46 +0100 Subject: [PATCH 10/11] =?UTF-8?q?Implement=20advisory=20lock=20som=20trans?= =?UTF-8?q?actional=20lock.=20Session=20lock=20vil=20kunne=20feile=20fordi?= =?UTF-8?q?=20jdbctemplate=20gjenbruker=20db-sesjoner=20i=20h=C3=B8y=20gra?= =?UTF-8?q?d.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../repositories/AdvisoryLockRepository.kt | 51 ++++++ .../repositories/AktivitetRepository.kt | 7 +- .../repositories/DeltakelseLockRepository.kt | 47 ----- .../services/AktivitetService.kt | 24 ++- .../services/AktivitetskortIdService.kt | 19 +- .../processors/DeltakerProcessorTest.kt | 133 +------------- .../services/AktivitetServiceTest.kt | 170 ++++++++++++------ 7 files changed, 206 insertions(+), 245 deletions(-) create mode 100644 src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AdvisoryLockRepository.kt delete mode 100644 src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AdvisoryLockRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AdvisoryLockRepository.kt new file mode 100644 index 00000000..62379656 --- /dev/null +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AdvisoryLockRepository.kt @@ -0,0 +1,51 @@ +package no.nav.arena_tiltak_aktivitet_acl.repositories + +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import org.intellij.lang.annotations.Language +import org.slf4j.LoggerFactory +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Component +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional +import java.io.Closeable + +@Component +open class AdvisoryLockRepository( + private val template: NamedParameterJdbcTemplate +) { + private val log = LoggerFactory.getLogger(javaClass) + + /** + * Acquire a transactional advisory lock on a lockId. The duration of the lock is that of the containing transaction, + * whence the methode requires a transaction already to be present before the call. + */ + @Transactional(propagation = Propagation.MANDATORY) + open fun aquireTransactionalAdvisoryLock(lockId: Long) { + @Language("postgresql") + val sql = """ + SELECT pg_advisory_xact_lock(:lockId); + """.trimIndent() + template.query(sql, mapOf("lockId" to lockId)) {} + } + + @Transactional(propagation = Propagation.MANDATORY) + open fun lockDeltakelse(deltakelseId: DeltakelseId) { + aquireTransactionalAdvisoryLock(deltakelseId.value) + } + + @Transactional(propagation = Propagation.MANDATORY) + open fun safeDeltakelse(deltakelseId: DeltakelseId): SafeDeltakelse { + lockDeltakelse(deltakelseId) + log.info("Acquired lock on deltakelseId: $deltakelseId") + return SafeDeltakelse(deltakelseId) + } + + class SafeDeltakelse(val deltakelseId: DeltakelseId): Closeable { + private val log = LoggerFactory.getLogger(javaClass) + override fun close() { + log.info("Safe to release lock on deltakelseId: $deltakelseId") + } + } +} + + diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt index 0dff223a..2d2cb17d 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/AktivitetRepository.kt @@ -3,7 +3,8 @@ package no.nav.arena_tiltak_aktivitet_acl.repositories import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.AvsluttetOppfolgingsperiode import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import no.nav.arena_tiltak_aktivitet_acl.utils.* +import no.nav.arena_tiltak_aktivitet_acl.utils.getNullableZonedDateTime +import no.nav.arena_tiltak_aktivitet_acl.utils.getUUID import org.intellij.lang.annotations.Language import org.slf4j.LoggerFactory import org.springframework.dao.IncorrectResultSizeDataAccessException @@ -15,12 +16,12 @@ import java.sql.ResultSet import java.util.* @Component -open class AktivitetRepository( +class AktivitetRepository( private val template: NamedParameterJdbcTemplate ) { private val log = LoggerFactory.getLogger(javaClass) fun upsert(aktivitet: AktivitetDbo) { - log.info("In repo ${aktivitet.id} ${aktivitet.oppfolgingsperiodeUUID} ${aktivitet.arenaId}") + log.info("In repo ${aktivitet.id} ${aktivitet.oppfolgingsperiodeUUID} ${aktivitet.arenaId} ${aktivitet.oppfolgingsSluttTidspunkt}") @Language("PostgreSQL") val sql = """ INSERT INTO aktivitet(id, person_ident, kategori_type, data, arena_id, tiltak_kode, oppfolgingsperiode_uuid, oppfolgingsperiode_slutt_tidspunkt) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt deleted file mode 100644 index fd85c2b2..00000000 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/repositories/DeltakelseLockRepository.kt +++ /dev/null @@ -1,47 +0,0 @@ -package no.nav.arena_tiltak_aktivitet_acl.repositories - -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import org.intellij.lang.annotations.Language -import org.slf4j.LoggerFactory -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate -import org.springframework.stereotype.Component -import java.io.Closeable -import java.sql.ResultSet - -@Component -class DeltakelseLockRepository( - private val template: NamedParameterJdbcTemplate -) { - private val log = LoggerFactory.getLogger(javaClass) - private fun lockDeltakelse(deltakelseId: DeltakelseId) { - @Language("postgresql") - val sql = """ - SELECT pg_advisory_lock(:deltakelseId); - """.trimIndent() - template.query(sql, mapOf("deltakelseId" to deltakelseId.value)) {} - } - - private fun unlockDeltakelse(deltakelseId: DeltakelseId) { - @Language("postgresql") - val sql = """ - SELECT pg_advisory_unlock(:deltakelseId); - """.trimIndent() - template.query(sql, mapOf("deltakelseId" to deltakelseId.value)) {} - } - - fun safeDeltakelse(deltakelseId: DeltakelseId): SafeDeltakelse { - lockDeltakelse(deltakelseId) - log.info("Lock on $deltakelseId") - return SafeDeltakelse(this, deltakelseId) - } - - class SafeDeltakelse(val deltakelseLockRepository: DeltakelseLockRepository, val deltakelseId: DeltakelseId): Closeable { - private val log = LoggerFactory.getLogger(javaClass) - override fun close() { - deltakelseLockRepository.unlockDeltakelse(deltakelseId) - log.info("Unlock on $deltakelseId") - } - } -} - - diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt index 22f4cf12..2a970791 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetService.kt @@ -6,29 +6,37 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategor import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Aktivitetskort import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHeaders import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository -import no.nav.arena_tiltak_aktivitet_acl.repositories.DeltakelseLockRepository import org.springframework.stereotype.Service -import java.util.UUID +import org.springframework.transaction.annotation.Propagation +import org.springframework.transaction.annotation.Transactional +import java.util.* @Service -class AktivitetService( +open class AktivitetService( val aktivitetRepository: AktivitetRepository, val aktivitetskortIdRepository: AktivitetskortIdRepository, - val deltakerLockRepository: DeltakelseLockRepository + val deltakerLockRepository: AdvisoryLockRepository ) { - fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders, deltakelseId: DeltakelseId) { + /** + * SafeDeltakelse will make sure no other transaction is processing the same deltakelse for the duration of the ongoing transaction. + * If another transaction is processing the same deltakelse (i.e. TranslationController, AktivitetskortIdService) this transaction will wait its turn until the other transaction is complete. + * @see no.nav.arena_tiltak_aktivitet_acl.services.AktivitetskortIdService.getOrCreate + */ + @Transactional(propagation = Propagation.MANDATORY) + open fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders, deltakelseId: DeltakelseId) { deltakerLockRepository.safeDeltakelse(deltakelseId).use { aktivitetRepository.upsert(aktivitet.toDbo(headers)) aktivitetskortIdRepository.deleteDeltakelseId(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET) } } - fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId) - fun getAllBy(aktivitetId: DeltakelseId, aktivitetsKategori: AktivitetKategori) = + open fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId) + open fun getAllBy(aktivitetId: DeltakelseId, aktivitetsKategori: AktivitetKategori) = aktivitetRepository.getAllBy(aktivitetId, aktivitetsKategori) - fun closeClosedPerioder(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori, oppfolgingsperioder: List) { + open fun closeClosedPerioder(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori, oppfolgingsperioder: List) { val avsluttedePerioder = oppfolgingsperioder .mapNotNull { it.sluttDato diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt index be003797..bfad6ad7 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetskortIdService.kt @@ -2,21 +2,28 @@ package no.nav.arena_tiltak_aktivitet_acl.services import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository -import no.nav.arena_tiltak_aktivitet_acl.repositories.DeltakelseLockRepository import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional import java.util.* @Service -class AktivitetskortIdService( +open class AktivitetskortIdService( val aktivitetRepository: AktivitetRepository, val aktivitetskortIdRepository: AktivitetskortIdRepository, - val deltakelseLockRepository: DeltakelseLockRepository + val advisoryLockRepository: AdvisoryLockRepository ) { - fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { - // Lock on deltakelseId and force unlock - deltakelseLockRepository.safeDeltakelse(deltakelseId).use { + /** + * SafeDeltakelse will make sure no other transaction is processing the same deltakelse for the duration of the ongoing transaction. + * If another transaction is processing the same deltakelse (i.e. AktivitetService) this transaction will wait its turn until the other transaction is complete. + * @see no.nav.arena_tiltak_aktivitet_acl.services.AktivitetService.upsert + */ + @Transactional + open fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID { + // Lock on deltakelseId. Gjelder så lenge den pågående transaksjonen er aktiv. + advisoryLockRepository.safeDeltakelse(deltakelseId).use { val currentId = aktivitetRepository.getCurrentAktivitetsId(deltakelseId, aktivitetKategori) if (currentId != null) return currentId // Opprett i ny tabell diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt index 193d4491..0bf6ced1 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/processors/DeltakerProcessorTest.kt @@ -4,21 +4,17 @@ import ArenaOrdsProxyClient import io.kotest.assertions.throwables.shouldNotThrowAny import io.kotest.assertions.throwables.shouldThrowExactly import io.kotest.core.spec.style.FunSpec -import io.kotest.matchers.collections.shouldContainInOrder import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import io.mockk.* -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.OppfolgingClient import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.Oppfolgingsperiode import no.nav.arena_tiltak_aktivitet_acl.database.DatabaseTestUtils import no.nav.arena_tiltak_aktivitet_acl.database.SingletonPostgresContainer import no.nav.arena_tiltak_aktivitet_acl.domain.db.ArenaDataDbo import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.* +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId import no.nav.arena_tiltak_aktivitet_acl.exceptions.DependencyNotIngestedException import no.nav.arena_tiltak_aktivitet_acl.exceptions.IgnoredException @@ -27,12 +23,10 @@ import no.nav.arena_tiltak_aktivitet_acl.mocks.OppfolgingClientMock import no.nav.arena_tiltak_aktivitet_acl.repositories.* import no.nav.arena_tiltak_aktivitet_acl.services.* import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName -import org.slf4j.LoggerFactory import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate import java.time.LocalDateTime import java.time.ZonedDateTime import java.util.* -import kotlin.random.Random class DeltakerProcessorTest : FunSpec({ val dataSource = SingletonPostgresContainer.getDataSource() @@ -62,26 +56,7 @@ class DeltakerProcessorTest : FunSpec({ lateinit var personSporingRepository: PersonSporingRepository lateinit var aktivitetRepository: AktivitetRepository lateinit var aktivitetskortIdRespository: AktivitetskortIdRepository - lateinit var deltakelseLockRepository: DeltakelseLockRepository - - val logger = LoggerFactory.getLogger(javaClass) - - val aktivitetDboSlot = slot() - val slowAktivitetRepository by lazy { - val spyRepo = mockk() - var delayed = true - coEvery {spyRepo.upsert(aktivitet = capture(aktivitetDboSlot))} coAnswers { - logger.info("In mockrepo for deltakelse: ${aktivitetDboSlot.captured.arenaId}") - if (delayed) { - delayed = false - val delayTime = Random.nextLong(50L, 200L) - logger.info("Sleeping $delayTime ms for deltakelse ${aktivitetDboSlot.captured.arenaId}") - delay(delayTime) - } - aktivitetRepository.upsert(aktivitetDboSlot.captured) - } - spyRepo - } + lateinit var advisoryLockRepository: AdvisoryLockRepository // Se SQL inserted før hver test val nonIgnoredGjennomforingArenaId = 1L @@ -93,7 +68,7 @@ class DeltakerProcessorTest : FunSpec({ personSporingRepository = PersonSporingRepository(template) aktivitetRepository = AktivitetRepository(template) aktivitetskortIdRespository = AktivitetskortIdRepository(template) - deltakelseLockRepository = DeltakelseLockRepository(template) + advisoryLockRepository = AdvisoryLockRepository(template) clearMocks(kafkaProducerService) DatabaseTestUtils.cleanAndInitDatabase(dataSource, "/deltaker-processor_test-data.sql") @@ -108,12 +83,12 @@ class DeltakerProcessorTest : FunSpec({ return DeltakerProcessor( arenaDataRepository = arenaDataRepository, kafkaProducerService = kafkaProducerService, - aktivitetService = AktivitetService(AktivitetRepository(template), AktivitetskortIdRepository(template), DeltakelseLockRepository(template)), + aktivitetService = AktivitetService(AktivitetRepository(template), AktivitetskortIdRepository(template), AdvisoryLockRepository(template)), gjennomforingRepository = GjennomforingRepository(template), tiltakService = TiltakService(TiltakRepository(template)), oppfolgingsperiodeService = OppfolgingsperiodeService(oppfolgingClient), personsporingService = PersonsporingService(personSporingRepository, ordsClient), - aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) + aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, advisoryLockRepository) ) } @@ -277,100 +252,4 @@ class DeltakerProcessorTest : FunSpec({ } } - test("Block processing when concurrent calls on same deltakelseId") { - val firstTimeSlowAktivitetskortService = AktivitetService(slowAktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) - val aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) - - val deltakelse = DeltakelseId(12345L) - val aktivitetskort1 = Aktivitetskort( - id = UUID.randomUUID(), - personIdent = "12345678901", - tittel = "Tittel", - aktivitetStatus = AktivitetStatus.GJENNOMFORES, - etiketter = emptyList(), - startDato = null, - sluttDato = null, - beskrivelse = null, - endretAv = Ident("ARENAIDENT","AKS999"), - endretTidspunkt = LocalDateTime.now(), - avtaltMedNav = true, - detaljer = emptyList() - ) - val headers1 = AktivitetskortHeaders( - arenaId = "ARENATA${deltakelse.value}", - tiltakKode = "VASV", - oppfolgingsperiode = UUID.randomUUID(), - oppfolgingsSluttDato = ZonedDateTime.now().minusDays(5) - ) - val aktivitetskort2 = aktivitetskort1.copy(id = UUID.randomUUID()) - val headers2 = headers1.copy( oppfolgingsperiode = UUID.randomUUID(), oppfolgingsSluttDato = null) - - - val processOrder = mutableListOf() - coroutineScope { - async(Dispatchers.IO) {// Do not use available default dispatcher, as it is not meant for blocking calls - logger.info("First upsert start") - firstTimeSlowAktivitetskortService.upsert(aktivitetskort1, headers1, deltakelse) - logger.info("First upsert end") - processOrder.add(1L) - } - async(Dispatchers.IO) { - logger.info("Second upsert start") - firstTimeSlowAktivitetskortService.upsert(aktivitetskort2, headers2, deltakelse) - logger.info("Second upsert end") - processOrder.add(2L) - } - }.await() - val gimmeId = aktivitetskortIdService.getOrCreate(deltakelse, AktivitetKategori.TILTAKSAKTIVITET) - gimmeId shouldBe aktivitetskort2.id // den uten oppfølgingsluttdato - processOrder shouldContainInOrder listOf(1L, 2L) - } - - test("Do not block processing when concurrent calls on different deltakelseId") { - val firstTimeSlowAktivitetskortService = AktivitetService(slowAktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) - val aktivitetskortIdService = AktivitetskortIdService(aktivitetRepository, aktivitetskortIdRespository, deltakelseLockRepository) - - val deltakelse1 = DeltakelseId(12345L) - val deltakelse2 = DeltakelseId(23456L) - val aktivitetskort1 = Aktivitetskort( - id = UUID.randomUUID(), - personIdent = "12345678901", - tittel = "Tittel", - aktivitetStatus = AktivitetStatus.GJENNOMFORES, - etiketter = emptyList(), - startDato = null, - sluttDato = null, - beskrivelse = null, - endretAv = Ident("ARENAIDENT","AKS999"), - endretTidspunkt = LocalDateTime.now(), - avtaltMedNav = true, - detaljer = emptyList() - ) - val headers1 = AktivitetskortHeaders( - arenaId = "ARENATA${deltakelse1.value}", - tiltakKode = "VASV", - oppfolgingsperiode = UUID.randomUUID(), - oppfolgingsSluttDato = null - ) - val aktivitetskort2 = aktivitetskort1.copy(id = UUID.randomUUID()) - val headers2 = headers1.copy( oppfolgingsperiode = UUID.randomUUID(), arenaId = "ARENATA${deltakelse2.value}") - - val processOrder = mutableListOf() - coroutineScope { - async(Dispatchers.IO) {// Do not use available default dispatcher, as it is not meant for blocking calls - logger.info("First upsert start") - firstTimeSlowAktivitetskortService.upsert(aktivitetskort1, headers1, deltakelse1) - logger.info("First upsert end") - processOrder.add(1L) - } - async(Dispatchers.IO) { - logger.info("Second upsert start") - firstTimeSlowAktivitetskortService.upsert(aktivitetskort2, headers2, deltakelse1) - logger.info("Second upsert end") - processOrder.add(2L) - } - }.await() - processOrder shouldContainInOrder listOf(2L, 1L) - } - }) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt index fcc3eb5e..37e16aa9 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt @@ -1,31 +1,45 @@ package no.nav.arena_tiltak_aktivitet_acl.services import io.kotest.common.runBlocking -import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.collections.shouldContainInOrder import io.kotest.matchers.shouldBe import io.mockk.every import io.mockk.mockk -import kotlinx.coroutines.* -import no.nav.arena_tiltak_aktivitet_acl.database.SingletonPostgresContainer +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.delay import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.* import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId +import no.nav.arena_tiltak_aktivitet_acl.integration.IntegrationTestBase +import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetDbo import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository -import no.nav.arena_tiltak_aktivitet_acl.repositories.DeltakelseLockRepository +import org.junit.jupiter.api.Test import org.slf4j.LoggerFactory -import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.transaction.support.TransactionTemplate import java.time.LocalDateTime import java.time.ZonedDateTime -import java.util.UUID +import java.util.* import java.util.concurrent.atomic.AtomicBoolean +import kotlin.random.Random -class AktivitetServiceTest : StringSpec({ +class AktivitetServiceTest : IntegrationTestBase() { val log = LoggerFactory.getLogger(javaClass) - val dataSource = SingletonPostgresContainer.getDataSource() - val template = NamedParameterJdbcTemplate(dataSource) - val aktivitetRepository = AktivitetRepository(template) + @Autowired + lateinit var advisoryLockRepository: AdvisoryLockRepository + + @Autowired + lateinit var aktivitetRepository: AktivitetRepository + + @Autowired + lateinit var aktivitetskortIdRepository: AktivitetskortIdRepository + + @Autowired + lateinit var transactionTemplate: TransactionTemplate fun slowAktivitetRepository(): AktivitetRepository { val mock = mockk() @@ -45,8 +59,6 @@ class AktivitetServiceTest : StringSpec({ } return mock } - val aktivitetskortIdRepository = AktivitetskortIdRepository(template) - val deltakerLockRepository = DeltakelseLockRepository(template) val aktivitetskort = Aktivitetskort( id = UUID.randomUUID(), @@ -71,17 +83,20 @@ class AktivitetServiceTest : StringSpec({ null ) } + fun aktivitetskort(): Aktivitetskort { return aktivitetskort .copy(id = UUID.randomUUID()) } + fun firstSlowAktivitetsService() = AktivitetService( slowAktivitetRepository(), aktivitetskortIdRepository, - deltakerLockRepository + advisoryLockRepository ) - "skal blokkere prossessring på samme deltakelse-id" { + @Test + fun `skal blokkere prossessring på samme deltakelse-id`() { val firstSlowAktivitetsService = firstSlowAktivitetsService() val deltakelseId = DeltakelseId(1123) val aktivitetskort = aktivitetskort() @@ -89,55 +104,102 @@ class AktivitetServiceTest : StringSpec({ val startOrder = mutableListOf() val excutionOrder = mutableListOf() - val first = async(Dispatchers.IO) { - startOrder.add(1) - firstSlowAktivitetsService.upsert( - aktivitetskort, - headers, - deltakelseId, - ) - excutionOrder.add(1) - } - val second = async(Dispatchers.IO) { - delay(10) - startOrder.add(2) - firstSlowAktivitetsService.upsert( - aktivitetskort, - headers.copy(oppfolgingsSluttDato = ZonedDateTime.now()), - deltakelseId, - ) - excutionOrder.add(2) + kotlinx.coroutines.runBlocking { + val first = async(Dispatchers.IO) { + transactionTemplate.executeWithoutResult { + startOrder.add(1) + firstSlowAktivitetsService.upsert( + aktivitetskort, + headers, + deltakelseId, + ) + excutionOrder.add(1) + } + } + val second = async(Dispatchers.IO) { + delay(100) + transactionTemplate.executeWithoutResult { + startOrder.add(2) + firstSlowAktivitetsService.upsert( + aktivitetskort, + headers.copy(oppfolgingsSluttDato = ZonedDateTime.now()), + deltakelseId, + ) + excutionOrder.add(2) + } + } + listOf(first, second).awaitAll() } - listOf(first, second).awaitAll() - startOrder shouldBe listOf(1,2) - excutionOrder shouldBe listOf(1,2) + startOrder shouldBe listOf(1, 2) + excutionOrder shouldBe listOf(1, 2) } - "skal ikke blokke andre deltakelse-id-er" { + @Test + fun `skal ikke blokke andre deltakelse-id-er`() { val firstSlowAktivitetsService = firstSlowAktivitetsService() val deltakelseId = DeltakelseId(2123) val deltakelseId2 = DeltakelseId(2321) val excutionOrder = mutableListOf() - val first = async(Dispatchers.IO) { - firstSlowAktivitetsService.upsert( - aktivitetskort(), - headers(deltakelseId), - deltakelseId, - ) - excutionOrder.add(deltakelseId.value) - } - val second = async(Dispatchers.IO) { - delay(10) - firstSlowAktivitetsService.upsert( - aktivitetskort(), - headers(deltakelseId2), - deltakelseId2, - ) - excutionOrder.add(deltakelseId2.value) + kotlinx.coroutines.runBlocking { + val first = async(Dispatchers.IO) { + transactionTemplate.executeWithoutResult { + firstSlowAktivitetsService.upsert( + aktivitetskort(), + headers(deltakelseId), + deltakelseId, + ) + excutionOrder.add(deltakelseId.value) + } + } + val second = async(Dispatchers.IO) { + delay(10) + transactionTemplate.executeWithoutResult { + firstSlowAktivitetsService.upsert( + aktivitetskort(), + headers(deltakelseId2), + deltakelseId2, + ) + excutionOrder.add(deltakelseId2.value) + } + } + listOf(first, second).awaitAll() } - listOf(first, second).awaitAll() excutionOrder shouldBe listOf(deltakelseId2.value, deltakelseId.value) } -}) + @Test + fun `test advisory locking`() { + val OFFSET = 99999L + val startOrder = mutableListOf() + val finishOrder = mutableListOf() + + val lockId = Random.nextLong(0, 1000) + OFFSET + kotlinx.coroutines.runBlocking { + val first = async(Dispatchers.IO) { + startOrder.add(1) + transactionTemplate.executeWithoutResult { + advisoryLockRepository.aquireTransactionalAdvisoryLock(lockId) + log.info("Lock acquired first") + log.info("waiting in first") + Thread.sleep(100) + log.info("Releasing first lock") + } + finishOrder.add(1) + } + val second = async(Dispatchers.IO) { + delay(20) + startOrder.add(2) + transactionTemplate.executeWithoutResult { + advisoryLockRepository.aquireTransactionalAdvisoryLock(lockId) + log.info("Lock acquired second") + } + finishOrder.add(2) + } + listOf(first, second).awaitAll() + } + log.info("Lock automatically released on transaction commit or rollback") + finishOrder shouldContainInOrder startOrder + + } +} From 9ffb7229de30065f5e2d446b6c00d1972813a3f4 Mon Sep 17 00:00:00 2001 From: Hans Petter Simonsen Date: Fri, 3 Nov 2023 22:52:56 +0100 Subject: [PATCH 11/11] More explicit assertion in test --- .../services/AktivitetServiceTest.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt index 37e16aa9..0641b3bb 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/AktivitetServiceTest.kt @@ -199,7 +199,8 @@ class AktivitetServiceTest : IntegrationTestBase() { listOf(first, second).awaitAll() } log.info("Lock automatically released on transaction commit or rollback") - finishOrder shouldContainInOrder startOrder + startOrder shouldContainInOrder listOf(1, 2) + finishOrder shouldContainInOrder listOf(1, 2) } }