Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exception handling and health actuator implementation #12

Merged
merged 3 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.kafka.learn.kafkastudy.controller;

import com.kafka.learn.kafkastudy.config.KafkaConsumerConfiguration;
import com.kafka.learn.kafkastudy.controller.model.AppService;
import com.kafka.learn.kafkastudy.health.AppHealthIndicator;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.time.ZonedDateTime;

@RestController
@AllArgsConstructor
public class HealthController {
private static final Logger logger = LoggerFactory.getLogger(HealthController.class);

private AppHealthIndicator appHealthIndicator;
@GetMapping("/health/{service}")
public String healthCheck(@PathVariable AppService service){
logger.info("Start time : {}", ZonedDateTime.now());
String s = appHealthIndicator.health().toString();
logger.info("Health update : {}, {}",ZonedDateTime.now(), s);
return s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.kafka.learn.kafkastudy.controller.model;

public enum AppService {
MONGO,
REGULAR_KAFKA,
REACTIVE_KAFKA,
APP,
OTHERS
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.kafka.learn.kafkastudy.exception;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum ExceptionCode {
E5001_M("MONGO_DB_EXCEPTION :"),
E5002_K("KAFKA_EXCEPTION :"),
E5003_GEN("APPLICATION_EXCEPTION :"),
E5004_DB("DATABASE_CONNECTION_EXCEPTION :");

String value;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.kafka.learn.kafkastudy.exception.handler;


import com.kafka.learn.kafkastudy.exception.ExceptionCode;
import com.mongodb.MongoException;
import org.apache.kafka.common.KafkaException;
import org.springframework.dao.DataAccessException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;

import java.util.HashMap;
import java.util.Map;

@ControllerAdvice
public class GlobalExceptionHandler {

// Handle global exceptions
@ExceptionHandler(Exception.class)
public ResponseEntity<Object> handleGlobalException(Exception ex) {
Map<String, String> response = new HashMap<>();
response.put("message", ExceptionCode.E5003_GEN.getValue() + ex.getMessage());
response.put("status", ExceptionCode.E5003_GEN.toString());
return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR);
}

// Handle data access exceptions (for MongoDB or other databases)
@ExceptionHandler(DataAccessException.class)
public ResponseEntity<Object> handleDataAccessException(DataAccessException ex) {
Map<String, String> response = new HashMap<>();
response.put("message", ExceptionCode.E5004_DB.getValue() + ex.getMessage());
response.put("status", ExceptionCode.E5004_DB.toString());
return new ResponseEntity<>(response, HttpStatus.INTERNAL_SERVER_ERROR);
}

// Handle Kafka exceptions
@ExceptionHandler(KafkaException.class)
public ResponseEntity<Object> handleKafkaException(KafkaException ex) {
Map<String, String> response = new HashMap<>();
response.put("message", ExceptionCode.E5002_K.getValue() + ex.getMessage());
response.put("status", ExceptionCode.E5002_K.toString());
return new ResponseEntity<>(response, HttpStatus.SERVICE_UNAVAILABLE);
}

// Handle MongoDB exceptions
@ExceptionHandler(MongoException.class)
public ResponseEntity<Object> handleMongoException(MongoException ex) {
Map<String, String> response = new HashMap<>();
response.put("message", ExceptionCode.E5001_M.getValue() + ex.getMessage());
response.put("status", ExceptionCode.E5001_M.toString());
return new ResponseEntity<>(response, HttpStatus.SERVICE_UNAVAILABLE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.kafka.learn.kafkastudy.health;

import com.kafka.learn.kafkastudy.controller.model.AppService;
import lombok.AllArgsConstructor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Optional;

@AllArgsConstructor
@Component
public class AppHealthIndicator implements HealthIndicator {
private final ReactiveMongoTemplate reactiveMongoTemplate;
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
//private final AdminClient adminClient;

@Override
public Health health() {
try {
return Health.up()
.withDetail(AppService.REGULAR_KAFKA.name(), regularKafkaHealth())
//.withDetail(AppService.REACTIVE_KAFKA.name(), reactiveKafkaHealth())
.withDetail(AppService.MONGO.name(), mongoHealth())
.build();
}catch (Exception e) {
return Health.down().build();
}
}

private String regularKafkaHealth() {
try {
//kafkaAdminClient.describeCluster().nodes().get();
MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("regularKafkaListener");

if (listenerContainer != null && listenerContainer.isRunning())
return Status.UP.toString();
return Status.DOWN.toString();
} catch (Exception e) {
return Status.DOWN.toString();
}
}

/* //TODO:
private String reactiveKafkaHealth() {
try {
ListTopicsResult result = adminClient.listTopics();
return result.listings().get().size() > 0 ? Status.UP.toString() : Status.DOWN.toString(); // Wait for the result
} catch (InterruptedException | ExecutionException e) {
return Status.DOWN.toString(); // Topic is not accessible
}
}
*/

private String mongoHealth() {
try {
Mono<Health> h = reactiveMongoTemplate.executeCommand("{ ping: 1 }")
.flatMap(result -> Mono.just(Health.up().withDetail(AppService.MONGO.name(), Status.UP).build()))
.onErrorResume(ex -> Mono.just(Health.down().withDetail(AppService.MONGO.name(), Status.DOWN).build()));
Optional<Health> health = h.blockOptional(Duration.ofSeconds(5));
return health.isPresent() ? health.get().getStatus().toString() : Status.DOWN.toString();
} catch (Exception e) {
return "DOWN";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@

@Component
public class RegularKafkaListener {
private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class);
private static final Logger logger = LoggerFactory.getLogger(RegularKafkaListener.class);

@KafkaListener(topics = "#{'${kafka.regular.topic}'}", containerFactory = "regularKafkaListenerContainerFactory")
public void consume(Message<String> message) {
logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders() );
}
@KafkaListener(
id = "regularKafkaListener",
topics = "#{'${kafka.regular.topic}'}",
containerFactory = "regularKafkaListenerContainerFactory"
)
public void consume(Message<String> message) {
logger.info("Regular message: {}, Regular header: {}", message.getPayload(), message.getHeaders());
}
}
Loading