Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foreløpige aktivitetskort-id-er #78

Merged
merged 11 commits into from
Nov 6, 2023

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ open class DeltakerProcessor(
private val tiltakService: TiltakService,
private val personsporingService: PersonsporingService,
private val oppfolgingsperiodeService: OppfolgingsperiodeService,
private val aktivitetskortIdService: AktivitetskortIdService
) : ArenaMessageProcessor<ArenaDeltakerKafkaMessage> {

companion object {
Expand Down Expand Up @@ -114,7 +115,7 @@ open class DeltakerProcessor(
oppfolgingsperiode = periodeMatch.oppfolgingsperiode.uuid,
oppfolgingsSluttDato = periodeMatch.oppfolgingsperiode.sluttDato
)
aktivitetService.upsert(aktivitet, aktivitetskortHeaders)
aktivitetService.upsert(aktivitet, aktivitetskortHeaders, deltakelse.tiltakdeltakelseId)

if (endring.skalIgnoreres) {
log.info("Deltakeren har status=${arenaDeltaker.DELTAKERSTATUSKODE} og administrasjonskode=${tiltak.administrasjonskode} som ikke skal håndteres")
Expand Down Expand Up @@ -178,22 +179,26 @@ open class DeltakerProcessor(
// Har tidligere deltakelse på samme oppfolgingsperiode
eksisterendeAktivitetsId != null -> EndringsType.OppdaterAktivitet(eksisterendeAktivitetsId, skalIgnoreres)
// Har ingen tidligere aktivitetskort
oppfolgingsperiodeTilAktivitetskortId.isEmpty() -> EndringsType.NyttAktivitetskort(periodeMatch.oppfolgingsperiode, skalIgnoreres)
oppfolgingsperiodeTilAktivitetskortId.isEmpty() -> EndringsType.NyttAktivitetskort(getAkivitetskortId(deltakelseId), periodeMatch.oppfolgingsperiode, skalIgnoreres)
// Har tidligere deltakelse men ikke på samme oppfølgingsperiode
else -> {
EndringsType.NyttAktivitetskortByttPeriode(periodeMatch.oppfolgingsperiode, skalIgnoreres)
}
}
}

fun getAkivitetskortId(deltakelseId: DeltakelseId): UUID {
return aktivitetskortIdService.getOrCreate(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET)
}

fun syncOppfolgingsperioder(deltakelseId: DeltakelseId, oppfolginsperioder: List<Oppfolgingsperiode>) {
aktivitetService.closeClosedPerioder(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET, oppfolginsperioder)
}
}

sealed class EndringsType(val aktivitetskortId: UUID, val skalIgnoreres: Boolean) {
class OppdaterAktivitet(aktivitetskortId: UUID, skalIgnoreres: Boolean): EndringsType(aktivitetskortId, skalIgnoreres)
class NyttAktivitetskort(val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(UUID.randomUUID(), skalIgnoreres)
class NyttAktivitetskort(aktivitetskortId:UUID, val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(aktivitetskortId, skalIgnoreres)
class NyttAktivitetskortByttPeriode(val oppfolgingsperiode: Oppfolgingsperiode, skalIgnoreres: Boolean): EndringsType(UUID.randomUUID(), skalIgnoreres)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package no.nav.arena_tiltak_aktivitet_acl.repositories

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import org.intellij.lang.annotations.Language
import org.slf4j.LoggerFactory
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional
import java.io.Closeable

@Component
open class AdvisoryLockRepository(
private val template: NamedParameterJdbcTemplate
) {
private val log = LoggerFactory.getLogger(javaClass)

/**
* Acquire a transactional advisory lock on a lockId. The duration of the lock is that of the containing transaction,
* whence the methode requires a transaction already to be present before the call.
*/
@Transactional(propagation = Propagation.MANDATORY)
open fun aquireTransactionalAdvisoryLock(lockId: Long) {
@Language("postgresql")
val sql = """
SELECT pg_advisory_xact_lock(:lockId);
""".trimIndent()
template.query(sql, mapOf("lockId" to lockId)) {}
}

@Transactional(propagation = Propagation.MANDATORY)
open fun lockDeltakelse(deltakelseId: DeltakelseId) {
aquireTransactionalAdvisoryLock(deltakelseId.value)
}

@Transactional(propagation = Propagation.MANDATORY)
open fun safeDeltakelse(deltakelseId: DeltakelseId): SafeDeltakelse {
lockDeltakelse(deltakelseId)
log.info("Acquired lock on deltakelseId: $deltakelseId")
return SafeDeltakelse(deltakelseId)
}

class SafeDeltakelse(val deltakelseId: DeltakelseId): Closeable {
private val log = LoggerFactory.getLogger(javaClass)
override fun close() {
log.info("Safe to release lock on deltakelseId: $deltakelseId")
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package no.nav.arena_tiltak_aktivitet_acl.repositories
import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.AvsluttetOppfolgingsperiode
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import no.nav.arena_tiltak_aktivitet_acl.utils.*
import no.nav.arena_tiltak_aktivitet_acl.utils.getNullableZonedDateTime
import no.nav.arena_tiltak_aktivitet_acl.utils.getUUID
import org.intellij.lang.annotations.Language
import org.slf4j.LoggerFactory
import org.springframework.dao.IncorrectResultSizeDataAccessException
Expand All @@ -15,11 +16,12 @@ import java.sql.ResultSet
import java.util.*

@Component
open class AktivitetRepository(
class AktivitetRepository(
private val template: NamedParameterJdbcTemplate
) {
private val log = LoggerFactory.getLogger(javaClass)
fun upsert(aktivitet: AktivitetDbo) {
log.info("In repo ${aktivitet.id} ${aktivitet.oppfolgingsperiodeUUID} ${aktivitet.arenaId} ${aktivitet.oppfolgingsSluttTidspunkt}")
@Language("PostgreSQL")
val sql = """
INSERT INTO aktivitet(id, person_ident, kategori_type, data, arena_id, tiltak_kode, oppfolgingsperiode_uuid, oppfolgingsperiode_slutt_tidspunkt)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package no.nav.arena_tiltak_aktivitet_acl.repositories

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import no.nav.arena_tiltak_aktivitet_acl.utils.getUUID
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.stereotype.Component
import java.util.UUID

@Component
class AktivitetskortIdRepository(
private val template: NamedParameterJdbcTemplate
) {

fun deleteDeltakelseId(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): Int {
val sql = """
DELETE FROM forelopig_aktivitet_id WHERE deltakelse_id = :deltakelseId and kategori = :kategori
""".trimIndent()
return template.update(sql,
mapOf(
"kategori" to aktivitetKategori.name,
"deltakelseId" to deltakelseId.value,
))
}

fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID {
val currentId = getCurrentId(deltakelseId, aktivitetKategori)
if (currentId != null) return currentId

val generatedId = UUID.randomUUID()
val insertNewId = """
INSERT INTO forelopig_aktivitet_id(id, kategori, deltakelse_id) VALUES (:id, :kategori, :deltakelseId)
""".trimIndent()
template.update(insertNewId,
mapOf(
"id" to generatedId,
"kategori" to aktivitetKategori.name,
"deltakelseId" to deltakelseId.value,
))
return generatedId
}

private fun getCurrentId(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID? {
val getCurrentId = """
SELECT id FROM forelopig_aktivitet_id WHERE deltakelse_id = :deltakelseId and kategori = :aktivitetKategori
""".trimIndent()
return template.query(
getCurrentId,
mapOf(
"deltakelseId" to deltakelseId.value,
"aktivitetKategori" to aktivitetKategori.name
)
) { row, _ -> row.getUUID("id") }
.firstOrNull()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import no.nav.arena_tiltak_aktivitet_acl.auth.AuthService
import no.nav.arena_tiltak_aktivitet_acl.auth.Issuer
import no.nav.arena_tiltak_aktivitet_acl.domain.dto.TranslationQuery
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository
import no.nav.arena_tiltak_aktivitet_acl.services.AktivitetskortIdService
import no.nav.security.token.support.core.api.Protected
import no.nav.security.token.support.core.api.ProtectedWithClaims
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.server.ResponseStatusException
import java.util.*


Expand All @@ -28,7 +26,7 @@ import java.util.*
@RequestMapping("/api/translation")
class TranslationController(
private val authService: AuthService,
private val aktivitetRepository: AktivitetRepository,
private val aktivitetskortIdService: AktivitetskortIdService
) {

@ProtectedWithClaims(issuer = Issuer.AZURE_AD)
Expand All @@ -43,8 +41,7 @@ class TranslationController(
@RequestBody query: TranslationQuery
): UUID {
authService.validerErM2MToken()
return aktivitetRepository.getCurrentAktivitetsId(DeltakelseId(query.arenaId), query.aktivitetKategori)
?: throw ResponseStatusException(HttpStatus.NOT_FOUND, "No mapping found")
return aktivitetskortIdService.getOrCreate(DeltakelseId(query.arenaId), query.aktivitetKategori)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,37 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategor
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Aktivitetskort
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHeaders
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository
import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository
import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository
import org.springframework.stereotype.Service
import java.util.UUID
import org.springframework.transaction.annotation.Propagation
import org.springframework.transaction.annotation.Transactional
import java.util.*

@Service
class AktivitetService(
val aktivitetRepository: AktivitetRepository
open class AktivitetService(
val aktivitetRepository: AktivitetRepository,
val aktivitetskortIdRepository: AktivitetskortIdRepository,
val deltakerLockRepository: AdvisoryLockRepository
) {
fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders) = aktivitetRepository.upsert(aktivitet.toDbo(headers))
fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId)
fun getAllBy(aktivitetId: DeltakelseId, aktivitetsKategori: AktivitetKategori) =
/**
* SafeDeltakelse will make sure no other transaction is processing the same deltakelse for the duration of the ongoing transaction.
* If another transaction is processing the same deltakelse (i.e. TranslationController, AktivitetskortIdService) this transaction will wait its turn until the other transaction is complete.
* @see no.nav.arena_tiltak_aktivitet_acl.services.AktivitetskortIdService.getOrCreate
*/
@Transactional(propagation = Propagation.MANDATORY)
open fun upsert(aktivitet: Aktivitetskort, headers: AktivitetskortHeaders, deltakelseId: DeltakelseId) {
deltakerLockRepository.safeDeltakelse(deltakelseId).use {
aktivitetRepository.upsert(aktivitet.toDbo(headers))
aktivitetskortIdRepository.deleteDeltakelseId(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET)
}
}
open fun get(aktivitetId: UUID) = aktivitetRepository.getAktivitet(aktivitetId)
open fun getAllBy(aktivitetId: DeltakelseId, aktivitetsKategori: AktivitetKategori) =
aktivitetRepository.getAllBy(aktivitetId, aktivitetsKategori)

fun closeClosedPerioder(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori, oppfolgingsperioder: List<Oppfolgingsperiode>) {
open fun closeClosedPerioder(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori, oppfolgingsperioder: List<Oppfolgingsperiode>) {
val avsluttedePerioder = oppfolgingsperioder
.mapNotNull {
it.sluttDato
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package no.nav.arena_tiltak_aktivitet_acl.services

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import no.nav.arena_tiltak_aktivitet_acl.repositories.AdvisoryLockRepository
import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository
import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetskortIdRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*

@Service
open class AktivitetskortIdService(
val aktivitetRepository: AktivitetRepository,
val aktivitetskortIdRepository: AktivitetskortIdRepository,
val advisoryLockRepository: AdvisoryLockRepository
) {
/**
* SafeDeltakelse will make sure no other transaction is processing the same deltakelse for the duration of the ongoing transaction.
* If another transaction is processing the same deltakelse (i.e. AktivitetService) this transaction will wait its turn until the other transaction is complete.
* @see no.nav.arena_tiltak_aktivitet_acl.services.AktivitetService.upsert
*/
@Transactional
open fun getOrCreate(deltakelseId: DeltakelseId, aktivitetKategori: AktivitetKategori): UUID {
// Lock on deltakelseId. Gjelder så lenge den pågående transaksjonen er aktiv.
advisoryLockRepository.safeDeltakelse(deltakelseId).use {
val currentId = aktivitetRepository.getCurrentAktivitetsId(deltakelseId, aktivitetKategori)
if (currentId != null) return currentId
// Opprett i ny tabell
return aktivitetskortIdRepository.getOrCreate(deltakelseId, aktivitetKategori)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

CREATE TABLE forelopig_aktivitet_id
(
id UUID PRIMARY KEY NOT NULL,
deltakelse_id BIGINT UNIQUE NOT NULL,
kategori VARCHAR NOT NULL
);
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object SingletonPostgresContainer {
config.jdbcUrl = container.jdbcUrl
config.username = container.username
config.password = container.password
config.maximumPoolSize = 3
config.maximumPoolSize = 10
config.minimumIdle = 1

return HikariDataSource(config)
Expand Down
Loading