diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/kafka/arena/ArenaKafkaMessageDto.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/kafka/arena/ArenaKafkaMessageDto.kt index 70de0472..c5750b0c 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/kafka/arena/ArenaKafkaMessageDto.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/domain/kafka/arena/ArenaKafkaMessageDto.kt @@ -21,7 +21,7 @@ data class ArenaKafkaMessageDto( @JsonProperty("op_ts") val opTs: String, - val pos: OperationPos, + val pos: String, val before: JsonNode?, val after: JsonNode? ) diff --git a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/ArenaMessageProcessorService.kt b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/ArenaMessageProcessorService.kt index f1f5173b..b833fc66 100644 --- a/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/ArenaMessageProcessorService.kt +++ b/src/main/kotlin/no/nav/arena_tiltak_aktivitet_acl/services/ArenaMessageProcessorService.kt @@ -110,7 +110,7 @@ open class ArenaMessageProcessorService( arenaTableName = messageDto.table, operationType = Operation.fromArenaOperationString(messageDto.opType), operationTimestamp = parseArenaDateTime(messageDto.opTs), - operationPosition = messageDto.pos, + operationPosition = OperationPos.of(messageDto.pos), before = messageDto.before?.let { mapper.treeToValue(it, D::class.java) }, after = messageDto.after?.let { mapper.treeToValue(it, D::class.java) } ) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/NyDeltakerCommand.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/NyDeltakerCommand.kt index 28c74c56..e88c1eee 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/NyDeltakerCommand.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/NyDeltakerCommand.kt @@ -2,7 +2,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName import java.time.LocalDateTime @@ -11,7 +10,7 @@ class NyDeltakerCommand(private val input: DeltakerInput) : DeltakerCommand(inpu table = ArenaTableName.DELTAKER, opType = ArenaOperation.I.name, opTs = LocalDateTime.now().format(opTsFormatter), - pos = OperationPos.of(pos), + pos = pos, before = null, after = createPayload(input) ) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/OppdaterDeltakerCommand.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/OppdaterDeltakerCommand.kt index e815f568..c5d4a3bd 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/OppdaterDeltakerCommand.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/deltaker/OppdaterDeltakerCommand.kt @@ -2,7 +2,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName import java.time.LocalDateTime @@ -15,7 +14,7 @@ class OppdaterDeltakerCommand( table = ArenaTableName.DELTAKER, opType = ArenaOperation.U.name, opTs = LocalDateTime.now().format(opTsFormatter), - pos = OperationPos.of(pos), + pos = pos, before = createPayload(oldDeltakerData), after = createPayload(updatedDeltakerData) ) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/gjennomforing/NyGjennomforingCommand.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/gjennomforing/NyGjennomforingCommand.kt index ce7435fe..923c9f41 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/gjennomforing/NyGjennomforingCommand.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/gjennomforing/NyGjennomforingCommand.kt @@ -2,7 +2,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName import java.time.LocalDateTime @@ -15,7 +14,7 @@ class NyGjennomforingCommand( table = ArenaTableName.GJENNOMFORING, opType = ArenaOperation.I.name, opTs = LocalDateTime.now().format(opTsFormatter), - pos = OperationPos.of(pos), + pos = pos, before = null, after = createPayload(input) ) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/NyttTiltakCommand.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/NyttTiltakCommand.kt index 41cc67ce..b5530901 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/NyttTiltakCommand.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/NyttTiltakCommand.kt @@ -3,7 +3,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Tiltak import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName import java.time.LocalDateTime import java.util.* @@ -17,7 +16,7 @@ class NyttTiltakCommand( table = ArenaTableName.TILTAK, opType = ArenaOperation.I.name, opTs = LocalDateTime.now().format(opTsFormatter), - pos = OperationPos.of(pos), + pos = pos, before = null, after = createPayload(kode, navn, administrasjonskode.name) ) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/OppdaterTiltakCommand.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/OppdaterTiltakCommand.kt index dc8fb5be..895890ae 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/OppdaterTiltakCommand.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/OppdaterTiltakCommand.kt @@ -3,7 +3,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Tiltak import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName import java.time.LocalDateTime @@ -18,7 +17,7 @@ class OppdaterTiltakCommand( table = ArenaTableName.TILTAK, opType = ArenaOperation.U.name, opTs = LocalDateTime.now().format(opTsFormatter), - pos = OperationPos.of(pos), + pos = pos, before = createPayload(kode, gammeltNavn, administrasjonskode.name), after = createPayload(kode, nyttNavn, administrasjonskode.name) ) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/SlettTiltakCommand.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/SlettTiltakCommand.kt index 5516d1e4..afb47c63 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/SlettTiltakCommand.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/commands/tiltak/SlettTiltakCommand.kt @@ -3,7 +3,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Tiltak import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation -import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName import java.time.LocalDateTime @@ -17,7 +16,7 @@ class SlettTiltakCommand( table = ArenaTableName.TILTAK, opType = ArenaOperation.D.name, opTs = LocalDateTime.now().format(opTsFormatter), - pos = OperationPos.of(pos), + pos = pos, before = createPayload(kode, navn, administrasjonskode.name), after = null ) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/DeltakerTestExecutor.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/DeltakerTestExecutor.kt index b32b826f..21b135b2 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/DeltakerTestExecutor.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/DeltakerTestExecutor.kt @@ -1,7 +1,8 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.executors import io.kotest.common.runBlocking -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.first import kotlinx.coroutines.withTimeout import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori @@ -9,8 +10,12 @@ import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHe import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.KafkaMessageDto import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId -import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.* +import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.AktivitetResult +import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.DeltakerCommand +import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.HandledAndIgnored +import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.HandledResult import no.nav.arena_tiltak_aktivitet_acl.integration.kafka.KafkaAktivitetskortIntegrationConsumer import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository import no.nav.arena_tiltak_aktivitet_acl.repositories.ArenaDataRepository @@ -59,7 +64,7 @@ class DeltakerTestExecutor( val arenaData = pollArenaData( ArenaTableName.DELTAKER, Operation.fromArenaOperationString(wrapper.opType), - wrapper.pos + OperationPos.of(wrapper.pos) ) val deltakelseId = DeltakelseId(arenaData.arenaId.toLong()) diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/GjennomforingTestExecutor.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/GjennomforingTestExecutor.kt index 63306d3a..382632fa 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/GjennomforingTestExecutor.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/GjennomforingTestExecutor.kt @@ -2,6 +2,7 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.executors import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.GjennomforingCommand import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.GjennomforingResult import no.nav.arena_tiltak_aktivitet_acl.repositories.ArenaDataRepository @@ -36,11 +37,11 @@ class GjennomforingTestExecutor( val arenaData = pollArenaData( ArenaTableName.GJENNOMFORING, Operation.fromArenaOperationString(arenaWrapper.opType), - arenaWrapper.pos + OperationPos.of(arenaWrapper.pos) ) val output = gjennomforingRepository.get(arenaData.arenaId.toLong()) - return GjennomforingResult(arenaWrapper.pos, arenaData, output) + return GjennomforingResult(OperationPos.of(arenaWrapper.pos), arenaData, output) } } diff --git a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/TiltakTestExecutor.kt b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/TiltakTestExecutor.kt index aa34de1d..e3973ec8 100644 --- a/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/TiltakTestExecutor.kt +++ b/src/test/kotlin/no/nav/arena_tiltak_aktivitet_acl/integration/executors/TiltakTestExecutor.kt @@ -2,6 +2,7 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.executors import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto +import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos import no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak.TiltakCommand import no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak.TiltakResult import no.nav.arena_tiltak_aktivitet_acl.integration.utils.Retry.nullableAsyncRetryHandler @@ -34,7 +35,7 @@ class TiltakTestExecutor( val data = pollArenaData( ArenaTableName.TILTAK, Operation.fromArenaOperationString(arenaWrapper.opType), - arenaWrapper.pos + OperationPos.of(arenaWrapper.pos) ) val storedTiltak = nullableAsyncRetryHandler("get tiltak by kode: $kode") { tiltakRepository.getByKode(kode) } ?: fail("Forventet at tiltak med kode $kode ligger i tiltak databasen.")