Exception handling and health actuator implementation (#12)

* Health indicator
* Exception handler
* Regular Kafka health and test endpoint

SVAdithya authored Sep 22, 2024
1 parent 7e61087 commit 38702e5
Showing 6 changed files with 187 additions and 5 deletions.
@@ -0,0 +1,28 @@
package com.kafka.learn.kafkastudy.controller;

import com.kafka.learn.kafkastudy.controller.model.AppService;
import com.kafka.learn.kafkastudy.health.AppHealthIndicator;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.time.ZonedDateTime;

@RestController
@AllArgsConstructor
public class HealthController {

    private static final Logger logger = LoggerFactory.getLogger(HealthController.class);

    private final AppHealthIndicator appHealthIndicator;

    @GetMapping("/health/{service}")
    public String healthCheck(@PathVariable AppService service) {
        logger.info("Start time : {}", ZonedDateTime.now());
        String health = appHealthIndicator.health().toString();
        logger.info("Health update : {}, {}", ZonedDateTime.now(), health);
        return health;
    }
}
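
Note: the service path variable is accepted but not used yet, so every request returns the full aggregated health string. A minimal sketch of how it could narrow the response (hypothetical, not part of this commit), assuming the detail keys match AppService.name() as set in AppHealthIndicator:

    // Hypothetical variant of healthCheck(); Health is org.springframework.boot.actuate.health.Health.
    @GetMapping("/health/{service}")
    public String healthCheck(@PathVariable AppService service) {
        Health health = appHealthIndicator.health();
        // Details are keyed by enum name, e.g. "REGULAR_KAFKA" or "MONGO".
        Object detail = health.getDetails().get(service.name());
        // Fall back to the overall status when no matching detail exists (e.g. APP, OTHERS).
        return detail != null ? detail.toString() : health.getStatus().toString();
    }
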
@@ -0,0 +1,9 @@
package com.kafka.learn.kafkastudy.controller.model;

public enum AppService {
    MONGO,
    REGULAR_KAFKA,
    REACTIVE_KAFKA,
    APP,
    OTHERS
}

@@ -0,0 +1,15 @@
package com.kafka.learn.kafkastudy.exception;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum ExceptionCode {

    E5001_M("MONGO_DB_EXCEPTION :"),
    E5002_K("KAFKA_EXCEPTION :"),
    E5003_GEN("APPLICATION_EXCEPTION :"),
    E5004_DB("DATABASE_CONNECTION_EXCEPTION :");

    private final String value;
}

@@ -0,0 +1,54 @@
package com.kafka.learn.kafkastudy.exception.handler;


import com.kafka.learn.kafkastudy.exception.ExceptionCode;
import com.mongodb.MongoException;
import org.apache.kafka.common.KafkaException;
import org.springframework.dao.DataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

import java.util.HashMap;
import java.util.Map;

@ControllerAdvice
public class GlobalExceptionHandler {

    // Fallback for any exception not matched by a more specific handler below
    @ExceptionHandler(Exception.class)
    public ResponseEntity<Object> handleGlobalException(Exception ex) {
        Map<String, String> response = new HashMap<>();
        response.put("message", ExceptionCode.E5003_GEN.getValue() + ex.getMessage());
        response.put("status", ExceptionCode.E5003_GEN.toString());
        return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR);
    }

    // Handle data access exceptions (for MongoDB or other databases)
    @ExceptionHandler(DataAccessException.class)
    public ResponseEntity<Object> handleDataAccessException(DataAccessException ex) {
        Map<String, String> response = new HashMap<>();
        response.put("message", ExceptionCode.E5004_DB.getValue() + ex.getMessage());
        response.put("status", ExceptionCode.E5004_DB.toString());
        return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR);
    }

    // Handle Kafka exceptions
    @ExceptionHandler(KafkaException.class)
    public ResponseEntity<Object> handleKafkaException(KafkaException ex) {
        Map<String, String> response = new HashMap<>();
        response.put("message", ExceptionCode.E5002_K.getValue() + ex.getMessage());
        response.put("status", ExceptionCode.E5002_K.toString());
        return new ResponseEntity<>(response, HttpStatus.SERVICE_UNAVAILABLE);
    }

    // Handle MongoDB exceptions
    @ExceptionHandler(MongoException.class)
    public ResponseEntity<Object> handleMongoException(MongoException ex) {
        Map<String, String> response = new HashMap<>();
        response.put("message", ExceptionCode.E5001_M.getValue() + ex.getMessage());
        response.put("status", ExceptionCode.E5001_M.toString());
        return new ResponseEntity<>(response, HttpStatus.SERVICE_UNAVAILABLE);
    }
}
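
For illustration only (not part of the commit), invoking one handler directly shows the response shape; the exception message below is made up:

    // Illustrative snippet, reusing the imports of GlobalExceptionHandler above.
    GlobalExceptionHandler handler = new GlobalExceptionHandler();
    ResponseEntity<Object> response =
            handler.handleKafkaException(new KafkaException("broker not reachable"));
    // response.getStatusCode() -> 503 SERVICE_UNAVAILABLE
    // response.getBody()       -> contains message="KAFKA_EXCEPTION :broker not reachable" and status="E5002_K"

Under Spring's dispatch, the most specific handler wins, so a KafkaException reaches handleKafkaException rather than the generic handleGlobalException fallback.
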
@@ -0,0 +1,72 @@
package com.kafka.learn.kafkastudy.health;

import com.kafka.learn.kafkastudy.controller.model.AppService;
import lombok.AllArgsConstructor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Optional;

@AllArgsConstructor
@Component
public class AppHealthIndicator implements HealthIndicator {

    private final ReactiveMongoTemplate reactiveMongoTemplate;
    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    //private final AdminClient adminClient;

    @Override
    public Health health() {
        try {
            return Health.up()
                    .withDetail(AppService.REGULAR_KAFKA.name(), regularKafkaHealth())
                    //.withDetail(AppService.REACTIVE_KAFKA.name(), reactiveKafkaHealth())
                    .withDetail(AppService.MONGO.name(), mongoHealth())
                    .build();
        } catch (Exception e) {
            return Health.down().build();
        }
    }

    private String regularKafkaHealth() {
        try {
            //kafkaAdminClient.describeCluster().nodes().get();
            MessageListenerContainer listenerContainer =
                    kafkaListenerEndpointRegistry.getListenerContainer("regularKafkaListener");

            if (listenerContainer != null && listenerContainer.isRunning()) {
                return Status.UP.toString();
            }
            return Status.DOWN.toString();
        } catch (Exception e) {
            return Status.DOWN.toString();
        }
    }

    /* //TODO:
    private String reactiveKafkaHealth() {
        try {
            ListTopicsResult result = adminClient.listTopics();
            return result.listings().get().size() > 0 ? Status.UP.toString() : Status.DOWN.toString(); // Wait for the result
        } catch (InterruptedException | ExecutionException e) {
            return Status.DOWN.toString(); // Topic is not accessible
        }
    }
    */

    private String mongoHealth() {
        try {
            Mono<Health> h = reactiveMongoTemplate.executeCommand("{ ping: 1 }")
                    .flatMap(result -> Mono.just(Health.up().withDetail(AppService.MONGO.name(), Status.UP).build()))
                    .onErrorResume(ex -> Mono.just(Health.down().withDetail(AppService.MONGO.name(), Status.DOWN).build()));
            Optional<Health> health = h.blockOptional(Duration.ofSeconds(5));
            return health.isPresent() ? health.get().getStatus().toString() : Status.DOWN.toString();
        } catch (Exception e) {
            return Status.DOWN.toString();
        }
    }
}
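
A sketch of how the indicator could be unit-tested with mocks (illustrative, not part of this commit; assumes JUnit 5 and Mockito on the test classpath):

    package com.kafka.learn.kafkastudy.health;

    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.bson.Document;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.actuate.health.Status;
    import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import reactor.core.publisher.Mono;

    class AppHealthIndicatorTest {

        @Test
        void reportsUpWhenListenerIsRunningAndMongoAnswersPing() {
            // Mongo ping succeeds.
            ReactiveMongoTemplate mongo = mock(ReactiveMongoTemplate.class);
            when(mongo.executeCommand("{ ping: 1 }")).thenReturn(Mono.just(new Document("ok", 1)));

            // The container registered under id "regularKafkaListener" is running.
            MessageListenerContainer container = mock(MessageListenerContainer.class);
            when(container.isRunning()).thenReturn(true);
            KafkaListenerEndpointRegistry registry = mock(KafkaListenerEndpointRegistry.class);
            when(registry.getListenerContainer("regularKafkaListener")).thenReturn(container);

            AppHealthIndicator indicator = new AppHealthIndicator(mongo, registry);

            assertEquals(Status.UP, indicator.health().getStatus());
        }
    }

The blockOptional(Duration.ofSeconds(5)) call in mongoHealth() caps how long a slow Mongo ping can stall the health check at five seconds.
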
@@ -8,10 +8,14 @@

 @Component
 public class RegularKafkaListener {
-    private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class);
+    private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class);
 
-    @KafkaListener(topics = "#{'${kafka.regular.topic}'}", containerFactory = "regularKafkaListenerContainerFactory")
-    public void consume(Message<String> message) {
-        logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders() );
-    }
+    @KafkaListener(
+            id = "regularKafkaListener",
+            topics = "#{'${kafka.regular.topic}'}",
+            containerFactory = "regularKafkaListenerContainerFactory"
+    )
+    public void consume(Message<String> message) {
+        logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders());
+    }
 }
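
The new id = "regularKafkaListener" attribute is what ties this listener to the health check above: AppHealthIndicator.regularKafkaHealth() resolves the container via kafkaListenerEndpointRegistry.getListenerContainer("regularKafkaListener") and reports UP only while it is running.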
