Skip to content

Commit

Permalink
Merge pull request #38 from navikt/dev/kafka
Browse files Browse the repository at this point in the history
Dev/kafka
  • Loading branch information
jstnhlj authored Feb 27, 2023
2 parents 6956c3e + 76e4e30 commit 93ae1da
Show file tree
Hide file tree
Showing 12 changed files with 6,262 additions and 85 deletions.
4 changes: 4 additions & 0 deletions nais/dev-gcp/nais.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
value: dev
- name: DAGPENGER_INNSYN_URL
value: http://dp-innsyn.teamdagpenger.svc.cluster.local
- name: KAFKA_TOPIC
value: paw.arbeidssoker-reaktivering-v1
resources:
limits:
cpu: "3"
Expand Down Expand Up @@ -83,3 +85,5 @@ spec:
- type: POSTGRES_14
databases:
- name: aia-backend
kafka:
pool: nav-dev
4 changes: 4 additions & 0 deletions nais/prod-gcp/nais.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ spec:
value: prod
- name: DAGPENGER_INNSYN_URL
value: http://dp-innsyn.teamdagpenger.svc.cluster.local
- name: KAFKA_TOPIC
value: arbeidssoker-reaktivering-v1
resources:
limits:
cpu: "3"
Expand Down Expand Up @@ -87,3 +89,5 @@ spec:
tier: db-custom-1-3840
databases:
- name: aia-backend
kafka:
pool: nav-prod
6,189 changes: 6,113 additions & 76 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"helmet": "6.0.1",
"jose": "4.12.0",
"jsonwebtoken": "9.0.0",
"kafkajs": "2.2.3",
"node-jose": "2.2.0",
"openid-client": "5.4.0",
"pino": "8.8.0",
Expand Down
13 changes: 10 additions & 3 deletions src/api/reaktivering/automatiskReaktivering.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { RequestHandler, Router } from 'express';
import azureAdAuthentication from '../../middleware/azure-ad-authentication';
import { AutomatiskReaktiveringRepository } from '../../db/automatiskReaktiveringRepository';
import { KafkaProducer } from '../../kafka/automatisk-reaktivert-producer';

function automatiskReaktiveringRoutes(
repository: AutomatiskReaktiveringRepository,
automatiskReaktivertProducer: KafkaProducer,
authMiddleware: RequestHandler = azureAdAuthentication
) {
const router = Router();
Expand All @@ -15,9 +17,14 @@ function automatiskReaktiveringRoutes(
res.status(400).send('mangler fnr');
return;
}

const result = await repository.lagre(fnr);
res.status(201).send(result);
try {
const result = await repository.lagre(fnr);
await automatiskReaktivertProducer.send([result]);
res.status(201).send(result);
} catch (e) {
// log
res.status(500).end();
}
});

return router;
Expand Down
6 changes: 5 additions & 1 deletion src/api/reaktivering/automatiskReaktiveringSvar.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { Router } from 'express';
import { AutomatiskReaktiveringRepository } from '../../db/automatiskReaktiveringRepository';
import { AutomatiskReaktiveringSvarRepository } from '../../db/automatiskReaktiveringSvarRepository';
import { KafkaProducer } from '../../kafka/automatisk-reaktivert-producer';
import logger from '../../logger';
import { IdPortenRequest } from '../../middleware/idporten-authentication';

function automatiskReaktiveringSvarRoutes(
reaktiveringRepository: AutomatiskReaktiveringRepository,
svarRepository: AutomatiskReaktiveringSvarRepository
svarRepository: AutomatiskReaktiveringSvarRepository,
automatiskReaktivertProducer: KafkaProducer
) {
const router = Router();

Expand Down Expand Up @@ -62,6 +64,8 @@ function automatiskReaktiveringSvarRoutes(
automatiskReaktiveringId: reaktivering.id,
});

await automatiskReaktivertProducer.send([result]);

res.status(201).send({
opprettetDato: reaktivering.created_at,
svar: {
Expand Down
34 changes: 34 additions & 0 deletions src/batch/dump-db-to-kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { PrismaClient } from '@prisma/client';
import createProducer from '../kafka/automatisk-reaktivert-producer';

(async function () {
const prismaClient = new PrismaClient();
const kafkaProducer = await createProducer();

console.log('Starter med å hente automatiskReaktivering...');
const automatiskReaktivering = await prismaClient.automatiskReaktivering.findMany({
where: {
created_at: {
lt: new Date(),
},
},
});
console.log(`Fant ${automatiskReaktivering.length} rader`);
console.log('Dumper automatiskReaktivering på kafka...');
await kafkaProducer.send(automatiskReaktivering);
console.log('Ferdig med automatiskReaktivering\n\n\n');

console.log('Starter med å hente automatiskReaktiveringSvar...');
const automatiskReaktiveringSvar = await prismaClient.automatiskReaktiveringSvar.findMany({
where: {
created_at: {
lt: new Date(),
},
},
});

console.log(`Fant ${automatiskReaktiveringSvar.length} rader`);
console.log('Dumper automatiskReaktiveringSvar på kafka...');
await kafkaProducer.send(automatiskReaktiveringSvar);
console.log('Ferdig med automatiskReaktiveringSvar');
})();
6 changes: 6 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ export interface IEnvironmentVariables {
IDPORTEN_JWKS_URI: string;
AZURE_APP_WELL_KNOWN_URL: string;
AZURE_APP_CLIENT_ID: string;
APP_NAME: string;
KAFKA_TOPIC: string;
KAFKA_BROKERS: string;
KAFKA_CERTIFICATE: string;
KAFKA_CA: string;
KAFKA_PRIVATE_KEY: string;
}

const env = process.env as unknown as IEnvironmentVariables;
Expand Down
3 changes: 3 additions & 0 deletions src/deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import createAutomatiskReaktiveringRepository, {
import createAutomatiskReaktiveringSvarRepository, {
AutomatiskReaktiveringSvarRepository,
} from './db/automatiskReaktiveringSvarRepository';
import createProducer, { KafkaProducer } from './kafka/automatisk-reaktivert-producer';

export interface Dependencies {
tokenDings: Promise<Auth>;
profilRepository: ProfilRepository;
behovRepository: BehovRepository;
automatiskReaktiveringRepository: AutomatiskReaktiveringRepository;
automatiskReaktiveringSvarRepository: AutomatiskReaktiveringSvarRepository;
automatiskReaktivertProducer: Promise<KafkaProducer>;
}

function createDependencies(): Dependencies {
Expand All @@ -32,6 +34,7 @@ function createDependencies(): Dependencies {
behovRepository: createBehovRepository(prismaClient),
automatiskReaktiveringRepository: createAutomatiskReaktiveringRepository(prismaClient),
automatiskReaktiveringSvarRepository: createAutomatiskReaktiveringSvarRepository(prismaClient),
automatiskReaktivertProducer: createProducer(),
};
}

Expand Down
11 changes: 9 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async function setUpRoutes() {
behovRepository,
automatiskReaktiveringRepository,
automatiskReaktiveringSvarRepository,
automatiskReaktivertProducer,
} = createDependencies();

// Public routes
Expand All @@ -53,7 +54,7 @@ async function setUpRoutes() {
router.use(unleashApi());

// azure ad
router.use(automatiskReaktiveringApi(automatiskReaktiveringRepository));
router.use(automatiskReaktiveringApi(automatiskReaktiveringRepository, await automatiskReaktivertProducer));

// id porten
router.use(idportenAuthentication);
Expand All @@ -70,7 +71,13 @@ async function setUpRoutes() {
router.use(behovForVeiledningApi(behovRepository));
router.use(dagpengerStatusApi(await tokenDings));
router.use(meldekortInaktivering());
router.use(reaktiveringApi(automatiskReaktiveringRepository, automatiskReaktiveringSvarRepository));
router.use(
reaktiveringApi(
automatiskReaktiveringRepository,
automatiskReaktiveringSvarRepository,
await automatiskReaktivertProducer
)
);

app.use(config.BASE_PATH || '', router);
}
Expand Down
62 changes: 62 additions & 0 deletions src/kafka/automatisk-reaktivert-producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { AutomatiskReaktivering, AutomatiskReaktiveringSvar } from '@prisma/client';
import { Kafka, KafkaConfig } from 'kafkajs';
import config from '../config';
import logger from '../logger';

export interface KafkaProducer {
send(data: Array<AutomatiskReaktivering | AutomatiskReaktiveringSvar>): Promise<void>;
}

const kafkaConfig: KafkaConfig = {
clientId: config.APP_NAME,
brokers: [config.KAFKA_BROKERS],
ssl: !config.KAFKA_CA
? false
: {
rejectUnauthorized: false,
ca: [config.KAFKA_CA],
key: config.KAFKA_PRIVATE_KEY,
cert: config.KAFKA_CERTIFICATE,
},
};
const createProducer = async (): Promise<KafkaProducer> => {
const kafka = new Kafka(kafkaConfig);
const producer = kafka.producer();

await producer.connect();

return {
async send(data: Array<AutomatiskReaktivering | AutomatiskReaktiveringSvar>): Promise<void> {
const messages = data.map((d) => {
if ('svar' in d) {
return {
value: JSON.stringify({
bruker_id: d.bruker_id,
created_at: d.created_at,
svar: d.svar,
type: 'AutomatiskReaktiveringSvar',
}),
};
}
return {
value: JSON.stringify({
bruker_id: d.bruker_id,
created_at: d.created_at,
type: 'AutomatiskReaktivering',
}),
};
});

try {
await producer.send({
topic: config.KAFKA_TOPIC,
messages,
});
} catch (e) {
logger.error(`Fikk ikke sendt melding til kafka ${e}`);
}
},
};
};

export default createProducer;
14 changes: 11 additions & 3 deletions test/api/reaktivering/automatiskReaktivering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import bodyParser from 'body-parser';
import { mockDeep } from 'jest-mock-extended';
import { AutomatiskReaktiveringRepository } from '../../../src/db/automatiskReaktiveringRepository';
import automatiskReaktiveringRoutes from '../../../src/api/reaktivering/automatiskReaktivering';
import { KafkaProducer } from '../../../src/kafka/automatisk-reaktivert-producer';

describe('automatisk reaktivering api', () => {
describe('POST /azure/automatisk-reaktivering', () => {
Expand All @@ -16,7 +17,9 @@ describe('automatisk reaktivering api', () => {
};
const repository = mockDeep<AutomatiskReaktiveringRepository>();

app.use(automatiskReaktiveringRoutes(repository, authMiddleware));
const kafkaProducer = mockDeep<KafkaProducer>();

app.use(automatiskReaktiveringRoutes(repository, kafkaProducer, authMiddleware));
const response = await request(app).post('/azure/automatisk-reaktivering');

expect(response.statusCode).toEqual(401);
Expand All @@ -32,7 +35,9 @@ describe('automatisk reaktivering api', () => {

const repository = mockDeep<AutomatiskReaktiveringRepository>();

app.use(automatiskReaktiveringRoutes(repository, authMiddleware));
const kafkaProducer = mockDeep<KafkaProducer>();

app.use(automatiskReaktiveringRoutes(repository, kafkaProducer, authMiddleware));
const response = await request(app).post('/azure/automatisk-reaktivering');

expect(response.statusCode).toEqual(400);
Expand All @@ -47,13 +52,16 @@ describe('automatisk reaktivering api', () => {
};

const repository = mockDeep<AutomatiskReaktiveringRepository>();

const kafkaProducer = mockDeep<KafkaProducer>();

repository.lagre.mockResolvedValue({
id: 42,
bruker_id: 'fnr-123',
created_at: new Date('2022-12-12T11:30:28.603Z'),
});

app.use(automatiskReaktiveringRoutes(repository, authMiddleware));
app.use(automatiskReaktiveringRoutes(repository, kafkaProducer, authMiddleware));

const response = await request(app).post('/azure/automatisk-reaktivering').send({ fnr: 'fnr-123' });

Expand Down

0 comments on commit 93ae1da

Please sign in to comment.