Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Oppdatert consumer til lagre batches istedet for melding pr commit, e…
Browse files Browse the repository at this point in the history
…ndret sletting til å foregå midnatt, refaktorert getStartOfYear()
  • Loading branch information
robertkittilsen committed Mar 7, 2024
1 parent cc9312d commit eadaf82
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import no.nav.paw.config.kafka.KAFKA_CONFIG_WITH_SCHEME_REG
import no.nav.paw.config.kafka.KafkaConfig
import no.nav.paw.config.kafka.KafkaFactory
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.LongDeserializer
import org.jetbrains.exposed.sql.Database
import javax.sql.DataSource

Expand All @@ -35,10 +35,10 @@ fun createDependencies(): Dependencies {
val kafkaFactory = KafkaFactory(kafkaConfig)

val consumer =
kafkaFactory.createConsumer<String, Periode>(
kafkaFactory.createConsumer<Long, Periode>(
groupId = applicationConfig.gruppeId,
clientId = applicationConfig.gruppeId,
keyDeserializer = StringDeserializer::class,
keyDeserializer = LongDeserializer::class,
valueDeserializer = PeriodeDeserializer::class
)

Expand All @@ -62,5 +62,5 @@ data class Dependencies(
val dataSource: DataSource,
val scheduleDeletionService: ScheduleDeletionService,
val aktivePerioderGaugeScheduler: AktivePerioderGaugeScheduler,
val consumer: KafkaConsumer<String, Periode>
val consumer: KafkaConsumer<Long, Periode>
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain

import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toLocalDateTime
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import java.time.LocalDateTime
import java.util.UUID

Expand All @@ -22,3 +24,11 @@ fun Arbeidssoekerperiode.toArbeidssoekerperiodeResponse() =
startet = startet,
avsluttet = avsluttet
)

fun Periode.toArbeidssoekerperiode() =
Arbeidssoekerperiode(
identitetsnummer = Identitetsnummer(identitetsnummer),
periodeId = id,
startet = startet.tidspunkt.toLocalDateTime(),
avsluttet = avsluttet?.tidspunkt?.toLocalDateTime()
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,39 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.kafka
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.services.ArbeidssoekerService
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.logger
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration

class PeriodeConsumer(
private val topic: String,
private val consumer: KafkaConsumer<String, Periode>,
private val consumer: KafkaConsumer<Long, Periode>,
private val arbeidssoekerService: ArbeidssoekerService
) {
fun start() {
logger.info("Lytter på topic $topic")
consumer.subscribe(listOf(topic))

while (true) {
consumer.poll(Duration.ofMillis(500)).forEach { post ->
try {
logger.info("Mottok melding fra $topic med offset ${post.offset()} partition ${post.partition()}")
val arbeidssoekerperiode = post.value()
arbeidssoekerService.opprettEllerOppdaterArbeidssoekerperiode(arbeidssoekerperiode)

consumer.commitSync()
} catch (error: Exception) {
throw Exception("Feil ved konsumering av melding fra $topic", error)
val records: ConsumerRecords<Long, Periode> =
consumer.poll(Duration.ofMillis(500))
.onEach {
logger.info("Mottok melding fra $topic med offset ${it.offset()} partition ${it.partition()}")
}
val perioder =
records.map { record: ConsumerRecord<Long, Periode> ->
record.value()
}
}
processAndCommitBatch(perioder)
}
}

private fun processAndCommitBatch(batch: Iterable<Periode>) =
try {
arbeidssoekerService.storeBatch(batch)
consumer.commitSync()
} catch (error: Exception) {
throw Exception("Feil ved konsumering av melding fra $topic", error)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.repositories
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.database.PeriodeTable
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Arbeidssoekerperiode
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Identitetsnummer
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.toArbeidssoekerperiode
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.logger
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toInstant
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toLocalDateTime
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SqlExpressionBuilder.less
import org.jetbrains.exposed.sql.deleteWhere
Expand All @@ -19,6 +21,21 @@ import java.time.LocalDate
import java.util.UUID

class ArbeidssoekerperiodeRepository(private val database: Database) {
fun storeBatch(arbeidssoekerperioder: Iterable<Periode>) {
transaction(database) {
repetitionAttempts = 2
minRepetitionDelay = 200

arbeidssoekerperioder.forEach { periode ->
if (finnesArbeidssoekerperiode(periode.id)) {
oppdaterArbeidssoekerperiode(periode.toArbeidssoekerperiode())
} else {
opprettArbeidssoekerperiode(periode.toArbeidssoekerperiode())
}
}
}
}

fun hentArbeidssoekerperioder(
identitetsnummer: Identitetsnummer,
fraStartetDato: LocalDate?
Expand Down Expand Up @@ -68,9 +85,6 @@ class ArbeidssoekerperiodeRepository(private val database: Database) {

fun opprettArbeidssoekerperiode(periode: Arbeidssoekerperiode) {
transaction(database) {
repetitionAttempts = 2
minRepetitionDelay = 200

PeriodeTable.insert {
it[periodeId] = periode.periodeId
it[identitetsnummer] = periode.identitetsnummer.verdi
Expand All @@ -81,18 +95,18 @@ class ArbeidssoekerperiodeRepository(private val database: Database) {
}

fun oppdaterArbeidssoekerperiode(periode: Arbeidssoekerperiode) {
transaction(database) {
if (periode.avsluttet == null) {
throw IllegalArgumentException("Avsluttet kan ikke være null ved oppdatering av periode")
}
try {
if (periode.avsluttet == null) {
throw IllegalArgumentException("Avsluttet kan ikke være null ved oppdatering av periode")
}
try {
transaction(database) {
PeriodeTable.update({ PeriodeTable.periodeId eq periode.periodeId }) {
it[avsluttet] = periode.avsluttet.toInstant()
}
} catch (e: SQLException) {
logger.error("Feil ved oppdatering av periode", e)
throw e
}
} catch (e: SQLException) {
logger.error("Feil ved oppdatering av periode", e)
throw e
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
package no.nav.paw.arbeidssoekerregisteret.eksternt.api.services

import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Arbeidssoekerperiode
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.ArbeidssoekerperiodeResponse
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Identitetsnummer
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.toArbeidssoekerperiodeResponse
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.repositories.ArbeidssoekerperiodeRepository
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneId

class ArbeidssoekerService(private val arbeidssoekerperiodeRepository: ArbeidssoekerperiodeRepository) {
fun opprettEllerOppdaterArbeidssoekerperiode(periodeMelding: Periode) {
val eksisterendePeriode = arbeidssoekerperiodeRepository.finnesArbeidssoekerperiode(periodeMelding.id)
val periode =
Arbeidssoekerperiode(
identitetsnummer = Identitetsnummer(periodeMelding.identitetsnummer),
periodeId = periodeMelding.id,
startet = LocalDateTime.ofInstant(periodeMelding.startet.tidspunkt, ZoneId.systemDefault()),
avsluttet = periodeMelding.avsluttet?.tidspunkt?.let { LocalDateTime.ofInstant(it, ZoneId.systemDefault()) }
)
if (eksisterendePeriode) {
arbeidssoekerperiodeRepository.oppdaterArbeidssoekerperiode(periode)
} else {
arbeidssoekerperiodeRepository.opprettArbeidssoekerperiode(periode)
}
fun storeBatch(arbeidssoekerperioder: Iterable<Periode>) {
arbeidssoekerperiodeRepository.storeBatch(arbeidssoekerperioder)
}

fun hentArbeidssoekerperioder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.services

import no.nav.paw.arbeidssoekerregisteret.eksternt.api.repositories.ArbeidssoekerperiodeRepository
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.TimeUtils.getMaxDateForDatabaseStorage
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.getDelayUntilMidnight
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.getDeletionInterval
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.logger
import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toInstant
Expand All @@ -22,5 +23,5 @@ class ScheduleDeletionService(arbeidssoekerperiodeRepository: Arbeidssoekerperio
}
}

fun scheduleDatabaseDeletionTask() = timer.scheduleAtFixedRate(task, 0L, getDeletionInterval())
fun scheduleDatabaseDeletionTask() = timer.scheduleAtFixedRate(task, getDelayUntilMidnight(), getDeletionInterval())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,27 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils

import java.time.Duration
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.ZoneId
import java.time.temporal.TemporalAdjusters

fun Instant.toLocalDateTime(): LocalDateTime = LocalDateTime.ofInstant(this, ZoneId.systemDefault())

fun LocalDateTime.toInstant(): Instant = this.atZone(ZoneId.systemDefault()).toInstant()

fun getDeletionInterval(): Long = 1000L * 60 * 60 * 24 // 24 timer

fun getDelayUntilMidnight(): Long {
val now = LocalDateTime.now()
val midnight = LocalDateTime.of(LocalDate.now().plusDays(1), LocalTime.of(0, 0, 0, 0))
return Duration.between(now, midnight).toMillis()
}

object TimeUtils {
private val now = LocalDateTime.now()

private fun getStartOfYear(): LocalDateTime = now.with(TemporalAdjusters.firstDayOfYear()).withHour(0).withMinute(0).withSecond(0).withNano(0)
private fun getStartOfYear(): LocalDateTime = LocalDateTime.of(LocalDate.of(now.year, 1, 1), LocalTime.of(0, 0, 0, 0))

private fun getDurationFromNowToStartOfYear(): Duration = Duration.between(now, getStartOfYear())

Expand Down

0 comments on commit eadaf82

Please sign in to comment.