From 7f7b32e1528f6cae939d69dfd5e72d68021f58a2 Mon Sep 17 00:00:00 2001 From: Jostein Holje Date: Fri, 10 Feb 2023 09:12:16 +0100 Subject: [PATCH 01/11] =?UTF-8?q?Begynnelse=20p=C3=A5=20kafka-producer=20f?= =?UTF-8?q?or=20automatisk=20reaktivering?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package-lock.json | 9 ++++ package.json | 1 + .../reaktivering/automatiskReaktivering.ts | 11 ++-- .../automatiskReaktiveringSvar.ts | 2 + src/config/index.ts | 6 +++ src/kafka/automatisk-reaktivert-producer.ts | 53 +++++++++++++++++++ 6 files changed, 79 insertions(+), 3 deletions(-) create mode 100644 src/kafka/automatisk-reaktivert-producer.ts diff --git a/package-lock.json b/package-lock.json index 87dfe18..08ac2c1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,6 +21,7 @@ "helmet": "6.0.1", "jose": "4.11.2", "jsonwebtoken": "9.0.0", + "kafkajs": "2.2.3", "node-jose": "2.1.1", "openid-client": "5.4.0", "pino": "8.8.0", @@ -5098,6 +5099,14 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.3", + "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.3.tgz", + "integrity": "sha512-JmzIiLHE/TdQ7b4I2B/DNMtfhTh66fmEaEg7gGkyQXBC6f1A7I2jSjeUsVIJfC8d9YcEIURyBjtOEKBO5OHVhg==", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/kleur": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz", diff --git a/package.json b/package.json index 41c5263..ee6eb0e 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "helmet": "6.0.1", "jose": "4.11.2", "jsonwebtoken": "9.0.0", + "kafkajs": "2.2.3", "node-jose": "2.1.1", "openid-client": "5.4.0", "pino": "8.8.0", diff --git a/src/api/reaktivering/automatiskReaktivering.ts b/src/api/reaktivering/automatiskReaktivering.ts index dfdd59b..52943ee 100644 --- a/src/api/reaktivering/automatiskReaktivering.ts +++ b/src/api/reaktivering/automatiskReaktivering.ts @@ -15,9 +15,14 @@ function automatiskReaktiveringRoutes( res.status(400).send('mangler fnr'); return; } - - const result = await repository.lagre(fnr); - res.status(201).send(result); + try { + const result = await repository.lagre(fnr); + // kafkaProducer.send(result) + res.status(201).send(result); + } catch (e) { + // log + res.status(500).end(); + } }); return router; diff --git a/src/api/reaktivering/automatiskReaktiveringSvar.ts b/src/api/reaktivering/automatiskReaktiveringSvar.ts index 61f5a8c..daa3e32 100644 --- a/src/api/reaktivering/automatiskReaktiveringSvar.ts +++ b/src/api/reaktivering/automatiskReaktiveringSvar.ts @@ -62,6 +62,8 @@ function automatiskReaktiveringSvarRoutes( automatiskReaktiveringId: reaktivering.id, }); + // await kafkaProducer.send(result) + res.status(201).send({ opprettetDato: reaktivering.created_at, svar: { diff --git a/src/config/index.ts b/src/config/index.ts index 98dc629..6d6eed7 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -16,6 +16,12 @@ export interface IEnvironmentVariables { IDPORTEN_JWKS_URI: string; AZURE_APP_WELL_KNOWN_URL: string; AZURE_APP_CLIENT_ID: string; + APP_NAME: string; + KAFKA_TOPIC: string; + KAFKA_BROKERS: string; + KAFKA_CERTIFICATE: string; + KAFKA_CA: string; + KAFKA_PRIVATE_KEY: string; } const env = process.env as unknown as IEnvironmentVariables; diff --git a/src/kafka/automatisk-reaktivert-producer.ts b/src/kafka/automatisk-reaktivert-producer.ts new file mode 100644 index 0000000..e9eb5e0 --- /dev/null +++ b/src/kafka/automatisk-reaktivert-producer.ts @@ -0,0 +1,53 @@ +import { CompressionTypes, Kafka, KafkaConfig } from 'kafkajs'; +import config from '../config'; + +interface Producer { + start(): Promise; + stop(): Promise; + send(data: any): Promise; +} + +const kafkaConfig: KafkaConfig = { + clientId: config.APP_NAME, + brokers: [config.KAFKA_BROKERS], + ssl: !config.KAFKA_CA + ? false + : { + rejectUnauthorized: false, + ca: [config.KAFKA_CA], + key: config.KAFKA_PRIVATE_KEY, + cert: config.KAFKA_CERTIFICATE, + }, +}; +const createProducer = (): Producer => { + const kafka = new Kafka(kafkaConfig); + const producer = kafka.producer(); + + return { + async start(): Promise { + try { + await producer.connect(); + } catch (e) {} + }, + async send(data: any): Promise { + try { + await producer.send({ + topic: 'topic', + compression: CompressionTypes.GZIP, + messages: [ + { + value: JSON.stringify(data), + }, + ], + }); + } catch (e) {} + }, + async stop(): Promise { + try { + await producer.disconnect(); + } catch (e) {} + }, + }; +}; + +export default createProducer; From 7694ee534b9fba4f35ee14ead1221b7e0870942a Mon Sep 17 00:00:00 2001 From: Jonas Enge Date: Mon, 13 Feb 2023 12:43:18 +0100 Subject: [PATCH 02/11] Legger til kafka i nais.yaml --- nais/dev-gcp/nais.yaml | 2 ++ nais/prod-gcp/nais.yaml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/nais/dev-gcp/nais.yaml b/nais/dev-gcp/nais.yaml index a9c127d..59d901d 100644 --- a/nais/dev-gcp/nais.yaml +++ b/nais/dev-gcp/nais.yaml @@ -78,3 +78,5 @@ spec: - type: POSTGRES_14 databases: - name: aia-backend + kafka: + pool: nav-dev diff --git a/nais/prod-gcp/nais.yaml b/nais/prod-gcp/nais.yaml index 5170f7b..465e980 100644 --- a/nais/prod-gcp/nais.yaml +++ b/nais/prod-gcp/nais.yaml @@ -81,3 +81,5 @@ spec: - type: POSTGRES_14 databases: - name: aia-backend + kafka: + pool: nav-prod From 6a4459d9e848505c7be0d11f4e34ce182d84a318 Mon Sep 17 00:00:00 2001 From: Jonas Enge Date: Mon, 13 Feb 2023 12:46:42 +0100 Subject: [PATCH 03/11] f Legger til kafka topic env --- nais/dev-gcp/nais.yaml | 2 ++ nais/prod-gcp/nais.yaml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/nais/dev-gcp/nais.yaml b/nais/dev-gcp/nais.yaml index 59d901d..774d731 100644 --- a/nais/dev-gcp/nais.yaml +++ b/nais/dev-gcp/nais.yaml @@ -54,6 +54,8 @@ spec: value: dev - name: DAGPENGER_INNSYN_URL value: http://dp-innsyn.teamdagpenger.svc.cluster.local + - name: KAFKA_TOPIC + value: arbeidssoker-reaktivering-v1 resources: limits: cpu: "3" diff --git a/nais/prod-gcp/nais.yaml b/nais/prod-gcp/nais.yaml index 465e980..12ef12e 100644 --- a/nais/prod-gcp/nais.yaml +++ b/nais/prod-gcp/nais.yaml @@ -57,6 +57,8 @@ spec: value: prod - name: DAGPENGER_INNSYN_URL value: http://dp-innsyn.teamdagpenger.svc.cluster.local + - name: KAFKA_TOPIC + value: arbeidssoker-reaktivering-v1 resources: limits: cpu: "3" From 4e4f38df9cd7afd8dbed19c21b5713c2b46a1a00 Mon Sep 17 00:00:00 2001 From: Jonas Enge Date: Mon, 13 Feb 2023 12:57:17 +0100 Subject: [PATCH 04/11] r fjerner kompresjon --- src/kafka/automatisk-reaktivert-producer.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/kafka/automatisk-reaktivert-producer.ts b/src/kafka/automatisk-reaktivert-producer.ts index e9eb5e0..022c4a4 100644 --- a/src/kafka/automatisk-reaktivert-producer.ts +++ b/src/kafka/automatisk-reaktivert-producer.ts @@ -1,4 +1,4 @@ -import { CompressionTypes, Kafka, KafkaConfig } from 'kafkajs'; +import { Kafka, KafkaConfig } from 'kafkajs'; import config from '../config'; interface Producer { @@ -32,8 +32,7 @@ const createProducer = (): Producer => { async send(data: any): Promise { try { await producer.send({ - topic: 'topic', - compression: CompressionTypes.GZIP, + topic: config.KAFKA_TOPIC, messages: [ { value: JSON.stringify(data), From dcd27e3e3f06a1af7ab38e0bf3d2aa72d8e3861f Mon Sep 17 00:00:00 2001 From: Jonas Enge Date: Mon, 13 Feb 2023 14:39:35 +0100 Subject: [PATCH 05/11] f legger til kafka produser for automatisk reaktivering --- .../reaktivering/automatiskReaktivering.ts | 4 +- .../automatiskReaktiveringSvar.ts | 6 ++- src/deps.ts | 3 ++ src/index.ts | 11 +++- src/kafka/automatisk-reaktivert-producer.ts | 54 +++++++++++-------- 5 files changed, 51 insertions(+), 27 deletions(-) diff --git a/src/api/reaktivering/automatiskReaktivering.ts b/src/api/reaktivering/automatiskReaktivering.ts index 52943ee..0760585 100644 --- a/src/api/reaktivering/automatiskReaktivering.ts +++ b/src/api/reaktivering/automatiskReaktivering.ts @@ -1,9 +1,11 @@ import { RequestHandler, Router } from 'express'; import azureAdAuthentication from '../../middleware/azure-ad-authentication'; import { AutomatiskReaktiveringRepository } from '../../db/automatiskReaktiveringRepository'; +import { KafkaProducer } from '../../kafka/automatisk-reaktivert-producer'; function automatiskReaktiveringRoutes( repository: AutomatiskReaktiveringRepository, + automatiskReaktivertProducer: KafkaProducer, authMiddleware: RequestHandler = azureAdAuthentication ) { const router = Router(); @@ -17,7 +19,7 @@ function automatiskReaktiveringRoutes( } try { const result = await repository.lagre(fnr); - // kafkaProducer.send(result) + await automatiskReaktivertProducer.send([result]); res.status(201).send(result); } catch (e) { // log diff --git a/src/api/reaktivering/automatiskReaktiveringSvar.ts b/src/api/reaktivering/automatiskReaktiveringSvar.ts index daa3e32..e15e577 100644 --- a/src/api/reaktivering/automatiskReaktiveringSvar.ts +++ b/src/api/reaktivering/automatiskReaktiveringSvar.ts @@ -1,12 +1,14 @@ import { Router } from 'express'; import { AutomatiskReaktiveringRepository } from '../../db/automatiskReaktiveringRepository'; import { AutomatiskReaktiveringSvarRepository } from '../../db/automatiskReaktiveringSvarRepository'; +import { KafkaProducer } from '../../kafka/automatisk-reaktivert-producer'; import logger from '../../logger'; import { IdPortenRequest } from '../../middleware/idporten-authentication'; function automatiskReaktiveringSvarRoutes( reaktiveringRepository: AutomatiskReaktiveringRepository, - svarRepository: AutomatiskReaktiveringSvarRepository + svarRepository: AutomatiskReaktiveringSvarRepository, + automatiskReaktivertProducer: KafkaProducer ) { const router = Router(); @@ -62,7 +64,7 @@ function automatiskReaktiveringSvarRoutes( automatiskReaktiveringId: reaktivering.id, }); - // await kafkaProducer.send(result) + await automatiskReaktivertProducer.send([result]); res.status(201).send({ opprettetDato: reaktivering.created_at, diff --git a/src/deps.ts b/src/deps.ts index 56b9e87..7d3352e 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -9,6 +9,7 @@ import createAutomatiskReaktiveringRepository, { import createAutomatiskReaktiveringSvarRepository, { AutomatiskReaktiveringSvarRepository, } from './db/automatiskReaktiveringSvarRepository'; +import createProducer, { KafkaProducer } from './kafka/automatisk-reaktivert-producer'; export interface Dependencies { tokenDings: Promise; @@ -16,6 +17,7 @@ export interface Dependencies { behovRepository: BehovRepository; automatiskReaktiveringRepository: AutomatiskReaktiveringRepository; automatiskReaktiveringSvarRepository: AutomatiskReaktiveringSvarRepository; + automatiskReaktivertProducer: Promise; } function createDependencies(): Dependencies { @@ -32,6 +34,7 @@ function createDependencies(): Dependencies { behovRepository: createBehovRepository(prismaClient), automatiskReaktiveringRepository: createAutomatiskReaktiveringRepository(prismaClient), automatiskReaktiveringSvarRepository: createAutomatiskReaktiveringSvarRepository(prismaClient), + automatiskReaktivertProducer: createProducer(), }; } diff --git a/src/index.ts b/src/index.ts index f77fcf7..666cd40 100644 --- a/src/index.ts +++ b/src/index.ts @@ -44,6 +44,7 @@ async function setUpRoutes() { behovRepository, automatiskReaktiveringRepository, automatiskReaktiveringSvarRepository, + automatiskReaktivertProducer, } = createDependencies(); // Public routes @@ -52,7 +53,7 @@ async function setUpRoutes() { router.use(unleashApi()); // azure ad - router.use(automatiskReaktiveringApi(automatiskReaktiveringRepository)); + router.use(automatiskReaktiveringApi(automatiskReaktiveringRepository, await automatiskReaktivertProducer)); // id porten router.use(idportenAuthentication); @@ -68,7 +69,13 @@ async function setUpRoutes() { router.use(behovForVeiledningApi(behovRepository)); router.use(dagpengerStatusApi(await tokenDings)); router.use(meldekortInaktivering()); - router.use(reaktiveringApi(automatiskReaktiveringRepository, automatiskReaktiveringSvarRepository)); + router.use( + reaktiveringApi( + automatiskReaktiveringRepository, + automatiskReaktiveringSvarRepository, + await automatiskReaktivertProducer + ) + ); app.use(config.BASE_PATH || '', router); } diff --git a/src/kafka/automatisk-reaktivert-producer.ts b/src/kafka/automatisk-reaktivert-producer.ts index 022c4a4..665fbb4 100644 --- a/src/kafka/automatisk-reaktivert-producer.ts +++ b/src/kafka/automatisk-reaktivert-producer.ts @@ -1,10 +1,10 @@ +import { AutomatiskReaktivering, AutomatiskReaktiveringSvar } from '@prisma/client'; import { Kafka, KafkaConfig } from 'kafkajs'; import config from '../config'; +import logger from '../logger'; -interface Producer { - start(): Promise; - stop(): Promise; - send(data: any): Promise; +export interface KafkaProducer { + send(data: Array): Promise; } const kafkaConfig: KafkaConfig = { @@ -19,32 +19,42 @@ const kafkaConfig: KafkaConfig = { cert: config.KAFKA_CERTIFICATE, }, }; -const createProducer = (): Producer => { +const createProducer = async (): Promise => { const kafka = new Kafka(kafkaConfig); const producer = kafka.producer(); + await producer.connect(); + return { - async start(): Promise { - try { - await producer.connect(); - } catch (e) {} - }, - async send(data: any): Promise { + async send(data: Array): Promise { + const messages = data.map((d) => { + if ('svar' in d) { + return { + value: JSON.stringify({ + bruker_id: d.bruker_id, + created_at: d.created_at, + svar: d.svar, + type: 'AutomatiskReaktiveringSvar', + }), + }; + } + return { + value: JSON.stringify({ + bruker_id: d.bruker_id, + created_at: d.created_at, + type: 'AutomatiskReaktivering', + }), + }; + }); + try { await producer.send({ topic: config.KAFKA_TOPIC, - messages: [ - { - value: JSON.stringify(data), - }, - ], + messages, }); - } catch (e) {} - }, - async stop(): Promise { - try { - await producer.disconnect(); - } catch (e) {} + } catch (e) { + logger.error(`Fikk ikke sendt melding til kafka ${e}`); + } }, }; }; From bfba3e7ceeae7c96f752e53f1fc9ec3a7d06da5b Mon Sep 17 00:00:00 2001 From: Jonas Enge Date: Mon, 13 Feb 2023 14:41:36 +0100 Subject: [PATCH 06/11] f legger til kafka produser for automatisk reaktivering --- .../reaktivering/automatiskReaktivering.test.ts | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/test/api/reaktivering/automatiskReaktivering.test.ts b/test/api/reaktivering/automatiskReaktivering.test.ts index fed4a26..f116b83 100644 --- a/test/api/reaktivering/automatiskReaktivering.test.ts +++ b/test/api/reaktivering/automatiskReaktivering.test.ts @@ -4,6 +4,7 @@ import bodyParser from 'body-parser'; import { mockDeep } from 'jest-mock-extended'; import { AutomatiskReaktiveringRepository } from '../../../src/db/automatiskReaktiveringRepository'; import automatiskReaktiveringRoutes from '../../../src/api/reaktivering/automatiskReaktivering'; +import { KafkaProducer } from '../../../src/kafka/automatisk-reaktivert-producer'; describe('automatisk reaktivering api', () => { describe('POST /azure/automatisk-reaktivering', () => { @@ -16,7 +17,9 @@ describe('automatisk reaktivering api', () => { }; const repository = mockDeep(); - app.use(automatiskReaktiveringRoutes(repository, authMiddleware)); + const kafkaProducer = mockDeep(); + + app.use(automatiskReaktiveringRoutes(repository, kafkaProducer, authMiddleware)); const response = await request(app).post('/azure/automatisk-reaktivering'); expect(response.statusCode).toEqual(401); @@ -32,7 +35,9 @@ describe('automatisk reaktivering api', () => { const repository = mockDeep(); - app.use(automatiskReaktiveringRoutes(repository, authMiddleware)); + const kafkaProducer = mockDeep(); + + app.use(automatiskReaktiveringRoutes(repository, kafkaProducer, authMiddleware)); const response = await request(app).post('/azure/automatisk-reaktivering'); expect(response.statusCode).toEqual(400); @@ -47,13 +52,16 @@ describe('automatisk reaktivering api', () => { }; const repository = mockDeep(); + + const kafkaProducer = mockDeep(); + repository.lagre.mockResolvedValue({ id: 42, bruker_id: 'fnr-123', created_at: new Date('2022-12-12T11:30:28.603Z'), }); - app.use(automatiskReaktiveringRoutes(repository, authMiddleware)); + app.use(automatiskReaktiveringRoutes(repository, kafkaProducer, authMiddleware)); const response = await request(app).post('/azure/automatisk-reaktivering').send({ fnr: 'fnr-123' }); From fed42d38547ce691ef2d87d24bc3dffba92e9fcc Mon Sep 17 00:00:00 2001 From: Jonas Enge Date: Mon, 13 Feb 2023 14:55:17 +0100 Subject: [PATCH 07/11] r legger til namespace i kafka-topic --- nais/dev-gcp/nais.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nais/dev-gcp/nais.yaml b/nais/dev-gcp/nais.yaml index 774d731..e1cb4f3 100644 --- a/nais/dev-gcp/nais.yaml +++ b/nais/dev-gcp/nais.yaml @@ -55,7 +55,7 @@ spec: - name: DAGPENGER_INNSYN_URL value: http://dp-innsyn.teamdagpenger.svc.cluster.local - name: KAFKA_TOPIC - value: arbeidssoker-reaktivering-v1 + value: paw.arbeidssoker-reaktivering-v1 resources: limits: cpu: "3" From 87b0ec4a1de0057929eb3abfb2757a3ec911c4c1 Mon Sep 17 00:00:00 2001 From: Jostein Holje Date: Thu, 23 Feb 2023 13:18:03 +0100 Subject: [PATCH 08/11] =?UTF-8?q?Script=20for=20=C3=A5=20dumpe=20db=20til?= =?UTF-8?q?=20kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/batch/dump-db-to-kafka.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 src/batch/dump-db-to-kafka.ts diff --git a/src/batch/dump-db-to-kafka.ts b/src/batch/dump-db-to-kafka.ts new file mode 100644 index 0000000..cfc188f --- /dev/null +++ b/src/batch/dump-db-to-kafka.ts @@ -0,0 +1,13 @@ +import { PrismaClient } from '@prisma/client'; +import createProducer from '../kafka/automatisk-reaktivert-producer'; + +const prismaClient = new PrismaClient(); +const kafkaProducer = await createProducer(); + +console.log('Starter med å dumpe automatiskReaktivering...'); +await kafkaProducer.send(await prismaClient.automatiskReaktivering.findMany()); +console.log('Ferdig med automatiskReaktivering'); + +console.log('Starter med å dumpe automatiskReaktiveringSvar...'); +await kafkaProducer.send(await prismaClient.automatiskReaktiveringSvar.findMany()); +console.log('Ferdig med automatiskReaktiveringSvar'); From e37f5da211ade8a9884b70817c8891401858416c Mon Sep 17 00:00:00 2001 From: Jostein Holje Date: Thu, 23 Feb 2023 13:25:19 +0100 Subject: [PATCH 09/11] Legg til threshold dato i batch script --- src/batch/dump-db-to-kafka.ts | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/batch/dump-db-to-kafka.ts b/src/batch/dump-db-to-kafka.ts index cfc188f..356a7ce 100644 --- a/src/batch/dump-db-to-kafka.ts +++ b/src/batch/dump-db-to-kafka.ts @@ -5,9 +5,25 @@ const prismaClient = new PrismaClient(); const kafkaProducer = await createProducer(); console.log('Starter med å dumpe automatiskReaktivering...'); -await kafkaProducer.send(await prismaClient.automatiskReaktivering.findMany()); +await kafkaProducer.send( + await prismaClient.automatiskReaktivering.findMany({ + where: { + created_at: { + lt: new Date(), + }, + }, + }) +); console.log('Ferdig med automatiskReaktivering'); console.log('Starter med å dumpe automatiskReaktiveringSvar...'); -await kafkaProducer.send(await prismaClient.automatiskReaktiveringSvar.findMany()); +await kafkaProducer.send( + await prismaClient.automatiskReaktiveringSvar.findMany({ + where: { + created_at: { + lt: new Date(), + }, + }, + }) +); console.log('Ferdig med automatiskReaktiveringSvar'); From 7ac4a6243f9107b80852895aa9a6f826c703d195 Mon Sep 17 00:00:00 2001 From: Jostein Holje Date: Thu, 23 Feb 2023 13:29:56 +0100 Subject: [PATCH 10/11] Chatty batch jobb --- src/batch/dump-db-to-kafka.ts | 41 +++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/batch/dump-db-to-kafka.ts b/src/batch/dump-db-to-kafka.ts index 356a7ce..669d568 100644 --- a/src/batch/dump-db-to-kafka.ts +++ b/src/batch/dump-db-to-kafka.ts @@ -4,26 +4,29 @@ import createProducer from '../kafka/automatisk-reaktivert-producer'; const prismaClient = new PrismaClient(); const kafkaProducer = await createProducer(); -console.log('Starter med å dumpe automatiskReaktivering...'); -await kafkaProducer.send( - await prismaClient.automatiskReaktivering.findMany({ - where: { - created_at: { - lt: new Date(), - }, +console.log('Starter med å hente automatiskReaktivering...'); +const automatiskReaktivering = await prismaClient.automatiskReaktivering.findMany({ + where: { + created_at: { + lt: new Date(), }, - }) -); -console.log('Ferdig med automatiskReaktivering'); + }, +}); +console.log(`Fant ${automatiskReaktivering.length} rader`); +console.log('Dumper automatiskReaktivering på kafka...'); +await kafkaProducer.send(automatiskReaktivering); +console.log('Ferdig med automatiskReaktivering\n\n\n'); -console.log('Starter med å dumpe automatiskReaktiveringSvar...'); -await kafkaProducer.send( - await prismaClient.automatiskReaktiveringSvar.findMany({ - where: { - created_at: { - lt: new Date(), - }, +console.log('Starter med å hente automatiskReaktiveringSvar...'); +const automatiskReaktiveringSvar = await prismaClient.automatiskReaktiveringSvar.findMany({ + where: { + created_at: { + lt: new Date(), }, - }) -); + }, +}); + +console.log(`Fant ${automatiskReaktiveringSvar.length} rader`); +console.log('Dumper automatiskReaktiveringSvar på kafka...'); +await kafkaProducer.send(automatiskReaktiveringSvar); console.log('Ferdig med automatiskReaktiveringSvar'); From 76e4e3031be48f718f913b0f6ef7577b4a540b4a Mon Sep 17 00:00:00 2001 From: Jostein Holje Date: Mon, 27 Feb 2023 12:34:29 +0100 Subject: [PATCH 11/11] r Wrap script i IIFE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - for å slippe top-level-await --- src/batch/dump-db-to-kafka.ts | 50 ++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/batch/dump-db-to-kafka.ts b/src/batch/dump-db-to-kafka.ts index 669d568..26de67e 100644 --- a/src/batch/dump-db-to-kafka.ts +++ b/src/batch/dump-db-to-kafka.ts @@ -1,32 +1,34 @@ import { PrismaClient } from '@prisma/client'; import createProducer from '../kafka/automatisk-reaktivert-producer'; -const prismaClient = new PrismaClient(); -const kafkaProducer = await createProducer(); +(async function () { + const prismaClient = new PrismaClient(); + const kafkaProducer = await createProducer(); -console.log('Starter med å hente automatiskReaktivering...'); -const automatiskReaktivering = await prismaClient.automatiskReaktivering.findMany({ - where: { - created_at: { - lt: new Date(), + console.log('Starter med å hente automatiskReaktivering...'); + const automatiskReaktivering = await prismaClient.automatiskReaktivering.findMany({ + where: { + created_at: { + lt: new Date(), + }, }, - }, -}); -console.log(`Fant ${automatiskReaktivering.length} rader`); -console.log('Dumper automatiskReaktivering på kafka...'); -await kafkaProducer.send(automatiskReaktivering); -console.log('Ferdig med automatiskReaktivering\n\n\n'); + }); + console.log(`Fant ${automatiskReaktivering.length} rader`); + console.log('Dumper automatiskReaktivering på kafka...'); + await kafkaProducer.send(automatiskReaktivering); + console.log('Ferdig med automatiskReaktivering\n\n\n'); -console.log('Starter med å hente automatiskReaktiveringSvar...'); -const automatiskReaktiveringSvar = await prismaClient.automatiskReaktiveringSvar.findMany({ - where: { - created_at: { - lt: new Date(), + console.log('Starter med å hente automatiskReaktiveringSvar...'); + const automatiskReaktiveringSvar = await prismaClient.automatiskReaktiveringSvar.findMany({ + where: { + created_at: { + lt: new Date(), + }, }, - }, -}); + }); -console.log(`Fant ${automatiskReaktiveringSvar.length} rader`); -console.log('Dumper automatiskReaktiveringSvar på kafka...'); -await kafkaProducer.send(automatiskReaktiveringSvar); -console.log('Ferdig med automatiskReaktiveringSvar'); + console.log(`Fant ${automatiskReaktiveringSvar.length} rader`); + console.log('Dumper automatiskReaktiveringSvar på kafka...'); + await kafkaProducer.send(automatiskReaktiveringSvar); + console.log('Ferdig med automatiskReaktiveringSvar'); +})();