Skip to content

Commit e0187a4

Browse files
committed
rumqttd: adding optional native-tls support.
Readjusting acceptor into the new task as well. That way this loop doesn't get blocked by setting up the connection.
1 parent fa4beab commit e0187a4

File tree

1 file changed

+29
-15
lines changed

1 file changed

+29
-15
lines changed

rumqttd/src/lib.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -327,38 +327,52 @@ impl Server {
327327

328328
let listener = TcpListener::bind(&addr).await?;
329329
let delay = Duration::from_millis(self.config.next_connection_delay_ms);
330-
let mut count = 0;
330+
let mut count: u32 = 0;
331331

332332
let config = Arc::new(self.config.connections.clone());
333333
let acceptor = self.tls()?;
334334
let max_incoming_size = config.max_payload_size;
335335

336336
info!("Waiting for connections on {}. Server = {}", addr, self.id);
337337
loop {
338+
// Accept incoming connection
338339
let (stream, addr) = listener.accept().await?;
339-
let network = match &acceptor {
340-
Some(acceptor) => {
341-
info!("{}. Accepting TLS connection from: {}", count, addr);
342-
let sock = acceptor.accept(stream).await?;
343-
Network::new(sock, max_incoming_size)
344-
}
345-
None => {
346-
info!("{}. Accepting TCP connection from: {}", count, addr);
347-
Network::new(stream, max_incoming_size)
348-
}
349-
};
350340

351-
count += 1;
341+
// Router tx needs to be outside
342+
let router_tx = self.router_tx.clone();
352343

344+
// Acceptor cloned
345+
let acceptor = acceptor.clone();
346+
347+
// Cloneconfig
353348
let config = config.clone();
354-
let router_tx = self.router_tx.clone();
355-
task::spawn(async {
349+
350+
// Then spawn a new thread to handle the connection
351+
task::spawn(async move {
352+
let network = match acceptor {
353+
Some(acceptor) => {
354+
info!("{}. Accepting TLS connection from: {}", count, addr);
355+
let sock = acceptor.accept(stream).await.unwrap();
356+
Network::new(sock, max_incoming_size)
357+
}
358+
None => {
359+
info!("{}. Accepting TCP connection from: {}", count, addr);
360+
Network::new(stream, max_incoming_size)
361+
}
362+
};
363+
364+
let config = config.clone();
365+
356366
let connector = Connector::new(config, router_tx);
357367
if let Err(e) = connector.new_connection(network).await {
358368
error!("Dropping link task!! Result = {:?}", e);
359369
}
360370
});
361371

372+
// Increment count
373+
count += 1;
374+
375+
// Wait a certain amount between connection attempts.
362376
time::sleep(delay).await;
363377
}
364378
}

0 commit comments

Comments
 (0)