Skip to content

Commit 2cbee48

Browse files
committed
rumqttd: adding optional native-tls support.
Readjusting acceptor into the new task as well. That way this loop doesn't get blocked by setting up the connection.
1 parent fa4beab commit 2cbee48

File tree

1 file changed

+45
-20
lines changed

1 file changed

+45
-20
lines changed

rumqttd/src/lib.rs

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ impl Server {
242242
}
243243

244244
#[cfg(feature = "use-native-tls")]
245-
fn tls(&self) -> Result<Option<TlsAcceptor>, Error> {
245+
fn tls(&self) -> Result<Option<Arc<TlsAcceptor>>, Error> {
246246
match (
247247
self.config.pkcs12_path.clone(),
248248
self.config.pkcs12_pass.clone(),
@@ -268,14 +268,14 @@ impl Server {
268268

269269
// Create acceptor
270270
let acceptor = TlsAcceptor::from(builder);
271-
Ok(Some(acceptor))
271+
Ok(Some(Arc::new(acceptor)))
272272
}
273273
_ => Ok(None),
274274
}
275275
}
276276

277277
#[cfg(feature = "use-rustls")]
278-
fn tls(&self) -> Result<Option<TlsAcceptor>, Error> {
278+
fn tls(&self) -> Result<Option<Arc<TlsAcceptor>>, Error> {
279279
let (certs, key) = match self.config.cert_path.clone() {
280280
Some(cert) => {
281281
// Get certificates
@@ -319,46 +319,71 @@ impl Server {
319319

320320
server_config.set_single_cert(certs, key)?;
321321
let acceptor = TlsAcceptor::from(Arc::new(server_config));
322-
Ok(Some(acceptor))
322+
Ok(Some(Arc::new(acceptor)))
323323
}
324324

325325
async fn start(&self) -> Result<(), Error> {
326326
let addr = format!("0.0.0.0:{}", self.config.port);
327327

328328
let listener = TcpListener::bind(&addr).await?;
329329
let delay = Duration::from_millis(self.config.next_connection_delay_ms);
330-
let mut count = 0;
330+
let mut count: u32 = 0;
331331

332332
let config = Arc::new(self.config.connections.clone());
333-
let acceptor = self.tls()?;
334333
let max_incoming_size = config.max_payload_size;
334+
let acceptor = self.tls()?;
335335

336336
info!("Waiting for connections on {}. Server = {}", addr, self.id);
337337
loop {
338+
// Accept incoming connection
338339
let (stream, addr) = listener.accept().await?;
339-
let network = match &acceptor {
340-
Some(acceptor) => {
341-
info!("{}. Accepting TLS connection from: {}", count, addr);
342-
let sock = acceptor.accept(stream).await?;
343-
Network::new(sock, max_incoming_size)
344-
}
345-
None => {
346-
info!("{}. Accepting TCP connection from: {}", count, addr);
347-
Network::new(stream, max_incoming_size)
348-
}
349-
};
350340

351-
count += 1;
341+
// Router tx needs to be outside
342+
let router_tx = self.router_tx.clone();
352343

344+
// Acceptor cloned
345+
let acceptor = acceptor.clone();
346+
347+
// Cloneconfig
353348
let config = config.clone();
354-
let router_tx = self.router_tx.clone();
355-
task::spawn(async {
349+
350+
// Then spawn a new thread to handle the connection
351+
task::spawn(async move {
352+
let network = match acceptor {
353+
Some(acceptor) => {
354+
info!("{}. Accepting TLS connection from: {}", count, addr);
355+
356+
// Handle acceptor error
357+
let sock = match acceptor.accept(stream).await {
358+
Ok(s) => s,
359+
Err(e) => {
360+
error!(
361+
"{}. Unable to acccept TLS connection. Result = {:?}",
362+
count, e
363+
);
364+
return;
365+
}
366+
};
367+
Network::new(sock, max_incoming_size)
368+
}
369+
None => {
370+
info!("{}. Accepting TCP connection from: {}", count, addr);
371+
Network::new(stream, max_incoming_size)
372+
}
373+
};
374+
375+
let config = config.clone();
376+
356377
let connector = Connector::new(config, router_tx);
357378
if let Err(e) = connector.new_connection(network).await {
358379
error!("Dropping link task!! Result = {:?}", e);
359380
}
360381
});
361382

383+
// Increment count
384+
count += 1;
385+
386+
// Wait a certain amount between connection attempts.
362387
time::sleep(delay).await;
363388
}
364389
}

0 commit comments

Comments
 (0)