This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

kafka: check reachability + collect metrics (#37)
xvello authored Oct 27, 2023
1 parent e6ea52f commit fb6fa7c
Showing 3 changed files with 79 additions and 14 deletions.
7 changes: 4 additions & 3 deletions capture-server/src/main.rs
@@ -13,6 +13,7 @@ struct Config {
#[envconfig(default = "127.0.0.1:3000")]
address: SocketAddr,
redis_url: String,

kafka_hosts: String,
kafka_topic: String,
}
@@ -34,6 +35,9 @@ async fn shutdown() {

#[tokio::main]
async fn main() {
// initialize tracing
tracing_subscriber::fmt::init();

let config = Config::init_from_env().expect("Invalid configuration:");

let redis_client =
@@ -62,9 +66,6 @@ async fn main() {
)
};

// initialize tracing
tracing_subscriber::fmt::init();

// run our app with hyper
// `axum::Server` is a re-export of `hyper::Server`

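For context, a minimal sketch (not part of this diff) of how the new kafka_hosts and kafka_topic config fields might be wired into the sink at startup. The call site and the `config` binding are assumptions; only the KafkaSink::new signature comes from this commit:

    // Hypothetical wiring: build the Kafka sink from the two Config fields and
    // fail fast if the brokers cannot be reached (via the fetch_metadata ping added below).
    let sink = KafkaSink::new(config.kafka_topic, config.kafka_hosts)
        .expect("failed to connect to Kafka");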
8 changes: 2 additions & 6 deletions capture/src/capture.rs
@@ -31,8 +31,6 @@ pub async fn event(
headers: HeaderMap,
body: Bytes,
) -> Result<Json<CaptureResponse>, CaptureError> {
tracing::debug!(len = body.len(), "new event request");

let events = match headers
.get("content-type")
.map_or("", |v| v.to_str().unwrap_or(""))
@@ -47,8 +45,6 @@ pub async fn event(
_ => RawEvent::from_bytes(&meta, body),
}?;

tracing::debug!("got events {:?}", &events);

if events.is_empty() {
return Err(CaptureError::EmptyBatch);
}
@@ -98,7 +94,7 @@ pub async fn event(
}));
}

tracing::debug!("got context {:?}", &context);
tracing::debug!(context=?context, events=?events, "decoded request");

process_events(state.sink.clone(), &events, &context).await?;

@@ -169,7 +165,7 @@ pub async fn process_events<'a>(
.map(|e| process_single_event(e, context))
.collect::<Result<Vec<ProcessedEvent>, CaptureError>>()?;

println!("Processed events: {:?}", events);
tracing::debug!(events=?events, "processed {} events", events.len());

if events.len() == 1 {
sink.send(events[0].clone()).await?;
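The logging changes above swap ad-hoc println!/debug! calls for structured tracing fields. A small standalone sketch of that pattern (the function and names are illustrative, not from this diff): `field = ?value` records the field via its Debug impl, so subscribers can filter on it rather than parse it out of the message string.

    use tracing::debug;

    fn log_batch(events: &[String]) {
        // `?events` captures the slice with its Debug formatting as a structured field;
        // `count` is recorded as a plain numeric field alongside the message.
        debug!(events = ?events, count = events.len(), "processed batch");
    }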
78 changes: 73 additions & 5 deletions capture/src/sink.rs
@@ -1,11 +1,15 @@
use async_trait::async_trait;
use metrics::{counter, histogram};
use metrics::{absolute_counter, counter, gauge, histogram};
use std::time::Duration;
use tokio::task::JoinSet;

use crate::api::CaptureError;
use rdkafka::config::ClientConfig;
use rdkafka::error::RDKafkaErrorCode;
use rdkafka::producer::future_producer::{FutureProducer, FutureRecord};
use rdkafka::producer::Producer;
use rdkafka::util::Timeout;
use tracing::info;

use crate::event::ProcessedEvent;

@@ -39,25 +43,89 @@ impl EventSink for PrintSink {
}
}

struct KafkaContext;

impl rdkafka::ClientContext for KafkaContext {
fn stats(&self, stats: rdkafka::Statistics) {
gauge!("capture_kafka_callback_queue_depth", stats.replyq as f64);
gauge!("capture_kafka_producer_queue_depth", stats.msg_cnt as f64);
gauge!(
"capture_kafka_producer_queue_depth_limit",
stats.msg_max as f64
);
gauge!("capture_kafka_producer_queue_bytes", stats.msg_max as f64);
gauge!(
"capture_kafka_producer_queue_bytes_limit",
stats.msg_size_max as f64
);

for (topic, stats) in stats.topics {
gauge!(
"capture_kafka_produce_avg_batch_size_bytes",
stats.batchsize.avg as f64,
"topic" => topic.clone()
);
gauge!(
"capture_kafka_produce_avg_batch_size_events",
stats.batchcnt.avg as f64,
"topic" => topic
);
}

for (_, stats) in stats.brokers {
let id_string = format!("{}", stats.nodeid);
gauge!(
"capture_kafka_broker_requests_pending",
stats.outbuf_cnt as f64,
"broker" => id_string.clone()
);
gauge!(
"capture_kafka_broker_responses_awaiting",
stats.waitresp_cnt as f64,
"broker" => id_string.clone()
);
absolute_counter!(
"capture_kafka_broker_tx_errors_total",
stats.txerrs,
"broker" => id_string.clone()
);
absolute_counter!(
"capture_kafka_broker_rx_errors_total",
stats.rxerrs,
"broker" => id_string
);
}
}
}

#[derive(Clone)]
pub struct KafkaSink {
producer: FutureProducer,
producer: FutureProducer<KafkaContext>,
topic: String,
}

impl KafkaSink {
pub fn new(topic: String, brokers: String) -> anyhow::Result<KafkaSink> {
let producer: FutureProducer = ClientConfig::new()
info!("connecting to Kafka brokers at {}...", brokers);
let producer: FutureProducer<KafkaContext> = ClientConfig::new()
.set("bootstrap.servers", &brokers)
.create()?;
.set("statistics.interval.ms", "10000")
.create_with_context(KafkaContext)?;

// Ping the cluster to make sure we can reach brokers
_ = producer.client().fetch_metadata(
Some("__consumer_offsets"),
Timeout::After(Duration::new(10, 0)),
)?;
info!("connected to Kafka brokers");

Ok(KafkaSink { producer, topic })
}
}

impl KafkaSink {
async fn kafka_send(
producer: FutureProducer,
producer: FutureProducer<KafkaContext>,
topic: String,
event: ProcessedEvent,
) -> Result<(), CaptureError> {
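A hypothetical test sketch (not in this commit) of the behaviour the fetch_metadata ping buys: with an unreachable bootstrap address, KafkaSink::new should now return an error at startup, after the 10-second metadata timeout, instead of failing on the first send. The test name and broker address are made up.

    #[test]
    fn new_fails_when_brokers_unreachable() {
        // Nothing listens on port 1, so the metadata ping times out and returns an error.
        let res = KafkaSink::new("events".to_string(), "localhost:1".to_string());
        assert!(res.is_err());
    }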
