Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hendelse filter backend opensearch #1807

Merged
merged 11 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import no.nav.pto.veilarbportefolje.arbeidsliste.Arbeidsliste;
import no.nav.pto.veilarbportefolje.hendelsesfilter.Hendelse;
import no.nav.pto.veilarbportefolje.opensearch.domene.Endring;
import no.nav.pto.veilarbportefolje.opensearch.domene.OppfolgingsBruker;
import no.nav.pto.veilarbportefolje.persononinfo.barnUnder18Aar.BarnUnder18AarData;
Expand Down Expand Up @@ -115,6 +116,7 @@ public class Bruker {

TiltakshendelseForBruker tiltakshendelse;
GjeldendeVedtak14a gjeldendeVedtak14a;
Hendelse.HendelseInnhold utgattVarsel;

public static Bruker of(OppfolgingsBruker bruker, boolean ufordelt, boolean erVedtakstottePilotPa) {

Expand Down Expand Up @@ -213,7 +215,8 @@ public static Bruker of(OppfolgingsBruker bruker, boolean ufordelt, boolean erVe
.setFargekategori(bruker.getFargekategori())
.setFargekategoriEnhetId(bruker.getFargekategori_enhetId())
.setTiltakshendelse(TiltakshendelseForBruker.of(bruker.getTiltakshendelse()))
.setGjeldendeVedtak14a(bruker.getGjeldendeVedtak14a());
.setGjeldendeVedtak14a(bruker.getGjeldendeVedtak14a())
.setUtgattVarsel(bruker.getUtgatt_varsel());
}

public void kalkulerNesteUtlopsdatoAvValgtAktivitetFornklet(List<String> aktiviteterForenklet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,26 @@ enum class Operasjon {
OPPDATER
}

data class Hendelse(
data class Hendelse @JsonCreator constructor(
@JsonProperty("id")
val id: UUID,
@JsonProperty("personIdent")
val personIdent: NorskIdent,
@JsonProperty("avsender")
val avsender: String,
@JsonProperty("kategori")
val kategori: Kategori,
@JsonProperty("hendelse")
val hendelse: HendelseInnhold
) {
data class HendelseInnhold(
data class HendelseInnhold @JsonCreator constructor(
@JsonProperty("beskrivelse")
val beskrivelse: String,
@JsonProperty("dato")
val dato: ZonedDateTime,
@JsonProperty("lenke")
val lenke: URL,
@JsonProperty("detaljer")
val detaljer: String?
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package no.nav.pto.veilarbportefolje.hendelsesfilter

import no.nav.common.types.identer.Fnr
import no.nav.pto.veilarbportefolje.kafka.KafkaCommonKeyedConsumerService
import no.nav.pto.veilarbportefolje.kafka.KafkaConfigCommon.Topic
import no.nav.pto.veilarbportefolje.opensearch.OpensearchIndexerV2
import no.nav.pto.veilarbportefolje.persononinfo.PdlIdentRepository
import org.jetbrains.annotations.TestOnly
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.util.*

/**
Expand All @@ -23,7 +26,8 @@ import java.util.*
@Service
class HendelseService(
@Autowired private val hendelseRepository: HendelseRepository,
@Autowired private val pdlIdentRepository: PdlIdentRepository
@Autowired private val pdlIdentRepository: PdlIdentRepository,
@Autowired private val opensearchIndexerV2: OpensearchIndexerV2
) : KafkaCommonKeyedConsumerService<HendelseRecordValue>() {
private val logger: Logger = LoggerFactory.getLogger(HendelseService::class.java)

Expand All @@ -35,6 +39,7 @@ class HendelseService(
* * dersom `hendelseRecordValue.operasjon` = [Operasjon.OPPDATER] vil lagret hendelse identifisert med `hendelseId` oppdateres
* * dersom `hendelseRecordValue.operasjon` = [Operasjon.STOPP] vil lagret hendelse identifisert med `hendelseId` slettes
*/
@Transactional
override fun behandleKafkaRecordLogikk(hendelseRecordValue: HendelseRecordValue, hendelseId: String) {
val operasjon = hendelseRecordValue.operasjon
val hendelse = toHendelse(hendelseRecordValue, hendelseId)
Expand Down Expand Up @@ -63,40 +68,112 @@ class HendelseService(
}

private fun startHendelse(hendelse: Hendelse) {
try {
val resultatAvInsertNyHendelse = try {
hendelseRepository.insert(hendelse)

logger.info("Hendelse med id ${hendelse.id} ble startet")
} catch (ex: HendelseIdEksistererAlleredeException) {
ex
}

if (resultatAvInsertNyHendelse is HendelseIdEksistererAlleredeException) {
logger.info("Hendelse med ID ${hendelse.id} allerede startet. Ignorerer melding.")
return
}

val eldsteHendelse = hendelseRepository.getEldste(hendelse.personIdent)

if (eldsteHendelse.id == hendelse.id) {
oppdaterUgattVarselForBrukerIOpenSearch(hendelse)

logger.info("Hendelse med id ${hendelse.id} ble lagret i DB og OpenSearch ble oppdatert med ny eldste utgåtte varsel for person.")
} else {
logger.info("Hendelse med id ${hendelse.id} ble lagret i DB")
}
}

private fun oppdaterHendelse(hendelse: Hendelse) {
try {
val resultatAvUpdateHendelse = try {
hendelseRepository.update(hendelse)

logger.info("Hendelse med id ${hendelse.id} ble oppdatert")
} catch (ex: IngenHendelseMedIdException) {
ex
}

if (resultatAvUpdateHendelse is IngenHendelseMedIdException) {
// 2024-12-02, Sondre:
// Per no ignorer vi melding, då vi forventar å alltid få ei "START"-melding før ei eventuell "OPPDATER"- eller "STOPP"-melding.
// Dette går fint så lenge vi ikkje har skrudd på "compaction" på topicet. Dersom vi har "compaction" på er det ikkje gitt
// Dette går fint så lenge vi ikkje har skrudd på "compaction" på topic-et. Dersom vi har "compaction" på er det ikkje gitt
// at vi berre kan ignorere, sidan vi då potensielt går glipp av hendelsar ved ein eventuell rewind på topic-et.
logger.warn("Fikk hendelse med operasjon ${Operasjon.OPPDATER} og ID ${hendelse.id}, men ingen hendelse med denne ID-en finnes. Ignorerer melding.")
return
}

val eldsteHendelse = hendelseRepository.getEldste(hendelse.personIdent)
if (eldsteHendelse.id == hendelse.id) {
oppdaterUgattVarselForBrukerIOpenSearch(hendelse)
logger.info("Hendelse med id ${hendelse.id} ble oppdatert i DB og OpenSearch ble oppdatert med ny eldste utgåtte varsel for person.")
} else {
logger.info("Hendelse med id ${hendelse.id} ble oppdatert i DB")
}
}

private fun stoppHendelse(hendelse: Hendelse) {
try {
val resultatAvDeleteHendelse = try {
hendelseRepository.delete(hendelse.id)

logger.info("Hendelse med id ${hendelse.id} ble stoppet")
} catch (ex: IngenHendelseMedIdException) {
ex
}

if (resultatAvDeleteHendelse is IngenHendelseMedIdException) {
// 2024-12-02, Sondre:
// Per no ignorer vi melding, då vi forventar å alltid få ei "START"-melding før ei eventuell "OPPDATER"- eller "STOPP"-melding.
// Dette går fint så lenge vi ikkje har skrudd på "compaction" på topicet. Dersom vi har "compaction" på er det ikkje gitt
// Dette går fint så lenge vi ikkje har skrudd på "compaction" på topic-et. Dersom vi har "compaction" på er det ikkje gitt
// at vi berre kan ignorere, sidan vi då potensielt går glipp av hendelsar ved ein eventuell rewind på topic-et.
logger.warn("Fikk hendelse med operasjon ${Operasjon.STOPP} og ID ${hendelse.id}, men ingen hendelse med denne ID-en finnes. Ignorerer melding.")
slovrid marked this conversation as resolved.
Show resolved Hide resolved
return
}

val resultatAvGetEldsteHendelse = try {
hendelseRepository.getEldste(hendelse.personIdent)
} catch (ex: IngenHendelseForPersonException) {
ex
}

if (resultatAvGetEldsteHendelse is IngenHendelseForPersonException) {
// All good - det var ingen flere hendelser for personen etter at vi slettet den som kom inn som argument
slettUgattVarselForBrukerIOpenSearch(hendelse)
logger.info("Hendelse med id ${hendelse.id} ble slettet i DB og utgått varsel ble fjernet for person i OpenSearch siden personen ikke hadde andre hendelser.")
return
}

if (resultatAvGetEldsteHendelse is Hendelse) {
oppdaterUgattVarselForBrukerIOpenSearch(resultatAvGetEldsteHendelse)

logger.info("Hendelse med id ${hendelse.id} ble slettet i DB og OpenSearch ble oppdatert med ny eldste utgåtte varsel for person, med id ${resultatAvGetEldsteHendelse.id}")
}
}

private fun oppdaterUgattVarselForBrukerIOpenSearch(hendelse: Hendelse) {
// 2024-11-29, Sondre
// Egentlig unødvendig if-sjekk så lenge kun Team DAB er på med "utgåtte varsel"
// Men har den med likevel for å tydeliggjøre at det er "utgått varsel"-feltet i OpenSearch
// som oppdateres her. Vi må huske å oppdatere håndtering etterhvert som denne tjenesten
// blir mer generalisert/får flere produsenter
if (Kategori.UTGATT_VARSEL == hendelse.kategori) {
// TODO: 2024-11-29, Sondre - Her konverterer vi bare ukritisk til Fnr, selv om NorskIdent også kan være f.eks. D-nummer
val aktorId = pdlIdentRepository.hentAktorIdForAktivBruker(Fnr.of(hendelse.personIdent.get()))
opensearchIndexerV2.oppdaterUtgattVarsel(hendelse, aktorId)
}
}

private fun slettUgattVarselForBrukerIOpenSearch(hendelse: Hendelse) {
// 2024-11-29, Sondre
// Egentlig unødvendig if-sjekk så lenge kun Team DAB er på med "utgåtte varsel"
// Men har den med likevel for å tydeliggjøre at det er "utgått varsel"-feltet i OpenSearch
// som oppdateres her. Vi må huske å oppdatere håndtering etterhvert som denne tjenesten
// blir mer generalisert/får flere produsenter
if (Kategori.UTGATT_VARSEL == hendelse.kategori) {
// TODO: 2024-11-29, Sondre - Her konverterer vi bare ukritisk til Fnr, selv om NorskIdent også kan være f.eks. D-nummer
val aktorId = pdlIdentRepository.hentAktorIdForAktivBruker(Fnr.of(hendelse.personIdent.get()))
opensearchIndexerV2.slettUtgattVarsel(aktorId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private void flettInnNodvendigData(List<OppfolgingsBruker> brukere) {
postgresOpensearchMapper.flettInnBarnUnder18Aar(brukere);
postgresOpensearchMapper.flettInnTiltakshendelser(brukere);
postgresOpensearchMapper.flettInnSiste14aVedtak(brukere);
postgresOpensearchMapper.flettInnEldsteUtgattVarsel(brukere);
if (FeatureToggle.brukNyttArbeidssoekerregister(defaultUnleash)) {
postgresOpensearchMapper.flettInnOpplysningerOmArbeidssoekerData(brukere);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import no.nav.pto.veilarbportefolje.domene.HuskelappForBruker;
import no.nav.pto.veilarbportefolje.domene.value.VeilederId;
import no.nav.pto.veilarbportefolje.ensligforsorger.dto.output.EnsligeForsorgerOvergangsstønadTiltakDto;
import no.nav.pto.veilarbportefolje.hendelsesfilter.Hendelse;
import no.nav.pto.veilarbportefolje.oppfolging.OppfolgingRepositoryV2;
import no.nav.pto.veilarbportefolje.oppfolgingsbruker.OppfolgingsbrukerEntity;
import no.nav.pto.veilarbportefolje.siste14aVedtak.GjeldendeVedtak14a;
Expand Down Expand Up @@ -390,6 +391,31 @@ public void updateGjeldendeVedtak14a(GjeldendeVedtak14a gjeldendeVedtak14a, Akto
update(aktorId, content, format("Oppdaterte gjeldendeVedtak14a for aktorId: %s", aktorId.get()));
}

@SneakyThrows
public void oppdaterUtgattVarsel(Hendelse hendelse, AktorId aktorId) {
final XContentBuilder content = jsonBuilder()
.startObject()
.startObject("utgatt_varsel")
.field("beskrivelse", hendelse.getHendelse().getBeskrivelse())
.field("dato", hendelse.getHendelse().getDato())
.field("lenke", hendelse.getHendelse().getLenke().toString())
.field("detaljer", hendelse.getHendelse().getDetaljer())
.endObject()
.endObject();

update(aktorId, content, format("Oppdaterte utgått varsel for aktorId: %s", aktorId.get()));
}

@SneakyThrows
public void slettUtgattVarsel(AktorId aktorId) {
final XContentBuilder content = jsonBuilder()
.startObject()
.nullField("utgatt_varsel")
.endObject();

update(aktorId, content, format("Slettet utgått varsel for aktorId: %s", aktorId.get()));
}

private void update(AktorId aktoerId, XContentBuilder content, String logInfo) throws IOException {
if (!oppfolgingRepositoryV2.erUnderOppfolgingOgErAktivIdent(aktoerId)) {
secureLog.info("Oppdaterte ikke OS for brukere som ikke er under oppfolging, heller ikke for historiske identer: {}, med info {}", aktoerId, logInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import no.nav.pto.veilarbportefolje.domene.EnsligeForsorgereOvergangsstonad;
import no.nav.pto.veilarbportefolje.domene.HuskelappForBruker;
import no.nav.pto.veilarbportefolje.domene.Statsborgerskap;
import no.nav.pto.veilarbportefolje.hendelsesfilter.Hendelse;
import no.nav.pto.veilarbportefolje.persononinfo.barnUnder18Aar.BarnUnder18AarData;
import no.nav.pto.veilarbportefolje.siste14aVedtak.Avvik14aVedtak;
import no.nav.pto.veilarbportefolje.siste14aVedtak.GjeldendeVedtak14a;
Expand Down Expand Up @@ -134,4 +135,5 @@ public class OppfolgingsBruker {
Tiltakshendelse tiltakshendelse;

GjeldendeVedtak14a gjeldendeVedtak14a;
Hendelse.HendelseInnhold utgatt_varsel;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import lombok.extern.slf4j.Slf4j;
import no.nav.common.types.identer.AktorId;
import no.nav.common.types.identer.Fnr;
import no.nav.common.types.identer.NorskIdent;
import no.nav.pto.veilarbportefolje.arbeidssoeker.v2.*;
import no.nav.pto.veilarbportefolje.domene.GjeldendeIdenter;
import no.nav.pto.veilarbportefolje.domene.Statsborgerskap;
import no.nav.pto.veilarbportefolje.ensligforsorger.EnsligeForsorgereService;
import no.nav.pto.veilarbportefolje.ensligforsorger.dto.output.EnsligeForsorgerOvergangsstønadTiltakDto;
import no.nav.pto.veilarbportefolje.hendelsesfilter.Hendelse;
import no.nav.pto.veilarbportefolje.hendelsesfilter.HendelseRepository;
import no.nav.pto.veilarbportefolje.hendelsesfilter.IngenHendelseForPersonException;
import no.nav.pto.veilarbportefolje.kodeverk.KodeverkService;
import no.nav.pto.veilarbportefolje.opensearch.domene.Endring;
import no.nav.pto.veilarbportefolje.opensearch.domene.OppfolgingsBruker;
Expand Down Expand Up @@ -47,6 +51,7 @@ public class PostgresOpensearchMapper {
private final ArbeidssoekerService arbeidssoekerService;
private final TiltakshendelseRepository tiltakshendelseRepository;
private final Siste14aVedtakRepository siste14aVedtakRepository;
private final HendelseRepository hendelseRepository;

public void flettInnAktivitetsData(List<OppfolgingsBruker> brukere) {
List<AktorId> aktoerIder = brukere.stream().map(OppfolgingsBruker::getAktoer_id).map(AktorId::of).toList();
Expand Down Expand Up @@ -240,4 +245,15 @@ public void flettInnSiste14aVedtak(List<OppfolgingsBruker> brukere) {
)).orElse(null));
});
}

public void flettInnEldsteUtgattVarsel(List<OppfolgingsBruker> brukere) {
brukere.forEach(bruker -> {
try {
Hendelse eldsteHendelsePaPerson = hendelseRepository.getEldste(NorskIdent.of(bruker.getFnr()));
bruker.setUtgatt_varsel(eldsteHendelsePaPerson.getHendelse());
} catch (IngenHendelseForPersonException ex) {
log.info("Fant ingen hendelse/utgått varsel for person, så ingen data å flette inn.");
}
});
}
}
Loading
Loading