-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.rs
68 lines (61 loc) · 1.68 KB
/
consumer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#[path = "common/lib.rs"]
mod common;
use std::path::PathBuf;
use clap::Parser;
use flexi_logger::{colored_detailed_format, Logger};
use futures::StreamExt;
use signal_hook::consts::{SIGINT, SIGQUIT, SIGTERM};
use signal_hook_tokio::Signals;
use tokio::select;
use crate::common::client_certs;
use bud_client::{client::ClientBuilder, consumer::SubscribeBuilder};
#[derive(clap::Parser)]
struct Args {
#[arg(short, default_value = "./certs")]
certs: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
// logger init
Logger::try_with_str("trace, mio=off, rustls=off")
.unwrap()
.format(colored_detailed_format)
.start()
.unwrap();
let mut client = ClientBuilder::new(
"127.0.0.1:9080".parse()?,
"localhost",
client_certs(args.certs),
)
.keepalive(10000)
.build()
.await?;
let mut consumer = client
.new_consumer(
"test-consumer",
SubscribeBuilder::new("test-topic", "test-subscription").build(),
)
.await?;
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
let handle = signals.handle();
loop {
select! {
msg = consumer.next() => {
let Some(message) = msg else {
break
};
consumer.ack(&message.id).await?;
let s = String::from_utf8(message.payload.to_vec())?;
println!("received a message: {s}");
}
_ = signals.next() => {
break
}
}
}
handle.close();
consumer.close().await?;
client.close().await?;
Ok(())
}