Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
fail init if kafka cluster unreachable
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Oct 27, 2023
1 parent d02c909 commit ebbf382
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
6 changes: 3 additions & 3 deletions capture-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ async fn shutdown() {

#[tokio::main]
async fn main() {
// initialize tracing
tracing_subscriber::fmt::init();

let config = Config::init_from_env().expect("Invalid configuration:");

let redis_client =
Expand Down Expand Up @@ -63,9 +66,6 @@ async fn main() {
)
};

// initialize tracing
tracing_subscriber::fmt::init();

// run our app with hyper
// `axum::Server` is a re-export of `hyper::Server`

Expand Down
12 changes: 12 additions & 0 deletions capture/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use async_trait::async_trait;
use metrics::{absolute_counter, counter, gauge, histogram};
use std::time::Duration;
use tokio::task::JoinSet;

use crate::api::CaptureError;
use rdkafka::config::ClientConfig;
use rdkafka::error::RDKafkaErrorCode;
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::Producer;
use rdkafka::util::Timeout;
use tracing::info;

use crate::event::ProcessedEvent;

Expand Down Expand Up @@ -102,11 +106,19 @@ pub struct KafkaSink {

impl KafkaSink {
pub fn new(topic: String, brokers: String) -> anyhow::Result<KafkaSink> {
info!("connecting to Kafka brokers at {}...", brokers);
let producer: FutureProducer<KafkaContext> = ClientConfig::new()
.set("bootstrap.servers", &brokers)
.set("statistics.interval.ms", "10000")
.create_with_context(KafkaContext)?;

// Ping the cluster to make sure we can reach brokers
_ = producer.client().fetch_metadata(
Some("__consumer_offsets"),
Timeout::After(Duration::new(10, 0)),
)?;
info!("connected to Kafka brokers");

Ok(KafkaSink { producer, topic })
}
}
Expand Down

0 comments on commit ebbf382

Please sign in to comment.