From a0b121bd39324ea158ce879a3a31c24583228349 Mon Sep 17 00:00:00 2001 From: adithya Sv Date: Sat, 14 Sep 2024 22:19:51 +0530 Subject: [PATCH 1/3] Health indicator --- .../controller/HealthController.java | 20 +++++++ .../controller/model/AppService.java | 8 +++ .../kafkastudy/health/AppHealthIndicator.java | 54 +++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java create mode 100644 src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java create mode 100644 src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java diff --git a/src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java b/src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java new file mode 100644 index 0000000..1d4f464 --- /dev/null +++ b/src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java @@ -0,0 +1,20 @@ +package com.kafka.learn.kafkastudy.controller; + +import com.kafka.learn.kafkastudy.controller.model.AppService; +import com.kafka.learn.kafkastudy.health.AppHealthIndicator; +import lombok.AllArgsConstructor; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.ResponseBody; + +@Controller +@AllArgsConstructor +public class HealthController { + private AppHealthIndicator appHealthIndicator; + @GetMapping("/health/{service}") + public ResponseBody healthCheck(@PathVariable AppService service){ + appHealthIndicator.getHealth(Boolean.TRUE); + return null; + } +} diff --git a/src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java b/src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java new file mode 100644 index 0000000..3dd32f4 --- /dev/null +++ b/src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java @@ -0,0 +1,8 @@ +package com.kafka.learn.kafkastudy.controller.model; + +public enum AppService { + MONGO, + KAFKA, + APP, + OTHERS +} diff --git a/src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java b/src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java new file mode 100644 index 0000000..1f6cfd4 --- /dev/null +++ b/src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java @@ -0,0 +1,54 @@ +package com.kafka.learn.kafkastudy.health; + +import com.mongodb.reactivestreams.client.MongoClient; +import lombok.AllArgsConstructor; +import org.apache.kafka.clients.admin.AdminClient; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.stereotype.Component; + +@AllArgsConstructor +@Component +public class AppHealthIndicator implements HealthIndicator { + private final MongoClient mongoClient; + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + //private final AdminClient kafkaAdminClient; + + + @Override + public Health getHealth(boolean includeDetails) { + return HealthIndicator.super.getHealth(includeDetails); + } + + @Override + public Health health() { + return Health.up() + .withDetails(kafkaHealth().getDetails()) + .withDetails(mongoHealth().getDetails()) + .build(); + } + + private Health kafkaHealth() { + try { + //kafkaAdminClient.describeCluster().nodes().get(); + MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("listenerOne"); + + if (listenerContainer != null && listenerContainer.isRunning()) + return Health.up().withDetail("RegularKafkaListener", "UP").build(); + return Health.down().withDetail("RegularKafkaListener", "DOWN").build(); + } catch (Exception e) { + return Health.down().withDetail("Kafka", "Not Available").withException(e).build(); + } + } + + private Health mongoHealth() { + try { + mongoClient.listDatabaseNames(); + return Health.up().withDetail("MongoDB", "Available").build(); + } catch (Exception e) { + return Health.down().withDetail("MongoDB", "Not Available").withException(e).build(); + } + } +} From d69a74e4b5d5462aa4c8bbcdc70935a8445821c2 Mon Sep 17 00:00:00 2001 From: adithya Sv Date: Sat, 14 Sep 2024 22:20:07 +0530 Subject: [PATCH 2/3] exception handler --- .../kafkastudy/exception/ExceptionCode.java | 15 ++++++ .../handler/GlobalExceptionHandler.java | 54 +++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 src/main/java/com/kafka/learn/kafkastudy/exception/ExceptionCode.java create mode 100644 src/main/java/com/kafka/learn/kafkastudy/exception/handler/GlobalExceptionHandler.java diff --git a/src/main/java/com/kafka/learn/kafkastudy/exception/ExceptionCode.java b/src/main/java/com/kafka/learn/kafkastudy/exception/ExceptionCode.java new file mode 100644 index 0000000..eca4788 --- /dev/null +++ b/src/main/java/com/kafka/learn/kafkastudy/exception/ExceptionCode.java @@ -0,0 +1,15 @@ +package com.kafka.learn.kafkastudy.exception; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public enum ExceptionCode { + E5001_M("MONGO_DB_EXCEPTION :"), + E5002_K("KAFKA_EXCEPTION :"), + E5003_GEN("APPLICATION_EXCEPTION :"), + E5004_DB("DATABASE_CONNECTION_EXCEPTION :"); + + String value; +} diff --git a/src/main/java/com/kafka/learn/kafkastudy/exception/handler/GlobalExceptionHandler.java b/src/main/java/com/kafka/learn/kafkastudy/exception/handler/GlobalExceptionHandler.java new file mode 100644 index 0000000..d9287cc --- /dev/null +++ b/src/main/java/com/kafka/learn/kafkastudy/exception/handler/GlobalExceptionHandler.java @@ -0,0 +1,54 @@ +package com.kafka.learn.kafkastudy.exception.handler; + + +import com.kafka.learn.kafkastudy.exception.ExceptionCode; +import com.mongodb.MongoException; +import org.apache.kafka.common.KafkaException; +import org.springframework.dao.DataAccessException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; + +import java.util.HashMap; +import java.util.Map; + +@ControllerAdvice +public class GlobalExceptionHandler { + + // Handle global exceptions + @ExceptionHandler(Exception.class) + public ResponseEntity handleGlobalException(Exception ex) { + Map response = new HashMap<>(); + response.put("message", ExceptionCode.E5003_GEN.getValue() + ex.getMessage()); + response.put("status", ExceptionCode.E5003_GEN.toString()); + return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR); + } + + // Handle data access exceptions (for MongoDB or other databases) + @ExceptionHandler(DataAccessException.class) + public ResponseEntity handleDataAccessException(DataAccessException ex) { + Map response = new HashMap<>(); + response.put("message", ExceptionCode.E5004_DB.getValue() + ex.getMessage()); + response.put("status", ExceptionCode.E5004_DB.toString()); + return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR); + } + + // Handle Kafka exceptions + @ExceptionHandler(KafkaException.class) + public ResponseEntity handleKafkaException(KafkaException ex) { + Map response = new HashMap<>(); + response.put("message", ExceptionCode.E5002_K.getValue() + ex.getMessage()); + response.put("status", ExceptionCode.E5002_K.toString()); + return new ResponseEntity<>(response, HttpStatus.SERVICE_UNAVAILABLE); + } + + // Handle MongoDB exceptions + @ExceptionHandler(MongoException.class) + public ResponseEntity handleMongoException(MongoException ex) { + Map response = new HashMap<>(); + response.put("message", ExceptionCode.E5001_M.getValue() + ex.getMessage()); + response.put("status", ExceptionCode.E5001_M.toString()); + return new ResponseEntity<>(response, HttpStatus.SERVICE_UNAVAILABLE); + } +} From 7ad7a24e39ca8e88d1eb22bf5c4b3c4d87a848bb Mon Sep 17 00:00:00 2001 From: adithya Sv Date: Sun, 22 Sep 2024 19:16:27 +0530 Subject: [PATCH 3/3] regular kafka Health and test endpoint --- .../controller/HealthController.java | 20 ++++-- .../controller/model/AppService.java | 3 +- .../kafkastudy/health/AppHealthIndicator.java | 66 ++++++++++++------- .../listener/RegularKafkaListener.java | 14 ++-- 4 files changed, 67 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java b/src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java index 1d4f464..e2c262e 100644 --- a/src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java +++ b/src/main/java/com/kafka/learn/kafkastudy/controller/HealthController.java @@ -1,20 +1,28 @@ package com.kafka.learn.kafkastudy.controller; +import com.kafka.learn.kafkastudy.config.KafkaConsumerConfiguration; import com.kafka.learn.kafkastudy.controller.model.AppService; import com.kafka.learn.kafkastudy.health.AppHealthIndicator; import lombok.AllArgsConstructor; -import org.springframework.stereotype.Controller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; -@Controller +import java.time.ZonedDateTime; + +@RestController @AllArgsConstructor public class HealthController { + private static final Logger logger = LoggerFactory.getLogger(HealthController.class); + private AppHealthIndicator appHealthIndicator; @GetMapping("/health/{service}") - public ResponseBody healthCheck(@PathVariable AppService service){ - appHealthIndicator.getHealth(Boolean.TRUE); - return null; + public String healthCheck(@PathVariable AppService service){ + logger.info("Start time : {}", ZonedDateTime.now()); + String s = appHealthIndicator.health().toString(); + logger.info("Health update : {}, {}",ZonedDateTime.now(), s); + return s; } } diff --git a/src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java b/src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java index 3dd32f4..50343fc 100644 --- a/src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java +++ b/src/main/java/com/kafka/learn/kafkastudy/controller/model/AppService.java @@ -2,7 +2,8 @@ public enum AppService { MONGO, - KAFKA, + REGULAR_KAFKA, + REACTIVE_KAFKA, APP, OTHERS } diff --git a/src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java b/src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java index 1f6cfd4..039bbbf 100644 --- a/src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java +++ b/src/main/java/com/kafka/learn/kafkastudy/health/AppHealthIndicator.java @@ -1,54 +1,72 @@ package com.kafka.learn.kafkastudy.health; -import com.mongodb.reactivestreams.client.MongoClient; +import com.kafka.learn.kafkastudy.controller.model.AppService; import lombok.AllArgsConstructor; -import org.apache.kafka.clients.admin.AdminClient; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.boot.actuate.health.Status; +import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Optional; @AllArgsConstructor @Component public class AppHealthIndicator implements HealthIndicator { - private final MongoClient mongoClient; - private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; - //private final AdminClient kafkaAdminClient; - - - @Override - public Health getHealth(boolean includeDetails) { - return HealthIndicator.super.getHealth(includeDetails); - } + private final ReactiveMongoTemplate reactiveMongoTemplate; + private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + //private final AdminClient adminClient; @Override public Health health() { - return Health.up() - .withDetails(kafkaHealth().getDetails()) - .withDetails(mongoHealth().getDetails()) - .build(); + try { + return Health.up() + .withDetail(AppService.REGULAR_KAFKA.name(), regularKafkaHealth()) + //.withDetail(AppService.REACTIVE_KAFKA.name(), reactiveKafkaHealth()) + .withDetail(AppService.MONGO.name(), mongoHealth()) + .build(); + }catch (Exception e) { + return Health.down().build(); + } } - private Health kafkaHealth() { + private String regularKafkaHealth() { try { //kafkaAdminClient.describeCluster().nodes().get(); - MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("listenerOne"); + MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("regularKafkaListener"); if (listenerContainer != null && listenerContainer.isRunning()) - return Health.up().withDetail("RegularKafkaListener", "UP").build(); - return Health.down().withDetail("RegularKafkaListener", "DOWN").build(); + return Status.UP.toString(); + return Status.DOWN.toString(); } catch (Exception e) { - return Health.down().withDetail("Kafka", "Not Available").withException(e).build(); + return Status.DOWN.toString(); + } + } + +/* //TODO: + private String reactiveKafkaHealth() { + try { + ListTopicsResult result = adminClient.listTopics(); + return result.listings().get().size() > 0 ? Status.UP.toString() : Status.DOWN.toString(); // Wait for the result + } catch (InterruptedException | ExecutionException e) { + return Status.DOWN.toString(); // Topic is not accessible } } +*/ - private Health mongoHealth() { + private String mongoHealth() { try { - mongoClient.listDatabaseNames(); - return Health.up().withDetail("MongoDB", "Available").build(); + Mono h = reactiveMongoTemplate.executeCommand("{ ping: 1 }") + .flatMap(result -> Mono.just(Health.up().withDetail(AppService.MONGO.name(), Status.UP).build())) + .onErrorResume(ex -> Mono.just(Health.down().withDetail(AppService.MONGO.name(), Status.DOWN).build())); + Optional health = h.blockOptional(Duration.ofSeconds(5)); + return health.isPresent() ? health.get().getStatus().toString() : Status.DOWN.toString(); } catch (Exception e) { - return Health.down().withDetail("MongoDB", "Not Available").withException(e).build(); + return "DOWN"; } } } diff --git a/src/main/java/com/kafka/learn/kafkastudy/listener/RegularKafkaListener.java b/src/main/java/com/kafka/learn/kafkastudy/listener/RegularKafkaListener.java index 27c6f5d..7b70eb3 100644 --- a/src/main/java/com/kafka/learn/kafkastudy/listener/RegularKafkaListener.java +++ b/src/main/java/com/kafka/learn/kafkastudy/listener/RegularKafkaListener.java @@ -8,10 +8,14 @@ @Component public class RegularKafkaListener { - private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class); + private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class); - @KafkaListener(topics = "#{'${kafka.regular.topic}'}", containerFactory = "regularKafkaListenerContainerFactory") - public void consume(Message message) { - logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders() ); - } + @KafkaListener( + id = "regularKafkaListener", + topics = "#{'${kafka.regular.topic}'}", + containerFactory = "regularKafkaListenerContainerFactory" + ) + public void consume(Message message) { + logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders()); + } }