From 482abe154e17ded2053c25bed77cefa489d16f42 Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Mon, 22 May 2023 21:02:30 +0200 Subject: [PATCH] make the repl tool work with the tokio console https://github.com/tokio-rs/console/tree/main this can help with debugging async stuff. --- .cargo/config.toml | 3 + Cargo.lock | 127 +++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 +- deltachat-repl/Cargo.toml | 1 + deltachat-repl/src/main.rs | 109 ++++++++++++++++--------------- src/configure.rs | 76 +++++++++++----------- src/contact.rs | 5 +- src/debug_logging.rs | 8 ++- src/pgp.rs | 48 +++++++------- src/scheduler.rs | 28 +++++--- src/smtp.rs | 6 +- 11 files changed, 287 insertions(+), 126 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index e2e5fa90e2..629174d5ed 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -9,3 +9,6 @@ # invoking `cargo test` threads are allowed to have a large enough # stack size without needing to use an optimised build. RUST_MIN_STACK = "8388608" + +[build] +rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b3a006e0da..d4f93cc68e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -768,6 +768,42 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console-api" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures", + "hdrhistogram", + "humantime 2.1.0", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.9.2" @@ -1246,6 +1282,7 @@ version = "1.115.0" dependencies = [ "ansi_term", "anyhow", + "console-subscriber", "deltachat", "dirs", "log", @@ -2208,6 +2245,19 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "hdrhistogram" +version = "7.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" +dependencies = [ + "base64 0.13.1", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.4.1" @@ -2369,6 +2419,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -3615,6 +3677,38 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost", +] + [[package]] name = "qrcodegen" version = "1.8.0" @@ -4858,6 +4952,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] @@ -4993,6 +5088,34 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-trait", + "axum", + "base64 0.21.0", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -5001,9 +5124,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap", "pin-project", "pin-project-lite", + "rand 0.8.5", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 0ff02606f8..09557edbc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,7 +84,7 @@ strum_macros = "0.24" tagger = "4.3.4" textwrap = "0.16.0" thiserror = "1" -tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } +tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros", "tracing"] } tokio-io-timeout = "1.2.0" tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar diff --git a/deltachat-repl/Cargo.toml b/deltachat-repl/Cargo.toml index 6ef28ef05a..4e60fe88a0 100644 --- a/deltachat-repl/Cargo.toml +++ b/deltachat-repl/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] ansi_term = "0.12.1" anyhow = "1" +console-subscriber = "0.1.9" deltachat = { path = "..", features = ["internals"]} dirs = "5" log = "0.4.16" diff --git a/deltachat-repl/src/main.rs b/deltachat-repl/src/main.rs index 2670b311f1..5499f7581a 100644 --- a/deltachat-repl/src/main.rs +++ b/deltachat-repl/src/main.rs @@ -315,11 +315,13 @@ async fn start(args: Vec) -> Result<(), Error> { let context = Context::new(Path::new(&args[1]), 0, Events::new(), StockStrings::new()).await?; let events = context.get_event_emitter(); - tokio::task::spawn(async move { - while let Some(event) = events.recv().await { - receive_event(event.typ); - } - }); + tokio::task::Builder::new() + .name("repl:receive_event") + .spawn(async move { + while let Some(event) = events.recv().await { + receive_event(event.typ); + } + })?; println!("Delta Chat Core is awaiting your commands."); @@ -331,61 +333,63 @@ async fn start(args: Vec) -> Result<(), Error> { let mut selected_chat = ChatId::default(); let ctx = context.clone(); - let input_loop = tokio::task::spawn_blocking(move || { - let h = DcHelper { - completer: FilenameCompleter::new(), - highlighter: MatchingBracketHighlighter::new(), - hinter: HistoryHinter {}, - }; - let mut rl = Editor::with_config(config)?; - rl.set_helper(Some(h)); - rl.bind_sequence(KeyEvent::alt('N'), Cmd::HistorySearchForward); - rl.bind_sequence(KeyEvent::alt('P'), Cmd::HistorySearchBackward); - if rl.load_history(".dc-history.txt").is_err() { - println!("No previous history."); - } + let input_loop = tokio::task::Builder::new() + .name("repl:input_loop") + .spawn_blocking(move || { + let h = DcHelper { + completer: FilenameCompleter::new(), + highlighter: MatchingBracketHighlighter::new(), + hinter: HistoryHinter {}, + }; + let mut rl = Editor::with_config(config)?; + rl.set_helper(Some(h)); + rl.bind_sequence(KeyEvent::alt('N'), Cmd::HistorySearchForward); + rl.bind_sequence(KeyEvent::alt('P'), Cmd::HistorySearchBackward); + if rl.load_history(".dc-history.txt").is_err() { + println!("No previous history."); + } - loop { - let p = "> "; - let readline = rl.readline(p); - - match readline { - Ok(line) => { - // TODO: ignore "set mail_pw" - rl.add_history_entry(line.as_str())?; - let should_continue = Handle::current().block_on(async { - match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await { - Ok(ExitResult::Continue) => true, - Ok(ExitResult::Exit) => { - println!("Exiting ..."); - false + loop { + let p = "> "; + let readline = rl.readline(p); + + match readline { + Ok(line) => { + // TODO: ignore "set mail_pw" + rl.add_history_entry(line.as_str())?; + let should_continue = Handle::current().block_on(async { + match handle_cmd(line.trim(), ctx.clone(), &mut selected_chat).await { + Ok(ExitResult::Continue) => true, + Ok(ExitResult::Exit) => { + println!("Exiting ..."); + false + } + Err(err) => { + println!("Error: {err:#}"); + true + } } - Err(err) => { - println!("Error: {err:#}"); - true - } - } - }); + }); - if !should_continue { + if !should_continue { + break; + } + } + Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => { + println!("Exiting..."); + break; + } + Err(err) => { + println!("Error: {err:#}"); break; } - } - Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => { - println!("Exiting..."); - break; - } - Err(err) => { - println!("Error: {err:#}"); - break; } } - } - rl.save_history(".dc-history.txt")?; - println!("history saved"); - Ok::<_, Error>(()) - }); + rl.save_history(".dc-history.txt")?; + println!("history saved"); + Ok::<_, Error>(()) + })?; context.stop_io().await; input_loop.await??; @@ -481,6 +485,7 @@ async fn handle_cmd( #[tokio::main] async fn main() -> Result<(), Error> { + console_subscriber::init(); let _ = pretty_env_logger::try_init(); let args = std::env::args().collect(); diff --git a/src/configure.rs b/src/configure.rs index 7acd6e4d25..91ed2ffb97 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -181,7 +181,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { let socks5_enabled = socks5_config.is_some(); let ctx2 = ctx.clone(); - let update_device_chats_handle = task::spawn(async move { ctx2.update_device_chats().await }); + let update_device_chats_handle = tokio::task::Builder::new() + .name("update_device_chats") + .spawn(async move { ctx2.update_device_chats().await })?; // Step 1: Load the parameters and check email-address and password @@ -350,44 +352,46 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { .provider .map_or(socks5_config.is_some(), |provider| provider.opt.strict_tls); - let smtp_config_task = task::spawn(async move { - let mut smtp_configured = false; - let mut errors = Vec::new(); - for smtp_server in smtp_servers { - smtp_param.user = smtp_server.username.clone(); - smtp_param.server = smtp_server.hostname.clone(); - smtp_param.port = smtp_server.port; - smtp_param.security = smtp_server.socket; - smtp_param.certificate_checks = match smtp_server.strict_tls { - Some(true) => CertificateChecks::Strict, - Some(false) => CertificateChecks::AcceptInvalidCertificates, - None => CertificateChecks::Automatic, - }; - - match try_smtp_one_param( - &context_smtp, - &smtp_param, - &socks5_config, - &smtp_addr, - provider_strict_tls, - &mut smtp, - ) - .await - { - Ok(_) => { - smtp_configured = true; - break; + let smtp_config_task = tokio::task::Builder::new() + .name("smtp_config") + .spawn(async move { + let mut smtp_configured = false; + let mut errors = Vec::new(); + for smtp_server in smtp_servers { + smtp_param.user = smtp_server.username.clone(); + smtp_param.server = smtp_server.hostname.clone(); + smtp_param.port = smtp_server.port; + smtp_param.security = smtp_server.socket; + smtp_param.certificate_checks = match smtp_server.strict_tls { + Some(true) => CertificateChecks::Strict, + Some(false) => CertificateChecks::AcceptInvalidCertificates, + None => CertificateChecks::Automatic, + }; + + match try_smtp_one_param( + &context_smtp, + &smtp_param, + &socks5_config, + &smtp_addr, + provider_strict_tls, + &mut smtp, + ) + .await + { + Ok(_) => { + smtp_configured = true; + break; + } + Err(e) => errors.push(e), } - Err(e) => errors.push(e), } - } - if smtp_configured { - Ok(smtp_param) - } else { - Err(errors) - } - }); + if smtp_configured { + Ok(smtp_param) + } else { + Err(errors) + } + })?; progress!(ctx, 600); diff --git a/src/contact.rs b/src/contact.rs index 78269f0035..8587e1f81a 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1555,7 +1555,10 @@ impl RecentlySeenLoop { pub(crate) fn new(context: Context) -> Self { let (interrupt_send, interrupt_recv) = channel::bounded(1); - let handle = task::spawn(Self::run(context, interrupt_recv)); + let handle = tokio::task::Builder::new() + .name("recently_seen") + .spawn(Self::run(context, interrupt_recv)) + .expect("failed to spawn recently_seen task"); Self { handle, interrupt_send, diff --git a/src/debug_logging.rs b/src/debug_logging.rs index 7b343a35f9..9aec621a2d 100644 --- a/src/debug_logging.rs +++ b/src/debug_logging.rs @@ -143,9 +143,11 @@ pub(crate) async fn set_debug_logging_xdc(ctx: &Context, id: Option) -> a let (sender, debug_logging_recv) = channel::bounded(1000); let loop_handle = { let ctx = ctx.clone(); - task::spawn(async move { - debug_logging_loop(&ctx, debug_logging_recv).await - }) + tokio::task::Builder::new() + .name("debug_logging") + .spawn(async move { + debug_logging_loop(&ctx, debug_logging_recv).await + })? }; *debug_logging = Some(DebugLogging { msg_id, diff --git a/src/pgp.rs b/src/pgp.rs index 43ea19f9aa..6f87e62c03 100644 --- a/src/pgp.rs +++ b/src/pgp.rs @@ -364,17 +364,19 @@ pub async fn symm_encrypt(passphrase: &str, plain: &[u8]) -> Result { let lit_msg = Message::new_literal_bytes("", plain); let passphrase = passphrase.to_string(); - tokio::task::spawn_blocking(move || { - let mut rng = thread_rng(); - let s2k = StringToKey::new_default(&mut rng); - let msg = - lit_msg.encrypt_with_password(&mut rng, s2k, Default::default(), || passphrase)?; + tokio::task::Builder::new() + .name("symm_encrypt") + .spawn_blocking(move || { + let mut rng = thread_rng(); + let s2k = StringToKey::new_default(&mut rng); + let msg = + lit_msg.encrypt_with_password(&mut rng, s2k, Default::default(), || passphrase)?; - let encoded_msg = msg.to_armored_string(None)?; + let encoded_msg = msg.to_armored_string(None)?; - Ok(encoded_msg) - }) - .await? + Ok(encoded_msg) + })? + .await? } /// Symmetric decryption. @@ -385,20 +387,22 @@ pub async fn symm_decrypt( let (enc_msg, _) = Message::from_armor_single(ctext)?; let passphrase = passphrase.to_string(); - tokio::task::spawn_blocking(move || { - let decryptor = enc_msg.decrypt_with_password(|| passphrase)?; - - let msgs = decryptor.collect::>>()?; - if let Some(msg) = msgs.first() { - match msg.get_content()? { - Some(content) => Ok(content), - None => bail!("Decrypted message is empty"), + tokio::task::Builder::new() + .name("symm_decrypt") + .spawn_blocking(move || { + let decryptor = enc_msg.decrypt_with_password(|| passphrase)?; + + let msgs = decryptor.collect::>>()?; + if let Some(msg) = msgs.first() { + match msg.get_content()? { + Some(content) => Ok(content), + None => bail!("Decrypted message is empty"), + } + } else { + bail!("No valid messages found") } - } else { - bail!("No valid messages found") - } - }) - .await? + })? + .await? } #[cfg(test)] diff --git a/src/scheduler.rs b/src/scheduler.rs index 7962227025..4667ecb5fd 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -740,7 +740,9 @@ impl Scheduler { let (inbox_start_send, inbox_start_recv) = oneshot::channel(); let handle = { let ctx = ctx.clone(); - task::spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers)) + tokio::task::Builder::new() + .name("inbox_loop") + .spawn(inbox_loop(ctx, inbox_start_send, inbox_handlers))? }; let inbox = SchedBox { meaning: FolderMeaning::Inbox, @@ -760,7 +762,9 @@ impl Scheduler { let (conn_state, handlers) = ImapConnectionState::new(&ctx).await?; let (start_send, start_recv) = oneshot::channel(); let ctx = ctx.clone(); - let handle = task::spawn(simple_imap_loop(ctx, start_send, handlers, meaning)); + let handle = tokio::task::Builder::new() + .name("simple_imap_loop") + .spawn(simple_imap_loop(ctx, start_send, handlers, meaning))?; oboxes.push(SchedBox { meaning, conn_state, @@ -772,22 +776,28 @@ impl Scheduler { let smtp_handle = { let ctx = ctx.clone(); - task::spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers)) + tokio::task::Builder::new() + .name("smtp_loop") + .spawn(smtp_loop(ctx, smtp_start_send, smtp_handlers))? }; start_recvs.push(smtp_start_recv); let ephemeral_handle = { let ctx = ctx.clone(); - task::spawn(async move { - ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await; - }) + tokio::task::Builder::new() + .name("ephemeral") + .spawn(async move { + ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await; + })? }; let location_handle = { let ctx = ctx.clone(); - task::spawn(async move { - location::location_loop(&ctx, location_interrupt_recv).await; - }) + tokio::task::Builder::new() + .name("location") + .spawn(async move { + location::location_loop(&ctx, location_interrupt_recv).await; + })? }; let recently_seen_loop = RecentlySeenLoop::new(ctx.clone()); diff --git a/src/smtp.rs b/src/smtp.rs index 5916cad2b7..922249c049 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -8,7 +8,6 @@ use anyhow::{bail, format_err, Context as _, Error, Result}; use async_smtp::response::{Category, Code, Detail}; use async_smtp::{self as smtp, EmailAddress, SmtpTransport}; use tokio::io::BufStream; -use tokio::task; use crate::config::Config; use crate::contact::{Contact, ContactId}; @@ -60,7 +59,10 @@ impl Smtp { // Closing connection with a QUIT command may take some time, especially if it's a // stale connection and an attempt to send the command times out. Send a command in a // separate task to avoid waiting for reply or timeout. - task::spawn(async move { transport.quit().await }); + tokio::task::Builder::new() + .name("smtp_disconnect") + .spawn(async move { transport.quit().await }) + .expect("failed to spawn smtp_disconnect task"); } self.last_success = None; }