Skip to content

Commit f3c8924

Browse files
committed
rumqttd: adding optional native-tls support.
By using the `use-native-tls` feature, this crate can now use tokio-native-tls vs tokio-rustls. Changed: * CI tests to incorporate disparate features (rustls vs native-tls) * Made certain rustls includes to be conditional in rumqttd * How errors are handled in main loop. Otherwise process loop exits silently. Added: * Notes to Readme about adding native-tls * Added separate tls() function in rumqttd for native-tls * Added use of tokio-native-tls
1 parent 01be5c2 commit f3c8924

File tree

4 files changed

+125
-10
lines changed

4 files changed

+125
-10
lines changed

.github/workflows/features.yml

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ on:
99
name: features
1010

1111
jobs:
12-
test:
13-
name: Rumqtt build and test
12+
test-rustls:
13+
name: Rumqtt build and test
1414
# runs-on: [self-hosted, linux, X64, rumqtt]
1515
runs-on: ubuntu-18.04
1616
steps:
@@ -21,6 +21,9 @@ jobs:
2121
- uses: actions-rs/cargo@v1
2222
with:
2323
command: test
24-
args: --release --all-features
25-
26-
24+
args: --release
25+
# Test rumqttd native-tls support
26+
- uses: actions-rs/cargo@v1
27+
with:
28+
command: test
29+
args: --release --features use-native-tls --manifest-path rumqttd/Cargo.toml --no-default-features

rumqttd/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ name = "rumqttd"
1818
path = "src/bin.rs"
1919

2020
[features]
21+
default = ["use-rustls"]
2122
prof = ["pprof"]
23+
use-rustls = ["tokio-rustls"]
24+
use-native-tls = ["tokio-native-tls"]
2225

2326
[dependencies]
2427
rumqttlog = { path = "../rumqttlog", version = "0.5"}
2528
mqttbytes = { path = "../mqttbytes", version = "0.2" }
2629
tokio = { version = "1.0", features = ["full"] }
27-
tokio-rustls = "0.22"
2830
serde = { version = "1", features = ["derive"] }
2931
log = "0.4"
3032
thiserror = "1"
@@ -36,5 +38,9 @@ warp = "0.3"
3638
futures-util = "0.3.8"
3739
pprof = { version = "0.4", features = ["flamegraph", "protobuf"], optional = true }
3840

41+
# Optional
42+
tokio-rustls = { version = "0.22", optional = true }
43+
tokio-native-tls = { version = "0.3", optional = true }
44+
3945
[target.'cfg(not(target_env = "msvc"))'.dependencies]
4046
jemallocator = "0.3"

rumqttd/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,32 @@
22

33
[![crates.io page](https://img.shields.io/crates/v/rumqttd.svg)](https://crates.io/crates/rumqttd)
44
[![docs.rs page](https://docs.rs/rumqttd/badge.svg)](https://docs.rs/rumqttd)
5+
6+
## `native-tls` support
7+
8+
This crate, by default uses the `tokio-rustls` crate. There's also support for the `tokio-native-tls` crate.
9+
Add it to your Cargo.toml like so:
10+
11+
```
12+
rumqttd = { version = "0.5", default-features = false, features = ["use-native-tls"] }
13+
```
14+
15+
Then in your config file make sure that you use the `pkcs12` options for your cert instead of `cert_path`, `key_path`, etc.
16+
17+
```toml
18+
[rumqtt.servers.1]
19+
port = 8883
20+
pkcs12_path = "/root/identity.pfx"
21+
pkcs12_pass = ""
22+
# cert_path = "/root/issued/test.crt"
23+
# key_path = "/root/private/test.key"
24+
# ca_path = "/root/ca.crt"
25+
```
26+
27+
You can generate the `.p12`/`.pfx` file using `openssl`:
28+
29+
```
30+
openssl pkcs12 -export -out identity.pfx -inkey ~/pki/private/test.key -in ~/pki/issued/test.crt -certfile ~/pki/ca.crt
31+
```
32+
33+
Make sure if you use a password it matches the entry in `pkcs12_pass`. If no password, use an empty string `""`.

rumqttd/src/lib.rs

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
extern crate log;
33

44
use serde::{Deserialize, Serialize};
5-
use std::{net::SocketAddr, sync::Arc};
65
use std::time::Duration;
76
use std::{io, thread};
7+
use std::{net::SocketAddr, sync::Arc};
88

99
use mqttbytes::v4::Packet;
1010
use rumqttlog::*;
@@ -15,12 +15,25 @@ use crate::remotelink::RemoteLink;
1515
use tokio::io::{AsyncRead, AsyncWrite};
1616
use tokio::net::TcpListener;
1717
use tokio::{task, time};
18+
19+
// All requirements for `rustls`
20+
#[cfg(feature = "use-rustls")]
1821
use tokio_rustls::rustls::internal::pemfile::{certs, rsa_private_keys};
22+
#[cfg(feature = "use-rustls")]
1923
use tokio_rustls::rustls::{
2024
AllowAnyAuthenticatedClient, NoClientAuth, RootCertStore, ServerConfig, TLSError,
2125
};
26+
#[cfg(feature = "use-rustls")]
2227
use tokio_rustls::TlsAcceptor;
2328

29+
// All requirements for `native-tls`
30+
#[cfg(feature = "use-native-tls")]
31+
use std::io::Read;
32+
#[cfg(feature = "use-native-tls")]
33+
use tokio_native_tls::native_tls::Error as TLSError;
34+
#[cfg(feature = "use-native-tls")]
35+
use tokio_native_tls::{native_tls, TlsAcceptor};
36+
2437
pub mod async_locallink;
2538
mod consolelink;
2639
mod locallink;
@@ -31,9 +44,11 @@ mod state;
3144
use crate::consolelink::ConsoleLink;
3245
pub use crate::locallink::{LinkError, LinkRx, LinkTx};
3346
use crate::network::Network;
47+
#[cfg(feature = "use-rustls")]
3448
use crate::Error::ServerKeyNotFound;
3549
use std::collections::HashMap;
3650
use std::fs::File;
51+
#[cfg(feature = "use-rustls")]
3752
use std::io::BufReader;
3853

3954
#[derive(Debug, thiserror::Error)]
@@ -87,6 +102,10 @@ pub struct Config {
87102
#[derive(Debug, Serialize, Deserialize, Clone)]
88103
pub struct ServerSettings {
89104
pub listen: SocketAddr,
105+
/// Used only for native-tls implementation
106+
pub pkcs12_path: Option<String>,
107+
/// Used only for native-tls implementation
108+
pub pkcs12_pass: Option<String>,
90109
pub ca_path: Option<String>,
91110
pub cert_path: Option<String>,
92111
pub key_path: Option<String>,
@@ -215,6 +234,40 @@ impl Server {
215234
}
216235
}
217236

237+
#[cfg(feature = "use-native-tls")]
238+
fn tls(&self) -> Result<Option<TlsAcceptor>, Error> {
239+
match (
240+
self.config.pkcs12_path.clone(),
241+
self.config.pkcs12_pass.clone(),
242+
) {
243+
(Some(cert), Some(password)) => {
244+
// Get certificates
245+
let cert_file = File::open(&cert);
246+
let mut cert_file =
247+
cert_file.map_err(|_| Error::ServerCertNotFound(cert.clone()))?;
248+
249+
// Read cert into memory
250+
let mut buf = Vec::new();
251+
cert_file
252+
.read_to_end(&mut buf)
253+
.map_err(|_| Error::InvalidServerCert(cert.clone()))?;
254+
255+
// Get the identity
256+
let identity = native_tls::Identity::from_pkcs12(&buf, &password)
257+
.map_err(|_| Error::InvalidServerCert(cert.clone()))?;
258+
259+
// Builder
260+
let builder = native_tls::TlsAcceptor::builder(identity).build()?;
261+
262+
// Create acceptor
263+
let acceptor = TlsAcceptor::from(builder);
264+
Ok(Some(acceptor))
265+
}
266+
_ => Ok(None),
267+
}
268+
}
269+
270+
#[cfg(feature = "use-rustls")]
218271
fn tls(&self) -> Result<Option<TlsAcceptor>, Error> {
219272
let (certs, key) = match self.config.cert_path.clone() {
220273
Some(cert) => {
@@ -271,13 +324,35 @@ impl Server {
271324
let acceptor = self.tls()?;
272325
let max_incoming_size = config.max_payload_size;
273326

274-
info!("Waiting for connections on {}. Server = {}", self.config.listen, self.id);
327+
info!(
328+
"Waiting for connections on {}. Server = {}",
329+
self.config.listen, self.id
330+
);
275331
loop {
276-
let (stream, addr) = listener.accept().await?;
332+
// Await new network connection.
333+
let (stream, addr) = match listener.accept().await {
334+
Ok((s, r)) => (s, r),
335+
Err(_e) => {
336+
error!("Unable to accept socket.");
337+
continue;
338+
}
339+
};
340+
341+
// Depending on TLS or not create a new Network
277342
let network = match &acceptor {
278343
Some(acceptor) => {
279344
info!("{}. Accepting TLS connection from: {}", count, addr);
280-
Network::new(acceptor.accept(stream).await?, max_incoming_size)
345+
let sock = match acceptor.accept(stream).await {
346+
Ok(s) => s,
347+
Err(_e) => {
348+
error!(
349+
"{} Unable to accept incoming TLS connection from {}",
350+
count, addr
351+
);
352+
continue;
353+
}
354+
};
355+
Network::new(sock, max_incoming_size)
281356
}
282357
None => {
283358
info!("{}. Accepting TCP connection from: {}", count, addr);
@@ -289,6 +364,8 @@ impl Server {
289364

290365
let config = config.clone();
291366
let router_tx = self.router_tx.clone();
367+
368+
// Spawn a new thread to handle this connection.
292369
task::spawn(async {
293370
let connector = Connector::new(config, router_tx);
294371
if let Err(e) = connector.new_connection(network).await {

0 commit comments

Comments
 (0)