Skip to content

Commit

Permalink
Keepalive and nodelay support. (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
jq-rs authored Feb 9, 2024
1 parent 0b5541f commit 73f35dc
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to Mles project will be documented in this file after 1.0-re
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [2.2.0]

Keepalive and no delay support.

## [2.1.0]

Http port 80 redirect support.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mles"
version = "2.1.0"
version = "2.2.0"
authors = ["jq-rs"]
edition = "2021"

Expand Down
58 changes: 27 additions & 31 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ use siphasher::sip::SipHasher;
use std::collections::VecDeque;
use std::collections::{hash_map::Entry, HashMap};
use std::hash::{Hash, Hasher};
use std::io;
use std::net::Ipv6Addr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::net::TcpSocket;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -81,6 +82,7 @@ const WS_BUF: usize = 128;
const HISTORY_LIMIT: &str = "200";
const TLS_PORT: &str = "443";
const PING_INTERVAL: u64 = 12000;
const BACKLOG: u32 = 1024;

#[derive(Debug)]
enum WsEvent {
Expand Down Expand Up @@ -109,17 +111,24 @@ fn add_message(msg: Message, limit: u32, queue: &mut VecDeque<Message>) {
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
async fn main() -> io::Result<()> {
simple_logger::init_with_env().unwrap();
let args = Args::parse();
let limit = args.limit;
let www_root_dir = args.wwwroot;

let (tx, rx) = mpsc::channel::<WsEvent>(TASK_BUF);
let mut rx = ReceiverStream::new(rx);
let tcp_listener = tokio::net::TcpListener::bind((Ipv6Addr::UNSPECIFIED, args.port))
.await

let addr = format!("[{}]:{}", Ipv6Addr::UNSPECIFIED, args.port)
.parse()
.unwrap();
let socket = TcpSocket::new_v6()?;
socket.set_keepalive(true)?;
socket.set_nodelay(true)?;
socket.bind(addr)?;

let tcp_listener = socket.listen(BACKLOG)?;
let tcp_incoming = TcpListenerStream::new(tcp_listener);

let tls_incoming = AcmeConfig::new(args.domains.clone())
Expand Down Expand Up @@ -343,16 +352,8 @@ async fn main() {
let redirect = warp::get()
.and(warp::header::<String>("host"))
.and(warp::path::tail())
.and(warp::addr::remote())
.map(move |uri: String, path: warp::path::Tail, addr: Option<SocketAddr>| {
(
uri,
domain.clone(),
path,
addr,
)
})
.and_then(dyn_hreply);
.map(move |uri: String, path: warp::path::Tail| (uri, domain.clone(), path))
.and_then(dyn_hreply);
http_index.push(redirect);
}

Expand All @@ -362,25 +363,24 @@ async fn main() {
}

tokio::spawn(async move {
warp::serve(hindex).run(([0, 0, 0, 0, 0, 0, 0, 0], 80)).await;
warp::serve(hindex)
.run(([0, 0, 0, 0, 0, 0, 0, 0], 80))
.await;
});
}


let mut vindex = Vec::new();
for domain in args.domains {
let www_root = www_root_dir.clone();
let index = warp::get()
.and(warp::header::<String>("host"))
.and(warp::path::tail())
.and(warp::addr::remote())
.map(move |uri: String, path: warp::path::Tail, addr: Option<SocketAddr>| {
.map(move |uri: String, path: warp::path::Tail| {
(
uri,
domain.clone(),
www_root.to_str().unwrap().to_string(),
path,
addr,
)
})
.and_then(dyn_reply);
Expand All @@ -399,28 +399,24 @@ async fn main() {
}

async fn dyn_hreply(
tuple: (String, String, warp::path::Tail, Option<SocketAddr>),
tuple: (String, String, warp::path::Tail),
) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
let (uri, domain, tail, addr) = tuple;

log::debug!("redirecting, uri {uri}, http remote {:?}", addr);
let (uri, domain, tail) = tuple;

if uri != domain {
return Err(warp::reject::not_found());
}

Ok(Box::new(warp::redirect::redirect(
warp::http::Uri::from_str(&format!("https://{}/{}", &domain, tail.as_str()))
.expect("problem with uri?"),
)))
warp::http::Uri::from_str(&format!("https://{}/{}", &domain, tail.as_str()))
.expect("problem with uri?"),
)))
}

async fn dyn_reply(
tuple: (String, String, String, warp::path::Tail, Option<SocketAddr>),
tuple: (String, String, String, warp::path::Tail),
) -> Result<Box<dyn warp::Reply>, warp::Rejection> {
let (uri, domain, www_root, tail, addr) = tuple;

log::debug!("Remote {:?}", addr);
let (uri, domain, www_root, tail) = tuple;

if uri != domain {
return Err(warp::reject::not_found());
Expand Down

0 comments on commit 73f35dc

Please sign in to comment.