# Java Kafka Consumer

## Configuration

```java
package no.nav.config;

import java.io.File;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.LongDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

public class KafkaConfig {

    public static Properties getKafkaProps() {
        Properties kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getEnvVar("KAFKA_BOOTSTRAP_SERVERS"));
        kafkaProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, getEnvVar("KAFKA_SCHEMAREGISTRY_SERVERS"));
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, getEnvVar("GROUP_ID"));
        kafkaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, getEnvVar("GROUP_ID") + new InetSocketAddress(0).getHostString());
        /* With auto commit disabled you must commit offsets yourself with
           consumer.commitSync() or consumer.commitAsync() */
        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        /* If Kafka has never seen this group id before: should we start reading from the
           beginning of the topic ("earliest"), or only care about messages that arrive after
           our consumer has been registered ("latest")? */
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        kafkaProps.putAll(credentialProps());
        return kafkaProps;
    }

    private static Properties credentialProps() {
        Properties credProps = new Properties();
        credProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        credProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        credProps.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                        + getEnvVar("USERNAME") + "\" password=\"" + getEnvVar("PASSWORD") + "\";");
        String truststorePath = System.getenv("NAV_TRUSTSTORE_PATH");
        if (truststorePath != null) {
            credProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            credProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, new File(truststorePath).getAbsolutePath());
            credProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, System.getenv("NAV_TRUSTSTORE_PASSWORD"));
        }
        return credProps;
    }

    private static String getEnvVar(String varName) {
        return Optional.ofNullable(System.getenv(varName))
                .filter(v -> !v.isEmpty())
                .orElseThrow(() -> new IllegalArgumentException("Could not find the environment variable " + varName));
    }
}
```
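The `auto.offset.reset` comment above describes a choice between `"earliest"` and `"latest"`. If you want that choice to vary per environment instead of being hard-coded, one option is to read it from an environment variable with a default. This is a minimal sketch, not part of the config above; the `AUTO_OFFSET_RESET` variable name is an assumption.

```java
// Sketch: make auto.offset.reset configurable per environment.
// AUTO_OFFSET_RESET is a hypothetical environment variable, not used elsewhere in this README.
String offsetReset = Optional.ofNullable(System.getenv("AUTO_OFFSET_RESET"))
        .filter(v -> !v.isEmpty())
        .orElse("earliest"); // fall back to reading the topic from the beginning
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
```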

## Consumer

```java
package no.nav.kafka;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;

import no.nav.leesah.MittAvroObject;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RetriableException;

public class LeesahConsumer {

    public static void consumeMessages(Properties kafkaProps) {
        try (KafkaConsumer<Long, MittAvroObject> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.subscribe(Arrays.asList("aapen.leesah.hendelse"));
            while (true) {
                try {
                    ConsumerRecords<Long, MittAvroObject> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
                    for (ConsumerRecord<Long, MittAvroObject> record : records) {
                        gjorNoeMedMelding(record);
                    }
                    consumer.commitSync();
                } catch (RetriableException re) {
                    // Don't let a RetriableException break the while loop; Kafka does the right
                    // thing as long as we just start the next iteration of the poll loop.
                }
            }
        }
    }

    private static void gjorNoeMedMelding(ConsumerRecord<Long, MittAvroObject> record) {
        System.out.println(record.value());
    }
}
```
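Because `enable.auto.commit` is `false`, the config comment notes that you must call either `consumer.commitSync()` or `consumer.commitAsync()` yourself. The loop above uses `commitSync()`; a non-blocking variant with `commitAsync()` and a callback, not part of the original example, could look like this:

```java
// Sketch: non-blocking offset commit after processing a batch of records.
// Would replace the consumer.commitSync() call in the loop above.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // The commit failed; a later successful commit will cover these offsets as well.
        System.err.println("Async offset commit failed: " + exception.getMessage());
    }
});
```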

```java
import no.nav.config.KafkaConfig;
import no.nav.kafka.LeesahConsumer;

public class App {

    public static void main(String[] args) {
        // Blocking; can be wrapped in a Runnable to run on a separate thread
        LeesahConsumer.consumeMessages(KafkaConfig.getKafkaProps());
    }
}
```
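As the comment in `main` says, `consumeMessages` blocks. If the main thread is needed for other work, the call can be wrapped in a `Runnable` and started on its own thread; a minimal sketch (the thread name is illustrative):

```java
// Sketch: run the blocking consumer loop on a separate thread.
Thread consumerThread = new Thread(
        () -> LeesahConsumer.consumeMessages(KafkaConfig.getKafkaProps()),
        "leesah-consumer");
consumerThread.start();
// The main thread is now free to do other work; the consumer loop runs until the JVM exits.
```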