Skip to content

Commit d159783

Browse files
authored
Merge pull request #190 from benesch/msg-send-sync
Implement Send + Sync on BorrowedMessage
2 parents 4b51170 + 2779f77 commit d159783

File tree

2 files changed

+73
-37
lines changed

2 files changed

+73
-37
lines changed

examples/asynchronous_processing.rs

Lines changed: 70 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
use std::thread;
22
use std::time::Duration;
33

4-
use clap::{App, Arg};
5-
use futures::{future, TryStreamExt};
6-
use log::{info, warn};
4+
use clap::{App, Arg, value_t};
5+
use futures::{StreamExt, TryStreamExt};
6+
use futures::stream::FuturesUnordered;
7+
use log::info;
78

89
use rdkafka::config::ClientConfig;
910
use rdkafka::consumer::stream_consumer::StreamConsumer;
1011
use rdkafka::consumer::Consumer;
11-
use rdkafka::message::OwnedMessage;
12+
use rdkafka::message::{BorrowedMessage, OwnedMessage};
1213
use rdkafka::producer::{FutureProducer, FutureRecord};
1314
use rdkafka::Message;
1415

1516
use crate::example_utils::setup_logger;
1617

1718
mod example_utils;
1819

20+
async fn record_message_receipt(msg: &BorrowedMessage<'_>) {
21+
// Simulate some work that must be done in the same order as messages are
22+
// received; i.e., before truly parallel processing can begin.
23+
info!("Message received: {}", msg.offset());
24+
}
25+
1926
// Emulates an expensive, synchronous computation.
2027
fn expensive_computation<'a>(msg: OwnedMessage) -> String {
2128
info!("Starting expensive computation on message {}", msg.offset());
@@ -39,58 +46,65 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String {
3946
// `tokio::spawn` is used to handle IO-bound tasks in parallel (e.g., producing
4047
// the messages), while `tokio::task::spawn_blocking` is used to handle the
4148
// simulated CPU-bound task.
42-
async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
49+
async fn run_async_processor(
50+
brokers: String,
51+
group_id: String,
52+
input_topic: String,
53+
output_topic: String,
54+
) {
4355
// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
4456
let consumer: StreamConsumer = ClientConfig::new()
45-
.set("group.id", group_id)
46-
.set("bootstrap.servers", brokers)
57+
.set("group.id", &group_id)
58+
.set("bootstrap.servers", &brokers)
4759
.set("enable.partition.eof", "false")
4860
.set("session.timeout.ms", "6000")
4961
.set("enable.auto.commit", "false")
5062
.create()
5163
.expect("Consumer creation failed");
5264

5365
consumer
54-
.subscribe(&[input_topic])
66+
.subscribe(&[&input_topic])
5567
.expect("Can't subscribe to specified topic");
5668

5769
// Create the `FutureProducer` to produce asynchronously.
5870
let producer: FutureProducer = ClientConfig::new()
59-
.set("bootstrap.servers", brokers)
71+
.set("bootstrap.servers", &brokers)
6072
.set("message.timeout.ms", "5000")
6173
.create()
6274
.expect("Producer creation error");
6375

6476
// Create the outer pipeline on the message stream.
6577
let stream_processor = consumer.start().try_for_each(|borrowed_message| {
66-
// Process each message
67-
info!("Message received: {}", borrowed_message.offset());
68-
// Borrowed messages can't outlive the consumer they are received from, so they need to
69-
// be owned in order to be sent to a separate thread.
70-
let owned_message = borrowed_message.detach();
71-
let output_topic = output_topic.to_string();
7278
let producer = producer.clone();
73-
tokio::spawn(async move {
74-
// The body of this block will be executed on the main thread pool,
75-
// but we perform `expensive_computation` on a separate thread pool
76-
// for CPU-intensive tasks via `tokio::task::spawn_blocking`.
77-
let computation_result =
78-
tokio::task::spawn_blocking(|| expensive_computation(owned_message))
79-
.await
80-
.expect("failed to wait for expensive computation");
81-
let produce_future = producer.send(
82-
FutureRecord::to(&output_topic)
83-
.key("some key")
84-
.payload(&computation_result),
85-
0,
86-
);
87-
match produce_future.await {
88-
Ok(Ok(delivery)) => println!("Sent: {:?}", delivery),
89-
Ok(Err((e, _))) => println!("Error: {:?}", e),
90-
Err(_) => println!("Future cancelled"),
91-
}
92-
});
93-
future::ready(Ok(()))
79+
let output_topic = output_topic.to_string();
80+
async move {
81+
// Process each message
82+
record_message_receipt(&borrowed_message).await;
83+
// Borrowed messages can't outlive the consumer they are received from, so they need to
84+
// be owned in order to be sent to a separate thread.
85+
let owned_message = borrowed_message.detach();
86+
tokio::spawn(async move {
87+
// The body of this block will be executed on the main thread pool,
88+
// but we perform `expensive_computation` on a separate thread pool
89+
// for CPU-intensive tasks via `tokio::task::spawn_blocking`.
90+
let computation_result =
91+
tokio::task::spawn_blocking(|| expensive_computation(owned_message))
92+
.await
93+
.expect("failed to wait for expensive computation");
94+
let produce_future = producer.send(
95+
FutureRecord::to(&output_topic)
96+
.key("some key")
97+
.payload(&computation_result),
98+
0,
99+
);
100+
match produce_future.await {
101+
Ok(Ok(delivery)) => println!("Sent: {:?}", delivery),
102+
Ok(Err((e, _))) => println!("Error: {:?}", e),
103+
Err(_) => println!("Future cancelled"),
104+
}
105+
});
106+
Ok(())
107+
}
94108
});
95109

96110
info!("Starting event loop");
@@ -139,6 +153,13 @@ async fn main() {
139153
.takes_value(true)
140154
.required(true),
141155
)
156+
.arg(
157+
Arg::with_name("num-workers")
158+
.long("num-workers")
159+
.help("Number of workers")
160+
.takes_value(true)
161+
.default_value("1"),
162+
)
142163
.get_matches();
143164

144165
setup_logger(true, matches.value_of("log-conf"));
@@ -147,6 +168,18 @@ async fn main() {
147168
let group_id = matches.value_of("group-id").unwrap();
148169
let input_topic = matches.value_of("input-topic").unwrap();
149170
let output_topic = matches.value_of("output-topic").unwrap();
171+
let num_workers = value_t!(matches, "num-workers", usize).unwrap();
150172

151-
run_async_processor(brokers, group_id, input_topic, output_topic).await
173+
(0..num_workers)
174+
.map(|_| {
175+
tokio::spawn(run_async_processor(
176+
brokers.to_owned(),
177+
group_id.to_owned(),
178+
input_topic.to_owned(),
179+
output_topic.to_owned(),
180+
))
181+
})
182+
.collect::<FuturesUnordered<_>>()
183+
.for_each(|_| async { () })
184+
.await
152185
}

src/message.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,9 @@ impl<'a> Drop for BorrowedMessage<'a> {
362362
}
363363
}
364364

365+
unsafe impl<'a> Send for BorrowedMessage<'a> {}
366+
unsafe impl<'a> Sync for BorrowedMessage<'a> {}
367+
365368
//
366369
// ********** OWNED MESSAGE **********
367370
//

0 commit comments

Comments
 (0)