
Merge pull request #43 from reiseburo/more-perf
Handle the producing of each message to Kafka in a task
rtyler authored Jun 23, 2020
2 parents c2200b4 + 900a22d commit 78fda3a
Showing 3 changed files with 58 additions and 52 deletions.
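In outline, the change spawns a separate async-std task for every message handed to the Kafka producer, yielding to the scheduler before each spawn so concurrent writers do not starve one another. The sketch below shows that pattern in isolation; it assumes only the `async-std` crate that hotdog already uses, and `fake_send` is an illustrative stand-in for the real `producer.send()` call, not code from this commit.

```rust
use async_std::task;
use std::time::Duration;

// Illustrative stand-in for the rdkafka producer send; not hotdog code.
async fn fake_send(msg: String) {
    task::sleep(Duration::from_millis(10)).await;
    println!("sent: {}", msg);
}

fn main() {
    task::block_on(async {
        let mut handles = Vec::new();

        for i in 0..5 {
            let msg = format!("log line {}", i);

            // Let other tasks make progress before spawning more work,
            // mirroring the yield_now() added in this commit.
            task::yield_now().await;

            // One task per message, so a single slow send cannot block
            // the loop that pulls messages off the channel.
            handles.push(task::spawn(fake_send(msg)));
        }

        // The sketch joins the tasks so the example exits cleanly;
        // hotdog itself treats them as fire-and-forget.
        for handle in handles {
            handle.await;
        }
    });
}
```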
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "hotdog"
-version = "0.3.2"
+version = "0.3.3"
 authors = ["R. Tyler Croy <[email protected]>"]
 edition = "2018"

106 changes: 56 additions & 50 deletions src/kafka.rs
@@ -3,6 +3,7 @@ use crate::status::{Statistic, Stats};
  * The Kafka module contains all the tooling/code necessary for connecting hotdog to Kafka for
  * sending log lines along as Kafka messages
  */
+use async_std::task;
 use async_std::sync::{channel, Receiver, Sender};
 use log::*;
 use rdkafka::client::DefaultClientContext;

@@ -140,64 +141,69 @@ impl Kafka {
                 let start_time = Instant::now();
                 let producer = producer.clone();

-                // TODO: What if this is a task::spawn for each message, would that be too much
-                // overhead?
-
-                let record = FutureRecord::<String, String>::to(&kmsg.topic).payload(&kmsg.msg);
-                /*
-                 * Intentionally setting the timeout_ms to -1 here so this blocks forever if the
-                 * outbound librdkafka queue is full. This will block up the crossbeam channel
-                 * properly and cause messages to begin to be dropped, rather than buffering
-                 * "forever" inside of hotdog
-                 */
-                if let Ok(delivery_result) = producer.send(record, -1 as i64).await {
-                    match delivery_result {
-                        Ok(_) => {
-                            stats
-                                .send((Stats::KafkaMsgSubmitted { topic: kmsg.topic }, 1))
-                                .await;
-                            /*
-                             * dipstick only supports u64 timers anyways, but as_micros() can
-                             * give a u128 (!).
-                             */
-                            if let Ok(elapsed) = start_time.elapsed().as_micros().try_into() {
-                                stats.send((Stats::KafkaMsgSent, elapsed)).await;
-                            } else {
-                                error!("Could not collect message time because the duration couldn't fit in an i64, yikes");
-                            }
-                        }
-                        Err((err, _)) => {
-                            match err {
-                                /*
-                                 * err_type will be one of RdKafkaError types defined:
-                                 * https://docs.rs/rdkafka/0.23.1/rdkafka/error/enum.RDKafkaError.html
-                                 */
-                                KafkaError::MessageProduction(err_type) => {
-                                    error!("Failed to send message to Kafka due to: {}", err_type);
-                                    stats
-                                        .send((
-                                            Stats::KafkaMsgErrored {
-                                                errcode: metric_name_for(err_type),
-                                            },
-                                            1,
-                                        ))
-                                        .await;
-                                }
-                                _ => {
-                                    error!("Failed to send message to Kafka!");
-                                    stats
-                                        .send((
-                                            Stats::KafkaMsgErrored {
-                                                errcode: String::from("generic"),
-                                            },
-                                            1,
-                                        ))
-                                        .await;
-                                }
-                            }
-                        }
-                    }
-                }
+                /*
+                 * Needed in order to prevent concurrent writers from totally
+                 * killing parallel performance
+                 */
+                task::yield_now().await;
+
+                task::spawn(async move {
+                    let record = FutureRecord::<String, String>::to(&kmsg.topic).payload(&kmsg.msg);
+                    /*
+                     * Intentionally setting the timeout_ms to -1 here so this blocks forever if the
+                     * outbound librdkafka queue is full. This will block up the crossbeam channel
+                     * properly and cause messages to begin to be dropped, rather than buffering
+                     * "forever" inside of hotdog
+                     */
+                    if let Ok(delivery_result) = producer.send(record, -1 as i64).await {
+                        match delivery_result {
+                            Ok(_) => {
+                                stats
+                                    .send((Stats::KafkaMsgSubmitted { topic: kmsg.topic }, 1))
+                                    .await;
+                                /*
+                                 * dipstick only supports u64 timers anyways, but as_micros() can
+                                 * give a u128 (!).
+                                 */
+                                if let Ok(elapsed) = start_time.elapsed().as_micros().try_into() {
+                                    stats.send((Stats::KafkaMsgSent, elapsed)).await;
+                                } else {
+                                    error!("Could not collect message time because the duration couldn't fit in an i64, yikes");
+                                }
+                            }
+                            Err((err, _)) => {
+                                match err {
+                                    /*
+                                     * err_type will be one of RdKafkaError types defined:
+                                     * https://docs.rs/rdkafka/0.23.1/rdkafka/error/enum.RDKafkaError.html
+                                     */
+                                    KafkaError::MessageProduction(err_type) => {
+                                        error!("Failed to send message to Kafka due to: {}", err_type);
+                                        stats
+                                            .send((
+                                                Stats::KafkaMsgErrored {
+                                                    errcode: metric_name_for(err_type),
+                                                },
+                                                1,
+                                            ))
+                                            .await;
+                                    }
+                                    _ => {
+                                        error!("Failed to send message to Kafka!");
+                                        stats
+                                            .send((
+                                                Stats::KafkaMsgErrored {
+                                                    errcode: String::from("generic"),
+                                                },
+                                                1,
+                                            ))
+                                            .await;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                });
             }
         }
     }
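The comment retained in this diff about passing `timeout_ms` of -1 relies on backpressure: if librdkafka's outbound queue fills, the send future blocks, the bounded channel feeding this loop fills in turn, and the intake side can drop lines rather than buffer them without limit. A rough sketch of that bounded-channel behaviour, assuming `async_std::channel::bounded` (a newer API than the `async_std::sync::channel` imported above) with illustrative names and a simulated slow producer; this is not hotdog's code:

```rust
use async_std::{channel, task};
use std::time::Duration;

fn main() {
    task::block_on(async {
        // Small bounded queue between the log intake and the Kafka sender.
        let (tx, rx) = channel::bounded::<String>(4);

        // Sender side: pretend every produce blocks for a while,
        // as it would when librdkafka's outbound queue is full.
        let sender = task::spawn(async move {
            while let Ok(msg) = rx.recv().await {
                task::sleep(Duration::from_millis(50)).await; // simulated blocking send
                println!("produced: {}", msg);
            }
        });

        // Intake side: when the bounded channel is full, drop the line
        // instead of buffering it forever in memory.
        for i in 0..20 {
            let line = format!("syslog line {}", i);
            if tx.try_send(line).is_err() {
                println!("channel full, dropping line {}", i);
            }
        }

        drop(tx); // close the channel so the sender loop can finish
        sender.await;
    });
}
```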
