Skip to content

Commit

Permalink
Merge pull request #91 from navikt/slettemeldinger
Browse files Browse the repository at this point in the history
Håndter slettemeldinger
  • Loading branch information
tu55eladd authored Feb 23, 2024
2 parents 7dd2761 + b2f855e commit 0a0f9ef
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,9 @@ open class DeltakerProcessor(
val arenaGjennomforingId = arenaDeltaker.TILTAKGJENNOMFORING_ID
val deltakelse = arenaDeltaker.mapTiltakDeltakelse()

if (message.operationType == Operation.DELETED) {
throw IgnoredException("Skal ignorere deltakelse med operation type DELETE")
}
// Ikke behandle aktiviteter som ikke var "aktive" ved lansering
deltakelse.sjekkIkkeFerdigFørLansering()

var opprettetFoerMenAktivEtterLansering = false
if (deltakelse.regDato.isBefore(AKTIVITETSPLAN_LANSERINGSDATO) && deltakelse.modDato.isBefore(
AKTIVITETSPLAN_LANSERINGSDATO)) {
// Hvis deltakelsen er opprettet før aktivitetsplan lanseringsdato,
// _men_ datoTil er etter aktivitetsplan lanseringsdato,
// _og_ bruker hadde en aktiv oppfølgingsperiode ved aktivitetsplan lanseringsdato
// så skal vi opprette aktivitetskort
if (deltakelse.datoTil?.isAfter(AKTIVITETSPLAN_LANSERINGSDATO.toLocalDate()) == true) {
opprettetFoerMenAktivEtterLansering = true
} else throw IgnoredException("Deltakeren registrert=${deltakelse.regDato} opprettet før aktivitetsplan skal ikke håndteres")
}
val ingestStatus: IngestStatus? = runCatching {
arenaDataRepository.get(
message.arenaTableName,
Expand All @@ -87,14 +75,20 @@ open class DeltakerProcessor(
val tiltak = tiltakService.getByKode(gjennomforing.tiltakKode)
?: throw DependencyNotIngestedException("Venter på at tiltak med id=${gjennomforing.tiltakKode} skal bli håndtert")

if (message.operationType == Operation.DELETED && deltakelse.erAvsluttet()) {
log.info("Mottok slettemelding men deltaker var allerede i en ferdig-status")
arenaDataRepository.upsert(message.toUpsertInputWithStatusHandled(deltakelse.tiltakdeltakelseId, "ignorert slettemelding"))
return
}

val personIdent = personsporingService.get(deltakelse.personId, arenaGjennomforingId).fodselsnummer

/*
Hvis oppfølgingsperiode ikke finnes,
hopper vi ut her, enten med retry eller ignored, siden handleOppfolgingsperiodeNull kaster exception alltid.
*/
val periodeMatch =
if (opprettetFoerMenAktivEtterLansering) {
if (deltakelse.opprettetFørMenAktivEtterLansering()) {
getOppfolgingsperiodeForPersonVedLansering(personIdent)
} else getOppfolgingsPeriodeOrThrow(deltakelse, personIdent)
val endring = utledEndringsType(periodeMatch, deltakelse.tiltakdeltakelseId, arenaDeltaker.DELTAKERSTATUSKODE, tiltak.administrasjonskode)
Expand All @@ -119,7 +113,9 @@ open class DeltakerProcessor(
arrangorNavn = gjennomforing.arrangorNavn,
gjennomforingNavn = gjennomforing.navn ?: "Ukjent navn",
tiltak = tiltak,
isDelete = message.operationType == Operation.DELETED
)

val aktivitetskortHeaders = AktivitetskortHeaders(
arenaId = "${KafkaProducerService.TILTAK_ID_PREFIX}${deltakelse.tiltakdeltakelseId}",
tiltakKode = tiltak.kode,
Expand Down Expand Up @@ -152,12 +148,11 @@ open class DeltakerProcessor(
&& administrasjonskode in listOf(Tiltak.Administrasjonskode.IND, Tiltak.Administrasjonskode.INST)
}

private fun handleOppfolgingsperiodeNull(deltaker: TiltakDeltakelse, personIdent: String, tidspunkt: LocalDateTime, tiltakDeltakelseId: DeltakelseId): Nothing {
private fun handleOppfolgingsperiodeNull(deltakelse: TiltakDeltakelse, personIdent: String, tidspunkt: LocalDateTime, tiltakDeltakelseId: DeltakelseId): Nothing {
secureLog.info("Fant ikke oppfølgingsperiode for personIdent=$personIdent")
val aktivitetStatus = ArenaDeltakerConverter.toAktivitetStatus(deltaker.deltakerStatusKode)
val erFerdig = deltaker.datoTil?.isBefore(LocalDate.now()) ?: false
val erFerdig = deltakelse.datoTil?.isBefore(LocalDate.now()) ?: false
when {
aktivitetStatus.erAvsluttet() || erFerdig ->
deltakelse.erAvsluttet() || erFerdig ->
throw IgnoredException("Avsluttet deltakelse og ingen oppfølgingsperiode, id=${tiltakDeltakelseId.value}")
tidspunktTidligereEnnRettFoerStartDato(tidspunkt, LocalDateTime.now(), defaultSlakk) ->
throw IgnoredException("Opprettet for mer enn $defaultSlakk siden og ingen oppfølgingsperiode, id=${tiltakDeltakelseId.value}")
Expand All @@ -171,7 +166,7 @@ open class DeltakerProcessor(
val funnetPeriode = oppfolgingsperiodeService.finnOppfolgingsperiode(personIdent, oppslagsDato)
return when (funnetPeriode) {
is FinnOppfolgingResult.FunnetPeriodeResult -> funnetPeriode
is FinnOppfolgingResult.IngenPeriodeResult -> handleOppfolgingsperiodeNull(deltaker, personIdent, deltaker.modDato ?: deltaker.regDato, deltaker.tiltakdeltakelseId)
is FinnOppfolgingResult.IngenPeriodeResult -> handleOppfolgingsperiodeNull(deltaker, personIdent, deltaker.modDato, deltaker.tiltakdeltakelseId)
}
}

Expand Down Expand Up @@ -212,6 +207,30 @@ open class DeltakerProcessor(
fun syncOppfolgingsperioder(deltakelseId: DeltakelseId, oppfolginsperioder: List<Oppfolgingsperiode>) {
aktivitetService.closeClosedPerioder(deltakelseId, AktivitetKategori.TILTAKSAKTIVITET, oppfolginsperioder)
}

private fun TiltakDeltakelse.opprettetFørLansering(): Boolean {
return this.regDato.isBefore(AKTIVITETSPLAN_LANSERINGSDATO)
&& this.modDato.isBefore(AKTIVITETSPLAN_LANSERINGSDATO)
}
private fun TiltakDeltakelse.varAktivEtterLansering(): Boolean {
return this.datoTil?.isAfter(AKTIVITETSPLAN_LANSERINGSDATO.toLocalDate()) == true
}
private fun TiltakDeltakelse.sjekkIkkeFerdigFørLansering() {
if (this.opprettetFørLansering() && !this.varAktivEtterLansering()) {
throw IgnoredException("Deltakeren registrert=${this.regDato} opprettet før aktivitetsplan skal ikke håndteres")
}
}
private fun TiltakDeltakelse.opprettetFørMenAktivEtterLansering(): Boolean {
// Hvis deltakelsen er opprettet før aktivitetsplan lanseringsdato,
// _men_ datoTil er etter aktivitetsplan lanseringsdato,
// _og_ bruker hadde en aktiv oppfølgingsperiode ved aktivitetsplan lanseringsdato
// så skal vi opprette aktivitetskort
return this.opprettetFørLansering() && this.varAktivEtterLansering()
}

private fun TiltakDeltakelse.erAvsluttet(): Boolean {
return ArenaDeltakerConverter.toAktivitetStatus(this.deltakerStatusKode).erAvsluttet()
}
}

sealed class EndringsType(val aktivitetskortId: UUID, val skalIgnoreres: Boolean) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ object ArenaDeltakerConverter {
}
}

fun toDeleteStatus(deltakerStatusKode: String): AktivitetStatus {
val lastStatus = toAktivitetStatus(deltakerStatusKode)
return when (lastStatus) {
AktivitetStatus.FULLFORT -> AktivitetStatus.FULLFORT
AktivitetStatus.GJENNOMFORES -> AktivitetStatus.FULLFORT
else -> AktivitetStatus.AVBRUTT
}
}

fun toDeltakelseStatus(status: String): DeltakelseStatus? {
return when (status) {
"AKTUELL" -> DeltakelseStatus.SOKT_INN
Expand Down Expand Up @@ -63,12 +72,13 @@ object ArenaDeltakerConverter {
arrangorNavn: String?,
gjennomforingNavn: String,
tiltak: Tiltak,
isDelete: Boolean // Slettemeldinger inneholder bare forrige state og skal settes i en ferdig-status
): Aktivitetskort {
return Aktivitetskort(
id = aktivitetskortId,
personIdent = personIdent,
tittel = toTittel(gjennomforingNavn, tiltak.kode),
aktivitetStatus = toAktivitetStatus(deltaker.deltakerStatusKode),
aktivitetStatus = if (isDelete) toDeleteStatus(deltaker.deltakerStatusKode) else toAktivitetStatus(deltaker.deltakerStatusKode),
startDato = deltaker.datoFra,
sluttDato = deltaker.datoTil,
avtaltMedNav = true, // Arenatiltak er alltid Avtalt med NAV
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import no.nav.arena_tiltak_aktivitet_acl.utils.DatabaseUtils.sqlParameters
import org.springframework.jdbc.core.RowMapper
import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.stereotype.Component
import java.sql.ResultSet
Expand Down Expand Up @@ -253,9 +252,7 @@ open class ArenaDataRepository(
return template.update(sql, mapOf("arenaTableName" to arenaTableName.tableName))
}

fun alreadyProcessed(deltakelseArenaId: String, tableName: ArenaTableName, after: JsonNode?): Boolean {
if (after == null) return true // Should not reach this function but if it does, ignore it
//language=PostgreSQL
fun alreadyProcessed(deltakelseArenaId: String, tableName: ArenaTableName, before: JsonNode?, after: JsonNode?): Boolean {
val sql = """
WITH latestRow AS (
SELECT arena_id, MAX(id) latestId
Expand All @@ -267,12 +264,13 @@ open class ArenaDataRepository(
SELECT 1
FROM arena_data
JOIN latestRow ON arena_data.id = latestRow.latestId
AND after @> :after::jsonb
${if (after != null) "AND after @> :after::jsonb" else "AND after IS NULL"}
${if (before != null) "AND before @> :before::jsonb" else "AND before IS NULL"}
)
""".trimIndent()
return template.queryForObject(
sql,
mapOf("arenaId" to deltakelseArenaId, "tableName" to tableName.tableName, "after" to after.toString()),
mapOf("arenaId" to deltakelseArenaId, "tableName" to tableName.tableName, "before" to before?.toString(), "after" to after?.toString()),
) { row: ResultSet, _ -> row.getBoolean(1) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ open class ArenaDataSchedules(

private val log = LoggerFactory.getLogger(javaClass)

@Scheduled(fixedDelay = 10 * 1000L, initialDelay = ONE_MINUTE)
@Scheduled(fixedDelay = 10 * 1000L, initialDelayString = "\${app.env.scheduled.default.initialDelay}")
open fun processArenaMessages() {
if (leaderElectionClient.isLeader && unleash.isEnabled("aktivitet-arena-acl.batch.enabled")) {
JobRunner.run("process_arena_messages", retryArenaMessageProcessorService::processMessages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ open class ArenaMessageProcessorService(
fun handleArenaGoldenGateRecord(record: ConsumerRecord<String, String>) {
val recordValue = record.value().removeNullCharacters()
val messageDto = mapper.readValue(recordValue, ArenaKafkaMessageDto::class.java)
val messageAlreadyInStore = arenaDataRepository.alreadyProcessed(record.key(), messageDto.table, messageDto.after)
val messageAlreadyInStore = arenaDataRepository.alreadyProcessed(record.key(), messageDto.table, messageDto.before, messageDto.after)
if (messageAlreadyInStore) {
log.warn("Ignorerer melding topic:${record.topic()} partition:${record.partition()} offset:${record.offset()} op_ts: ${messageDto.opTs} allerede lagret under table:${messageDto.table} optype:${messageDto.opType} arenaId:${record.key()} pos:${messageDto.pos}")
return
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ app.env.arenaTiltaksgruppeTopic=${ARENA_TILTAKSGRUPPE_TOPIC:null}
app.env.arenaTiltakGjennomforingTopic=${ARENA_TILTAKGJENNOMFORING_TOPIC:null}
app.env.arenaTiltakDeltakerTopic=${ARENA_TILTAK_DELTAKER_TOPIC:null}
app.env.aktivitetskortTopic=${AKTIVITETSKORT_TOPIC:null}
app.env.scheduled.default.initialDelay=60000
app.unleashUrl=https://unleash.nais.io/api/

unleash.appName=${NAIS_APP_NAME}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus
import no.nav.arena_tiltak_aktivitet_acl.domain.dto.TranslationQuery
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.*
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.AktivitetResult
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.DeltakerInput
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.NyDeltakerCommand
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.OppdaterDeltakerCommand
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.*
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.GjennomforingInput
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.NyGjennomforingCommand
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak.NyttTiltakCommand
Expand Down Expand Up @@ -1004,6 +1001,62 @@ class DeltakerIntegrationTests : IntegrationTestBase() {

}

@Test
fun `Skal sette aktivitet med planlegger-status til avbrutt-status når vi mottar melding med status DELETED`() {
val (gjennomforingId, deltakerId) = setup()
val deltakerInput = DeltakerInput(
tiltakDeltakelseId = deltakerId,
tiltakgjennomforingId = gjennomforingId,
innsokBegrunnelse = "innsøkbegrunnelse",
datoFra = LocalDate.now().minusDays(1),
endretAv = Ident(ident = "SIG123"),
deltakerStatusKode = "TILBUD"
)
val deltakerCommand = NyDeltakerCommand(deltakerInput)
deltakerExecutor.execute(deltakerCommand).expectHandled { arenaData ->
arenaData.output.aktivitetskort.aktivitetStatus shouldBe AktivitetStatus.PLANLAGT
}

val slettetDeltakerCommand = SletteDeltakerCommand(deltakerInput)
deltakerExecutor.execute(slettetDeltakerCommand).expectHandled { arenaData ->
arenaData.arenaDataDbo.operation shouldBe Operation.DELETED
arenaData.output.aktivitetskort.aktivitetStatus shouldBe AktivitetStatus.AVBRUTT
}
}

@Test
fun `Skal ignorere (handled men ingen aktivitetskort) slettemelding hvis aktivitet allerede er i ferdig-status`() {
val (gjennomforingId, deltakerId) = setup()
val deltakerInput = DeltakerInput(
tiltakDeltakelseId = deltakerId,
tiltakgjennomforingId = gjennomforingId,
innsokBegrunnelse = "innsøkbegrunnelse",
datoFra = LocalDate.now().minusDays(1),
endretAv = Ident(ident = "SIG123"),
deltakerStatusKode = "FULLF"
)
val deltakerCommand = NyDeltakerCommand(deltakerInput)
deltakerExecutor.execute(deltakerCommand).expectHandled { arenaData ->
arenaData.output.aktivitetskort.aktivitetStatus shouldBe AktivitetStatus.FULLFORT
}
val slettetDeltakerCommand = SletteDeltakerCommand(deltakerInput)
deltakerExecutor.execute(slettetDeltakerCommand, expectAktivitetskortOnTopic = false).expectHandledAndIngored {}
}

@Test
fun `Skal ikke lage aktivitetskort hvis eneste melding på deltakelse er slettemelding`() {
// Dette burde aldri skje
val (gjennomforingId, deltakerId) = setup()
val deltakerInput = DeltakerInput(
tiltakDeltakelseId = deltakerId,
tiltakgjennomforingId = gjennomforingId,
endretAv = Ident(ident = "SIG123"),
deltakerStatusKode = "FULLF"
)
val slettetDeltakerCommand = SletteDeltakerCommand(deltakerInput)
deltakerExecutor.execute(slettetDeltakerCommand, expectAktivitetskortOnTopic = false).expectHandledAndIngored {}
}

private val idMappingClient: IdMappingClient by lazy {
val token = issueAzureAdM2MToken()
IdMappingClient(port!!) { token }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.Gjen
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.NyGjennomforingCommand
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak.NyttTiltakCommand
import no.nav.arena_tiltak_aktivitet_acl.mocks.OrdsClientMock
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import java.util.*

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class GjennomforingIntegrationTests : IntegrationTestBase() {
@BeforeAll
fun setupTiltak() {
tiltakExecutor.execute(NyttTiltakCommand())
}

@Test
fun `Konsumer gjennomføring - gyldig gjennomføring - ingestes uten feil`() {

val gjennomforingInput = GjennomforingInput(
gjennomforingId = Random().nextLong()
)
val gjennomforingInput = GjennomforingInput(gjennomforingId = Random().nextLong())
val arbgivId = gjennomforingInput.arbeidsgiverIdArrangor
val virksomhetsnummer = "123"
val expected = gjennomforingInput.toDbo(gjennomforingInput.gjennomforingId, virksomhetsnummer, "virksomhetnavn")
Expand All @@ -33,26 +37,18 @@ class GjennomforingIntegrationTests : IntegrationTestBase() {
@Test
fun `Konsumer gjennomføring - Feilet på første forsøk - Skal settes til RETRY`() {
val virksomhetsId = 456785618L

tiltakExecutor.execute(NyttTiltakCommand())

OrdsClientMock.virksomhetsHandler[virksomhetsId] = { throw RuntimeException() }

val input = GjennomforingInput(
gjennomforingId = Random().nextLong(),
arbeidsgiverIdArrangor = virksomhetsId
)

gjennomforingExecutor.execute(NyGjennomforingCommand(input))
.arenaData { it.ingestStatus shouldBe IngestStatus.RETRY }
.result { _, output -> output shouldBe null }

}

@Test
fun `Konsumer gjennomføring - lokaltnavn er null - gjennomføringsnavn blir satt til tiltaksnavn i deltakerprocessor`() {
tiltakExecutor.execute(NyttTiltakCommand())

val input = GjennomforingInput(
gjennomforingId = Random().nextLong(),
navn = null
Expand All @@ -65,8 +61,6 @@ class GjennomforingIntegrationTests : IntegrationTestBase() {

@Test
fun `Konsumer gjennomføring - lokaltnavn har fnr - gjennomføringsnavn skal vaskes`() {
tiltakExecutor.execute(NyttTiltakCommand())

val input = GjennomforingInput(
gjennomforingId = Random().nextLong(),
navn = "10108094523 Brua frisør"
Expand All @@ -79,13 +73,10 @@ class GjennomforingIntegrationTests : IntegrationTestBase() {

@Test
fun `Konsumer gjennomføring - lokaltnavn har kun spesialkarakterer - gjennomføringsnavn skal vaskes`() {
tiltakExecutor.execute(NyttTiltakCommand())

val input = GjennomforingInput(
gjennomforingId = Random().nextLong(),
navn = "....-#$%___"
)

gjennomforingExecutor.execute(NyGjennomforingCommand(input))
.arenaData { it.ingestStatus shouldBe IngestStatus.HANDLED }
.result { _, output -> output?.navn shouldBe null }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation
import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import java.time.LocalDateTime

class SletteDeltakerCommand(private val input: DeltakerInput) : DeltakerCommand(input.tiltakDeltakelseId) {
override fun toArenaKafkaMessageDto(pos: String): ArenaKafkaMessageDto = ArenaKafkaMessageDto(
table = ArenaTableName.DELTAKER,
opType = ArenaOperation.D.name,
opTs = LocalDateTime.now().format(opTsFormatter),
pos = pos,
before = createPayload(input),
after = null
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,13 @@ class DeltakerProcessorTest : FunSpec({
}

test("Should process deleted deltaker") {
shouldThrowExactly<IgnoredException> {
createDeltakerProcessor().handleArenaMessage(
createArenaDeltakerKafkaMessage(
tiltakGjennomforingArenaId = nonIgnoredGjennomforingArenaId,
deltakerArenaId = 1L,
operation = Operation.DELETED
)
createDeltakerProcessor().handleArenaMessage(
createArenaDeltakerKafkaMessage(
tiltakGjennomforingArenaId = nonIgnoredGjennomforingArenaId,
deltakerArenaId = 1L,
operation = Operation.DELETED
)
}
)
}

test("Skal opprette translation hvis regDato (opprettetTidspunkt) er innen en oppfølgingsperiode") {
Expand Down
Loading

0 comments on commit 0a0f9ef

Please sign in to comment.