Skip to content

Commit

Permalink
Fjern OperationPos type fra KafkaDto
Browse files Browse the repository at this point in the history
  • Loading branch information
holymaloney committed Nov 14, 2023
1 parent 0249320 commit 7b97fa0
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ data class ArenaKafkaMessageDto(
@JsonProperty("op_ts")
val opTs: String,

val pos: OperationPos,
val pos: String,
val before: JsonNode?,
val after: JsonNode?
)
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ open class ArenaMessageProcessorService(
arenaTableName = messageDto.table,
operationType = Operation.fromArenaOperationString(messageDto.opType),
operationTimestamp = parseArenaDateTime(messageDto.opTs),
operationPosition = messageDto.pos,
operationPosition = OperationPos.of(messageDto.pos),
before = messageDto.before?.let { mapper.treeToValue(it, D::class.java) },
after = messageDto.after?.let { mapper.treeToValue(it, D::class.java) }
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import java.time.LocalDateTime

Expand All @@ -11,7 +10,7 @@ class NyDeltakerCommand(private val input: DeltakerInput) : DeltakerCommand(inpu
table = ArenaTableName.DELTAKER,
opType = ArenaOperation.I.name,
opTs = LocalDateTime.now().format(opTsFormatter),
pos = OperationPos.of(pos),
pos = pos,
before = null,
after = createPayload(input)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import java.time.LocalDateTime

Expand All @@ -15,7 +14,7 @@ class OppdaterDeltakerCommand(
table = ArenaTableName.DELTAKER,
opType = ArenaOperation.U.name,
opTs = LocalDateTime.now().format(opTsFormatter),
pos = OperationPos.of(pos),
pos = pos,
before = createPayload(oldDeltakerData),
after = createPayload(updatedDeltakerData)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import java.time.LocalDateTime

Expand All @@ -15,7 +14,7 @@ class NyGjennomforingCommand(
table = ArenaTableName.GJENNOMFORING,
opType = ArenaOperation.I.name,
opTs = LocalDateTime.now().format(opTsFormatter),
pos = OperationPos.of(pos),
pos = pos,
before = null,
after = createPayload(input)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Tiltak
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import java.time.LocalDateTime
import java.util.*
Expand All @@ -17,7 +16,7 @@ class NyttTiltakCommand(
table = ArenaTableName.TILTAK,
opType = ArenaOperation.I.name,
opTs = LocalDateTime.now().format(opTsFormatter),
pos = OperationPos.of(pos),
pos = pos,
before = null,
after = createPayload(kode, navn, administrasjonskode.name)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Tiltak
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import java.time.LocalDateTime

Expand All @@ -18,7 +17,7 @@ class OppdaterTiltakCommand(
table = ArenaTableName.TILTAK,
opType = ArenaOperation.U.name,
opTs = LocalDateTime.now().format(opTsFormatter),
pos = OperationPos.of(pos),
pos = pos,
before = createPayload(kode, gammeltNavn, administrasjonskode.name),
after = createPayload(kode, nyttNavn, administrasjonskode.name)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Tiltak
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaOperation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.utils.ArenaTableName
import java.time.LocalDateTime

Expand All @@ -17,7 +16,7 @@ class SlettTiltakCommand(
table = ArenaTableName.TILTAK,
opType = ArenaOperation.D.name,
opTs = LocalDateTime.now().format(opTsFormatter),
pos = OperationPos.of(pos),
pos = pos,
before = createPayload(kode, navn, administrasjonskode.name),
after = null
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package no.nav.arena_tiltak_aktivitet_acl.integration.executors

import io.kotest.common.runBlocking
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeout
import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHeaders
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.KafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.DeltakelseId
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.*
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.AktivitetResult
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.DeltakerCommand
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.HandledAndIgnored
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.deltaker.HandledResult
import no.nav.arena_tiltak_aktivitet_acl.integration.kafka.KafkaAktivitetskortIntegrationConsumer
import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetRepository
import no.nav.arena_tiltak_aktivitet_acl.repositories.ArenaDataRepository
Expand Down Expand Up @@ -59,7 +64,7 @@ class DeltakerTestExecutor(
val arenaData = pollArenaData(
ArenaTableName.DELTAKER,
Operation.fromArenaOperationString(wrapper.opType),
wrapper.pos
OperationPos.of(wrapper.pos)
)

val deltakelseId = DeltakelseId(arenaData.arenaId.toLong())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.executors

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.GjennomforingCommand
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.gjennomforing.GjennomforingResult
import no.nav.arena_tiltak_aktivitet_acl.repositories.ArenaDataRepository
Expand Down Expand Up @@ -36,11 +37,11 @@ class GjennomforingTestExecutor(
val arenaData = pollArenaData(
ArenaTableName.GJENNOMFORING,
Operation.fromArenaOperationString(arenaWrapper.opType),
arenaWrapper.pos
OperationPos.of(arenaWrapper.pos)
)

val output = gjennomforingRepository.get(arenaData.arenaId.toLong())
return GjennomforingResult(arenaWrapper.pos, arenaData, output)
return GjennomforingResult(OperationPos.of(arenaWrapper.pos), arenaData, output)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.arena_tiltak_aktivitet_acl.integration.executors

import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.ArenaKafkaMessageDto
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.OperationPos
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak.TiltakCommand
import no.nav.arena_tiltak_aktivitet_acl.integration.commands.tiltak.TiltakResult
import no.nav.arena_tiltak_aktivitet_acl.integration.utils.Retry.nullableAsyncRetryHandler
Expand Down Expand Up @@ -34,7 +35,7 @@ class TiltakTestExecutor(
val data = pollArenaData(
ArenaTableName.TILTAK,
Operation.fromArenaOperationString(arenaWrapper.opType),
arenaWrapper.pos
OperationPos.of(arenaWrapper.pos)
)
val storedTiltak = nullableAsyncRetryHandler("get tiltak by kode: $kode") { tiltakRepository.getByKode(kode) }
?: fail("Forventet at tiltak med kode $kode ligger i tiltak databasen.")
Expand Down

0 comments on commit 7b97fa0

Please sign in to comment.