diff --git a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/Dependencies.kt b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/Dependencies.kt index 8ae53f8..c67d50a 100644 --- a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/Dependencies.kt +++ b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/Dependencies.kt @@ -17,7 +17,7 @@ import no.nav.paw.config.kafka.KAFKA_CONFIG_WITH_SCHEME_REG import no.nav.paw.config.kafka.KafkaConfig import no.nav.paw.config.kafka.KafkaFactory import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.LongDeserializer import org.jetbrains.exposed.sql.Database import javax.sql.DataSource @@ -35,10 +35,10 @@ fun createDependencies(): Dependencies { val kafkaFactory = KafkaFactory(kafkaConfig) val consumer = - kafkaFactory.createConsumer( + kafkaFactory.createConsumer( groupId = applicationConfig.gruppeId, clientId = applicationConfig.gruppeId, - keyDeserializer = StringDeserializer::class, + keyDeserializer = LongDeserializer::class, valueDeserializer = PeriodeDeserializer::class ) @@ -62,5 +62,5 @@ data class Dependencies( val dataSource: DataSource, val scheduleDeletionService: ScheduleDeletionService, val aktivePerioderGaugeScheduler: AktivePerioderGaugeScheduler, - val consumer: KafkaConsumer + val consumer: KafkaConsumer ) diff --git a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/domain/Arbeidssoekerperiode.kt b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/domain/Arbeidssoekerperiode.kt index 806e85d..7e1af07 100644 --- a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/domain/Arbeidssoekerperiode.kt +++ b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/domain/Arbeidssoekerperiode.kt @@ -1,5 +1,7 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain +import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toLocalDateTime +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode import java.time.LocalDateTime import java.util.UUID @@ -22,3 +24,11 @@ fun Arbeidssoekerperiode.toArbeidssoekerperiodeResponse() = startet = startet, avsluttet = avsluttet ) + +fun Periode.toArbeidssoekerperiode() = + Arbeidssoekerperiode( + identitetsnummer = Identitetsnummer(identitetsnummer), + periodeId = id, + startet = startet.tidspunkt.toLocalDateTime(), + avsluttet = avsluttet?.tidspunkt?.toLocalDateTime() + ) diff --git a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/kafka/PeriodeConsumer.kt b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/kafka/PeriodeConsumer.kt index cc486d4..ee02d25 100644 --- a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/kafka/PeriodeConsumer.kt +++ b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/kafka/PeriodeConsumer.kt @@ -3,12 +3,14 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.kafka import no.nav.paw.arbeidssoekerregisteret.eksternt.api.services.ArbeidssoekerService import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.logger import no.nav.paw.arbeidssokerregisteret.api.v1.Periode +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.clients.consumer.KafkaConsumer import java.time.Duration class PeriodeConsumer( private val topic: String, - private val consumer: KafkaConsumer, + private val consumer: KafkaConsumer, private val arbeidssoekerService: ArbeidssoekerService ) { fun start() { @@ -16,17 +18,24 @@ class PeriodeConsumer( consumer.subscribe(listOf(topic)) while (true) { - consumer.poll(Duration.ofMillis(500)).forEach { post -> - try { - logger.info("Mottok melding fra $topic med offset ${post.offset()} partition ${post.partition()}") - val arbeidssoekerperiode = post.value() - arbeidssoekerService.opprettEllerOppdaterArbeidssoekerperiode(arbeidssoekerperiode) - - consumer.commitSync() - } catch (error: Exception) { - throw Exception("Feil ved konsumering av melding fra $topic", error) + val records: ConsumerRecords = + consumer.poll(Duration.ofMillis(500)) + .onEach { + logger.info("Mottok melding fra $topic med offset ${it.offset()} partition ${it.partition()}") + } + val perioder = + records.map { record: ConsumerRecord -> + record.value() } - } + processAndCommitBatch(perioder) } } + + private fun processAndCommitBatch(batch: Iterable) = + try { + arbeidssoekerService.storeBatch(batch) + consumer.commitSync() + } catch (error: Exception) { + throw Exception("Feil ved konsumering av melding fra $topic", error) + } } diff --git a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/repositories/ArbeidssoekerperiodeRepository.kt b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/repositories/ArbeidssoekerperiodeRepository.kt index 857343e..57f97cb 100644 --- a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/repositories/ArbeidssoekerperiodeRepository.kt +++ b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/repositories/ArbeidssoekerperiodeRepository.kt @@ -3,9 +3,11 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.repositories import no.nav.paw.arbeidssoekerregisteret.eksternt.api.database.PeriodeTable import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Arbeidssoekerperiode import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Identitetsnummer +import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.toArbeidssoekerperiode import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.logger import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toInstant import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toLocalDateTime +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.SqlExpressionBuilder.less import org.jetbrains.exposed.sql.deleteWhere @@ -19,6 +21,21 @@ import java.time.LocalDate import java.util.UUID class ArbeidssoekerperiodeRepository(private val database: Database) { + fun storeBatch(arbeidssoekerperioder: Iterable) { + transaction(database) { + repetitionAttempts = 2 + minRepetitionDelay = 200 + + arbeidssoekerperioder.forEach { periode -> + if (finnesArbeidssoekerperiode(periode.id)) { + oppdaterArbeidssoekerperiode(periode.toArbeidssoekerperiode()) + } else { + opprettArbeidssoekerperiode(periode.toArbeidssoekerperiode()) + } + } + } + } + fun hentArbeidssoekerperioder( identitetsnummer: Identitetsnummer, fraStartetDato: LocalDate? @@ -68,9 +85,6 @@ class ArbeidssoekerperiodeRepository(private val database: Database) { fun opprettArbeidssoekerperiode(periode: Arbeidssoekerperiode) { transaction(database) { - repetitionAttempts = 2 - minRepetitionDelay = 200 - PeriodeTable.insert { it[periodeId] = periode.periodeId it[identitetsnummer] = periode.identitetsnummer.verdi @@ -81,18 +95,18 @@ class ArbeidssoekerperiodeRepository(private val database: Database) { } fun oppdaterArbeidssoekerperiode(periode: Arbeidssoekerperiode) { - transaction(database) { - if (periode.avsluttet == null) { - throw IllegalArgumentException("Avsluttet kan ikke være null ved oppdatering av periode") - } - try { + if (periode.avsluttet == null) { + throw IllegalArgumentException("Avsluttet kan ikke være null ved oppdatering av periode") + } + try { + transaction(database) { PeriodeTable.update({ PeriodeTable.periodeId eq periode.periodeId }) { it[avsluttet] = periode.avsluttet.toInstant() } - } catch (e: SQLException) { - logger.error("Feil ved oppdatering av periode", e) - throw e } + } catch (e: SQLException) { + logger.error("Feil ved oppdatering av periode", e) + throw e } } diff --git a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ArbeidssoekerService.kt b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ArbeidssoekerService.kt index def7bef..8103fcf 100644 --- a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ArbeidssoekerService.kt +++ b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ArbeidssoekerService.kt @@ -1,30 +1,15 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.services -import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Arbeidssoekerperiode import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.ArbeidssoekerperiodeResponse import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.Identitetsnummer import no.nav.paw.arbeidssoekerregisteret.eksternt.api.domain.toArbeidssoekerperiodeResponse import no.nav.paw.arbeidssoekerregisteret.eksternt.api.repositories.ArbeidssoekerperiodeRepository import no.nav.paw.arbeidssokerregisteret.api.v1.Periode import java.time.LocalDate -import java.time.LocalDateTime -import java.time.ZoneId class ArbeidssoekerService(private val arbeidssoekerperiodeRepository: ArbeidssoekerperiodeRepository) { - fun opprettEllerOppdaterArbeidssoekerperiode(periodeMelding: Periode) { - val eksisterendePeriode = arbeidssoekerperiodeRepository.finnesArbeidssoekerperiode(periodeMelding.id) - val periode = - Arbeidssoekerperiode( - identitetsnummer = Identitetsnummer(periodeMelding.identitetsnummer), - periodeId = periodeMelding.id, - startet = LocalDateTime.ofInstant(periodeMelding.startet.tidspunkt, ZoneId.systemDefault()), - avsluttet = periodeMelding.avsluttet?.tidspunkt?.let { LocalDateTime.ofInstant(it, ZoneId.systemDefault()) } - ) - if (eksisterendePeriode) { - arbeidssoekerperiodeRepository.oppdaterArbeidssoekerperiode(periode) - } else { - arbeidssoekerperiodeRepository.opprettArbeidssoekerperiode(periode) - } + fun storeBatch(arbeidssoekerperioder: Iterable) { + arbeidssoekerperiodeRepository.storeBatch(arbeidssoekerperioder) } fun hentArbeidssoekerperioder( diff --git a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ScheduleDeletionService.kt b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ScheduleDeletionService.kt index 221be88..c76e940 100644 --- a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ScheduleDeletionService.kt +++ b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/services/ScheduleDeletionService.kt @@ -2,6 +2,7 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.services import no.nav.paw.arbeidssoekerregisteret.eksternt.api.repositories.ArbeidssoekerperiodeRepository import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.TimeUtils.getMaxDateForDatabaseStorage +import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.getDelayUntilMidnight import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.getDeletionInterval import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.logger import no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils.toInstant @@ -22,5 +23,5 @@ class ScheduleDeletionService(arbeidssoekerperiodeRepository: Arbeidssoekerperio } } - fun scheduleDatabaseDeletionTask() = timer.scheduleAtFixedRate(task, 0L, getDeletionInterval()) + fun scheduleDatabaseDeletionTask() = timer.scheduleAtFixedRate(task, getDelayUntilMidnight(), getDeletionInterval()) } diff --git a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/utils/TimeUtils.kt b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/utils/TimeUtils.kt index 07518be..af3187b 100644 --- a/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/utils/TimeUtils.kt +++ b/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/eksternt/api/utils/TimeUtils.kt @@ -2,9 +2,10 @@ package no.nav.paw.arbeidssoekerregisteret.eksternt.api.utils import java.time.Duration import java.time.Instant +import java.time.LocalDate import java.time.LocalDateTime +import java.time.LocalTime import java.time.ZoneId -import java.time.temporal.TemporalAdjusters fun Instant.toLocalDateTime(): LocalDateTime = LocalDateTime.ofInstant(this, ZoneId.systemDefault()) @@ -12,10 +13,16 @@ fun LocalDateTime.toInstant(): Instant = this.atZone(ZoneId.systemDefault()).toI fun getDeletionInterval(): Long = 1000L * 60 * 60 * 24 // 24 timer +fun getDelayUntilMidnight(): Long { + val now = LocalDateTime.now() + val midnight = LocalDateTime.of(LocalDate.now().plusDays(1), LocalTime.of(0, 0, 0, 0)) + return Duration.between(now, midnight).toMillis() +} + object TimeUtils { private val now = LocalDateTime.now() - private fun getStartOfYear(): LocalDateTime = now.with(TemporalAdjusters.firstDayOfYear()).withHour(0).withMinute(0).withSecond(0).withNano(0) + private fun getStartOfYear(): LocalDateTime = LocalDateTime.of(LocalDate.of(now.year, 1, 1), LocalTime.of(0, 0, 0, 0)) private fun getDurationFromNowToStartOfYear(): Duration = Duration.between(now, getStartOfYear())