Skip to content

Commit

Permalink
Reduce HttpServer trait functions
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed May 6, 2024
1 parent 733d97e commit 190bb75
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 103 deletions.
84 changes: 36 additions & 48 deletions neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{
cell::RefCell, collections::HashMap, fmt::Display, path::PathBuf, rc::Rc, time::Instant,
};
use std::{cell::RefCell, collections::HashMap, fmt::Display, rc::Rc, time::Instant};

use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram};
use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay, Cipher};
use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay};
use neqo_http3::Error;
use neqo_transport::{
server::{ActiveConnectionRef, Server, ValidateAddress},
ConnectionEvent, ConnectionIdGenerator, ConnectionParameters, Output, State, StreamId,
ConnectionEvent, ConnectionIdGenerator, Output, State, StreamId,
};
use regex::Regex;

Expand All @@ -29,29 +27,44 @@ pub struct HttpServer {
server: Server,
write_state: HashMap<StreamId, HttpStreamState>,
read_state: HashMap<StreamId, Vec<u8>>,
is_qns_test: bool,
}

impl HttpServer {
pub fn new(
now: Instant,
certs: &[impl AsRef<str>],
protocols: &[impl AsRef<str>],
args: &Args,
anti_replay: AntiReplay,
cid_manager: Rc<RefCell<dyn ConnectionIdGenerator>>,
conn_params: ConnectionParameters,
) -> Result<Self, Error> {
let mut server = Server::new(
args.now(),
&[args.key.clone()],
&[args.shared.alpn.clone()],
anti_replay,
Box::new(AllowZeroRtt {}),
cid_manager,
args.shared.quic_parameters.get(&args.shared.alpn),
)?;

server.set_ciphers(&args.get_ciphers());
server.set_qlog_dir(args.shared.qlog_dir.clone());
if args.retry {
server.set_validation(ValidateAddress::Always);
}
if args.ech {
let (sk, pk) = generate_ech_keys().expect("generate ECH keys");
server
.enable_ech(random::<1>()[0], "public.example", &sk, &pk)
.expect("enable ECH");
let cfg = server.ech_config();
qinfo!("ECHConfigList: {}", hex(cfg));
}

Ok(Self {
server: Server::new(
now,
certs,
protocols,
anti_replay,
Box::new(AllowZeroRtt {}),
cid_manager,
conn_params,
)?,
server,
write_state: HashMap::new(),
read_state: HashMap::new(),
is_qns_test: args.shared.qns_test.is_some(),
})
}

Expand Down Expand Up @@ -100,12 +113,7 @@ impl HttpServer {
}
}

fn stream_readable(
&mut self,
stream_id: StreamId,
conn: &mut ActiveConnectionRef,
args: &Args,
) {
fn stream_readable(&mut self, stream_id: StreamId, conn: &mut ActiveConnectionRef) {
if !stream_id.is_client_initiated() || !stream_id.is_bidi() {
qdebug!("Stream {} not client-initiated bidi, ignoring", stream_id);
return;
Expand Down Expand Up @@ -136,7 +144,7 @@ impl HttpServer {
return;
};

let re = if args.shared.qns_test.is_some() {
let re = if self.is_qns_test {
Regex::new(r"GET +/(\S+)(?:\r)?\n").unwrap()
} else {
Regex::new(r"GET +/(\d+)(?:\r)?\n").unwrap()
Expand All @@ -150,7 +158,7 @@ impl HttpServer {
let resp = {
let path = path.as_str();
qdebug!("Path = '{path}'");
if args.shared.qns_test.is_some() {
if self.is_qns_test {
match qns_read_response(path) {
Ok(data) => Some(data),
Err(e) => {
Expand Down Expand Up @@ -199,7 +207,7 @@ impl super::HttpServer for HttpServer {
self.server.process(dgram, now)
}

fn process_events(&mut self, args: &Args, now: Instant) {
fn process_events(&mut self, now: Instant) {
let active_conns = self.server.active_connections();
for mut acr in active_conns {
loop {
Expand All @@ -213,7 +221,7 @@ impl super::HttpServer for HttpServer {
.insert(stream_id, HttpStreamState::default());
}
ConnectionEvent::RecvStreamReadable { stream_id } => {
self.stream_readable(stream_id, &mut acr, args);
self.stream_readable(stream_id, &mut acr);
}
ConnectionEvent::SendStreamWritable { stream_id } => {
self.stream_writable(stream_id, &mut acr);
Expand All @@ -233,26 +241,6 @@ impl super::HttpServer for HttpServer {
}
}

fn set_qlog_dir(&mut self, dir: Option<PathBuf>) {
self.server.set_qlog_dir(dir);
}

fn validate_address(&mut self, v: ValidateAddress) {
self.server.set_validation(v);
}

fn set_ciphers(&mut self, ciphers: &[Cipher]) {
self.server.set_ciphers(ciphers);
}

fn enable_ech(&mut self) -> &[u8] {
let (sk, pk) = generate_ech_keys().expect("generate ECH keys");
self.server
.enable_ech(random::<1>()[0], "public.example", &sk, &pk)
.expect("enable ECH");
self.server.ech_config()
}

fn has_events(&self) -> bool {
self.server.has_active_connections()
}
Expand Down
47 changes: 21 additions & 26 deletions neqo-bin/src/server/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ use std::{
cmp::min,
collections::HashMap,
fmt::{self, Display},
path::PathBuf,
rc::Rc,
time::Instant,
};

use neqo_common::{qdebug, qerror, qwarn, Datagram, Header};
use neqo_crypto::{generate_ech_keys, random, AntiReplay, Cipher};
use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header};
use neqo_crypto::{generate_ech_keys, random, AntiReplay};
use neqo_http3::{
Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId,
};
Expand All @@ -29,6 +28,7 @@ pub struct HttpServer {
/// Progress writing to each stream.
remaining_data: HashMap<StreamId, ResponseData>,
posts: HashMap<Http3OrWebTransportStream, usize>,
is_qns_test: bool,
}

impl HttpServer {
Expand All @@ -39,7 +39,7 @@ impl HttpServer {
anti_replay: AntiReplay,
cid_mgr: Rc<RefCell<dyn ConnectionIdGenerator>>,
) -> Self {
let server = Http3Server::new(
let mut server = Http3Server::new(
args.now(),
&[args.key.clone()],
&[args.shared.alpn.clone()],
Expand All @@ -53,10 +53,25 @@ impl HttpServer {
None,
)
.expect("We cannot make a server!");

server.set_ciphers(&args.get_ciphers());
server.set_qlog_dir(args.shared.qlog_dir.clone());
if args.retry {
server.set_validation(ValidateAddress::Always);
}
if args.ech {
let (sk, pk) = generate_ech_keys().expect("should create ECH keys");
server
.enable_ech(random::<1>()[0], "public.example", &sk, &pk)
.unwrap();
let cfg = server.ech_config();
qinfo!("ECHConfigList: {}", hex(cfg));
}
Self {
server,
remaining_data: HashMap::new(),
posts: HashMap::new(),
is_qns_test: args.shared.qns_test.is_some(),
}
}
}
Expand All @@ -72,7 +87,7 @@ impl super::HttpServer for HttpServer {
self.server.process(dgram, now)
}

fn process_events(&mut self, args: &Args, _now: Instant) {
fn process_events(&mut self, _now: Instant) {
while let Some(event) = self.server.next_event() {
match event {
Http3ServerEvent::Headers {
Expand All @@ -97,7 +112,7 @@ impl super::HttpServer for HttpServer {
continue;
};

let mut response = if args.shared.qns_test.is_some() {
let mut response = if self.is_qns_test {
match qns_read_response(path.value()) {
Ok(data) => ResponseData::from(data),
Err(e) => {
Expand Down Expand Up @@ -166,26 +181,6 @@ impl super::HttpServer for HttpServer {
}
}

fn set_qlog_dir(&mut self, dir: Option<PathBuf>) {
self.server.set_qlog_dir(dir);
}

fn validate_address(&mut self, v: ValidateAddress) {
self.server.set_validation(v);
}

fn set_ciphers(&mut self, ciphers: &[Cipher]) {
self.server.set_ciphers(ciphers);
}

fn enable_ech(&mut self) -> &[u8] {
let (sk, pk) = generate_ech_keys().expect("should create ECH keys");
self.server
.enable_ech(random::<1>()[0], "public.example", &sk, &pk)
.unwrap();
self.server.ech_config()
}

fn has_events(&self) -> bool {
self.server.has_events()
}
Expand Down
36 changes: 7 additions & 29 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use futures::{
future::{select, select_all, Either},
FutureExt,
};
use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram};
use neqo_common::{qdebug, qerror, qinfo, qwarn, Datagram};
use neqo_crypto::{
constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256},
init_db, AntiReplay, Cipher,
};
use neqo_transport::{server::ValidateAddress, Output, RandomConnectionIdGenerator, Version};
use neqo_transport::{Output, RandomConnectionIdGenerator, Version};
use tokio::time::Sleep;

use crate::{udp, SharedArgs};
Expand Down Expand Up @@ -185,13 +185,8 @@ fn qns_read_response(filename: &str) -> Result<Vec<u8>, io::Error> {

pub trait HttpServer: Display {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
// TODO: Is the provided Args really needed here? Maybe clone on construction of the HttpServer implementation?
fn process_events(&mut self, args: &Args, now: Instant);
fn process_events(&mut self, now: Instant);
fn has_events(&self) -> bool;
fn set_qlog_dir(&mut self, dir: Option<PathBuf>);
fn set_ciphers(&mut self, ciphers: &[Cipher]);
fn validate_address(&mut self, when: ValidateAddress);
fn enable_ech(&mut self) -> &[u8];
}

// TODO: Use singular form.
Expand Down Expand Up @@ -236,31 +231,14 @@ impl ServersRunner {
.expect("unable to setup anti-replay");
let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10)));

let mut svr: Box<dyn HttpServer> = if args.shared.use_old_http {
if args.shared.use_old_http {
Box::new(
http09::HttpServer::new(
args.now(),
&[args.key.clone()],
&[args.shared.alpn.clone()],
anti_replay,
cid_mgr,
args.shared.quic_parameters.get(&args.shared.alpn),
)
.expect("We cannot make a server!"),
http09::HttpServer::new(args, anti_replay, cid_mgr)
.expect("We cannot make a server!"),
)
} else {
Box::new(http3::HttpServer::new(args, anti_replay, cid_mgr))
};
svr.set_ciphers(&args.get_ciphers());
svr.set_qlog_dir(args.shared.qlog_dir.clone());
if args.retry {
svr.validate_address(ValidateAddress::Always);
}
if args.ech {
let cfg = svr.enable_ech();
qinfo!("ECHConfigList: {}", hex(cfg));
}
svr
}

/// Tries to find a socket, but then just falls back to sending from the first.
Expand Down Expand Up @@ -319,7 +297,7 @@ impl ServersRunner {

pub async fn run(mut self) -> Res<()> {
loop {
self.server.process_events(&self.args, self.args.now());
self.server.process_events(self.args.now());

self.process(None).await?;

Expand Down

0 comments on commit 190bb75

Please sign in to comment.