From ebbf38280ccb2e848552780f5175ea2c2c12d136 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Fri, 27 Oct 2023 17:42:07 +0200 Subject: [PATCH] fail init if kafka cluster unreachable --- capture-server/src/main.rs | 6 +++--- capture/src/sink.rs | 12 ++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/capture-server/src/main.rs b/capture-server/src/main.rs index 86684b4..1f659c8 100644 --- a/capture-server/src/main.rs +++ b/capture-server/src/main.rs @@ -35,6 +35,9 @@ async fn shutdown() { #[tokio::main] async fn main() { + // initialize tracing + tracing_subscriber::fmt::init(); + let config = Config::init_from_env().expect("Invalid configuration:"); let redis_client = @@ -63,9 +66,6 @@ async fn main() { ) }; - // initialize tracing - tracing_subscriber::fmt::init(); - // run our app with hyper // `axum::Server` is a re-export of `hyper::Server` diff --git a/capture/src/sink.rs b/capture/src/sink.rs index 54d765d..8de9ff1 100644 --- a/capture/src/sink.rs +++ b/capture/src/sink.rs @@ -1,11 +1,15 @@ use async_trait::async_trait; use metrics::{absolute_counter, counter, gauge, histogram}; +use std::time::Duration; use tokio::task::JoinSet; use crate::api::CaptureError; use rdkafka::config::ClientConfig; use rdkafka::error::RDKafkaErrorCode; use rdkafka::producer::future_producer::{FutureProducer, FutureRecord}; +use rdkafka::producer::Producer; +use rdkafka::util::Timeout; +use tracing::info; use crate::event::ProcessedEvent; @@ -102,11 +106,19 @@ pub struct KafkaSink { impl KafkaSink { pub fn new(topic: String, brokers: String) -> anyhow::Result { + info!("connecting to Kafka brokers at {}...", brokers); let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", &brokers) .set("statistics.interval.ms", "10000") .create_with_context(KafkaContext)?; + // Ping the cluster to make sure we can reach brokers + _ = producer.client().fetch_metadata( + Some("__consumer_offsets"), + Timeout::After(Duration::new(10, 0)), + )?; + info!("connected to Kafka brokers"); + Ok(KafkaSink { producer, topic }) } }