From d893c996c6a78958d35cf2d32dbcb85945504276 Mon Sep 17 00:00:00 2001 From: Max Mindlin Date: Sun, 28 Jul 2024 18:54:57 -0400 Subject: [PATCH] stash --- Cargo.lock | 1 - scout-interpreter/src/builder.rs | 9 +++- scout-interpreter/src/lib.rs | 67 ++++++++++++++++++++++-- scout-worker/Cargo.toml | 2 +- scout-worker/README.md | 1 + scout-worker/src/config.rs | 12 ++--- scout-worker/src/http.rs | 51 ++++++++++++++---- scout-worker/src/main.rs | 20 ++++--- src/main.rs | 89 +++++++++++++++++--------------- src/repl.rs | 22 ++++---- 10 files changed, 195 insertions(+), 79 deletions(-) create mode 100644 scout-worker/README.md diff --git a/Cargo.lock b/Cargo.lock index bdb80a6..c38a46e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2270,7 +2270,6 @@ name = "scout-worker" version = "0.1.0" dependencies = [ "actix-web", - "fantoccini", "scout-interpreter", "serde", "tokio", diff --git a/scout-interpreter/src/builder.rs b/scout-interpreter/src/builder.rs index 854c61b..2d32653 100644 --- a/scout-interpreter/src/builder.rs +++ b/scout-interpreter/src/builder.rs @@ -1,3 +1,5 @@ +use std::process::Command; + use crate::{env::EnvPointer, eval::ScrapeResultsPtr, EnvVars, Interpreter}; #[derive(Debug)] @@ -36,11 +38,14 @@ impl InterpreterBuilder { Some(c) => Ok(c), None => new_crawler(&env_vars).await, }?; + let interpreter = Interpreter::new( - self.env.unwrap_or(EnvPointer::default()), - self.results.unwrap_or(ScrapeResultsPtr::default()), + self.env.unwrap_or_default(), + self.results.unwrap_or_default(), crawler, + crate::GeckDriverProc::new(env_vars.port() as usize), ); + Ok(interpreter) } } diff --git a/scout-interpreter/src/lib.rs b/scout-interpreter/src/lib.rs index aa60a06..3a39e8b 100644 --- a/scout-interpreter/src/lib.rs +++ b/scout-interpreter/src/lib.rs @@ -1,4 +1,7 @@ -use std::sync::Arc; +use std::{ + process::{Child, Command}, + sync::Arc, +}; use env::EnvPointer; use eval::{eval, EvalError, ScrapeResultsPtr}; @@ -50,18 +53,43 @@ pub enum InterpreterError { ParserError(ParseError), } +pub struct GeckDriverProc(Child); + +impl GeckDriverProc { + pub fn new(port: usize) -> Self { + let child = Command::new("geckodriver") + .arg("--port") + .arg(port.to_string()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .expect("error spinning up driver process"); + + // sleep to allow driver to start + std::thread::sleep(std::time::Duration::from_millis(50)); + Self(child) + } +} + pub struct Interpreter { env: EnvPointer, results: ScrapeResultsPtr, crawler: fantoccini::Client, + _geckodriver_proc: GeckDriverProc, } impl Interpreter { - pub fn new(env: EnvPointer, results: ScrapeResultsPtr, crawler: fantoccini::Client) -> Self { + pub fn new( + env: EnvPointer, + results: ScrapeResultsPtr, + crawler: fantoccini::Client, + geckodriver_proc: GeckDriverProc, + ) -> Self { Self { env, results, crawler, + _geckodriver_proc: geckodriver_proc, } } pub async fn eval(&self, content: &str) -> Result, InterpreterError> { @@ -79,7 +107,16 @@ impl Interpreter { } } - pub async fn finalize(self) { + pub fn results(&self) -> ScrapeResultsPtr { + self.results.clone() + } + + pub fn reset(&mut self) { + self.env = EnvPointer::default(); + self.results = ScrapeResultsPtr::default(); + } + + pub async fn close(self) { let _ = self.crawler.close().await; } } @@ -89,3 +126,27 @@ impl From for InterpreterError { InterpreterError::EvalError(value) } } + +impl Drop for GeckDriverProc { + fn drop(&mut self) { + #[cfg(target_os = "windows")] + let mut kill = Command::new("taskkill") + .arg("/PID") + .arg(&self.0.id().to_string()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .arg("/F") + .spawn() + .expect("error sending driver kill"); + + #[cfg(not(target_os = "windows"))] + let mut kill = Command::new("kill") + .args(["-s", "TERM", &self.0.id().to_string()]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .expect("error sending driver kill"); + + kill.wait().expect("error waiting for driver kill"); + } +} diff --git a/scout-worker/Cargo.toml b/scout-worker/Cargo.toml index f600f02..0f503de 100644 --- a/scout-worker/Cargo.toml +++ b/scout-worker/Cargo.toml @@ -9,4 +9,4 @@ actix-web = "4" tokio = { version = "1", features = ["full"] } serde = { version = "1.0.203", features = ["derive"] } toml = "0.8.16" -fantoccini = "0.19.3" \ No newline at end of file + diff --git a/scout-worker/README.md b/scout-worker/README.md new file mode 100644 index 0000000..324afd5 --- /dev/null +++ b/scout-worker/README.md @@ -0,0 +1 @@ +Scout-worker is a processing framework for ScoutLang. It provides multiple input/output types that are driven via configuration. diff --git a/scout-worker/src/config.rs b/scout-worker/src/config.rs index fcfa9a5..04e1d1e 100644 --- a/scout-worker/src/config.rs +++ b/scout-worker/src/config.rs @@ -1,3 +1,5 @@ +use std::fs; + use serde::Deserialize; use crate::WorkerError; @@ -20,11 +22,9 @@ pub struct ConfigInputsHttp { impl Config { pub fn load_file(path: Option<&str>) -> Result { - fn load(content: &str) -> Result { - toml::from_str(&content).map_err(|e| WorkerError::ConfigError(e.to_string())) - } - - let path = path.unwrap_or("scout-worker.toml"); - load(path) + let path = path.unwrap_or("scout.toml"); + let content = + fs::read_to_string(path).map_err(|e| WorkerError::ConfigError(e.to_string()))?; + toml::from_str(&content).map_err(|e| WorkerError::ConfigError(e.to_string())) } } diff --git a/scout-worker/src/http.rs b/scout-worker/src/http.rs index 9a89dbf..52ee7a1 100644 --- a/scout-worker/src/http.rs +++ b/scout-worker/src/http.rs @@ -1,25 +1,58 @@ -use std::{io, sync::Arc}; +use std::{ + fs, + io::{self, ErrorKind}, + sync::Arc, +}; -use actix_web::{post, web, App, HttpServer, Responder}; -use scout_interpreter::builder::InterpreterBuilder; +use actix_web::{get, http::StatusCode, post, web, App, HttpResponse, HttpServer, Responder}; +use scout_interpreter::Interpreter; use tokio::sync::Mutex; -use crate::config::ConfigInputsHttp; +use crate::{config::ConfigInputsHttp, models::incoming}; #[post("/crawl")] -async fn crawl(crawler: web::Data>>) -> impl Responder { - let f = &*crawler.get_ref().lock().await; - format!("hello") +async fn crawl( + body: web::Json, + interpreter: web::Data>>, +) -> impl Responder { + match fs::read_to_string(&body.file) { + Ok(content) => { + let interpreter = &mut *interpreter.get_ref().lock().await; + + if let Err(e) = interpreter.eval(&content).await { + interpreter.reset(); + return HttpResponse::build(StatusCode::INTERNAL_SERVER_ERROR) + .body(format!("interpreter error: {e:?}")); + } + let res = interpreter.results(); + let payload = res.lock().await.to_json(); + + interpreter.reset(); + HttpResponse::build(StatusCode::OK) + .content_type("application/json") + .body(payload) + } + Err(e) if e.kind() == ErrorKind::NotFound => HttpResponse::build(StatusCode::BAD_REQUEST) + .body(format!("unknown file: {}", body.file)), + Err(e) => HttpResponse::build(StatusCode::INTERNAL_SERVER_ERROR) + .body(format!("error reading file: {e}")), + } +} + +#[get("/health")] +async fn health() -> impl Responder { + "OK" } pub async fn start_http_consumer( config: &ConfigInputsHttp, - crawler: Arc>, + interpeter: Arc>, ) -> Result<(), io::Error> { HttpServer::new(move || { App::new() - .app_data(web::Data::new(crawler.clone())) + .app_data(web::Data::new(interpeter.clone())) .service(crawl) + .service(health) }) .bind((config.addr.as_str(), config.port as u16))? .run() diff --git a/scout-worker/src/main.rs b/scout-worker/src/main.rs index 97b0536..0ff198c 100644 --- a/scout-worker/src/main.rs +++ b/scout-worker/src/main.rs @@ -1,4 +1,8 @@ +use std::sync::Arc; + use config::Config; +use scout_interpreter::{builder::InterpreterBuilder, Interpreter}; +use tokio::sync::Mutex; mod config; mod http; @@ -8,17 +12,21 @@ pub enum WorkerError { ConfigError(String), } -async fn start(config: Config) { +async fn start(config: Config, interpeter: Interpreter) { + let inter_ptr = Arc::new(Mutex::new(interpeter)); if let Some(http_config) = config.inputs.http { - // if let Err(e) = http::start_http_consumer(&http_config).await { - // println!("{e:?}"); - // } + if let Err(e) = http::start_http_consumer(&http_config, inter_ptr.clone()).await { + println!("{e:?}"); + } } } #[tokio::main] async fn main() { let config = Config::load_file(None).unwrap_or_default(); - println!("{config:?}"); - start(config).await; + let interpreter = InterpreterBuilder::default() + .build() + .await + .expect("error building interpreter"); + start(config, interpreter).await; } diff --git a/src/main.rs b/src/main.rs index 5d5c63e..8b1065e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,29 +1,30 @@ use std::{env, fs, process::Command}; use repl::run_repl; -use scout_interpreter::{builder::InterpreterBuilder, env::Env, eval::ScrapeResultsPtr, EnvVars}; +use scout_interpreter::{ + builder::InterpreterBuilder, env::Env, eval::ScrapeResultsPtr, EnvVars, Interpreter, +}; mod repl; async fn run( file: Option, - results: ScrapeResultsPtr, + interpreter: &Interpreter, + // results: ScrapeResultsPtr, ) -> Result<(), Box> { - let env = Env::default(); + // let env = Env::default(); match file { - None => run_repl(results, env).await, + None => run_repl(interpreter).await, Some(f) => { let contents = fs::read_to_string(f)?; - let interpret = InterpreterBuilder::default() - .with_results(results) - .build() - .await?; - if let Err(e) = interpret.eval(&contents).await { + // let interpret = InterpreterBuilder::default() + // .with_results(results) + // .build() + // .await?; + if let Err(e) = interpreter.eval(&contents).await { println!("Interpeter error: {:?}", e); } - interpret.finalize().await; - Ok(()) } } @@ -31,44 +32,50 @@ async fn run( #[tokio::main] async fn main() { - let env_vars = envy::from_env::().expect("error loading env config"); + // let env_vars = envy::from_env::().expect("error loading env config"); let args: Vec = env::args().collect(); - let child = Command::new("geckodriver") - .arg("--port") - .arg(env_vars.port().to_string()) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .spawn() - .expect("error spinning up driver process"); + // let child = Command::new("geckodriver") + // .arg("--port") + // .arg(env_vars.port().to_string()) + // .stdout(std::process::Stdio::null()) + // .stderr(std::process::Stdio::null()) + // .spawn() + // .expect("error spinning up driver process"); - // sleep to allow driver to start - std::thread::sleep(std::time::Duration::from_millis(50)); + // // sleep to allow driver to start + // std::thread::sleep(std::time::Duration::from_millis(50)); - let results = ScrapeResultsPtr::default(); - if let Err(e) = run(args.get(1).cloned(), results.clone()).await { + // let results = ScrapeResultsPtr::default(); + let interpreter = InterpreterBuilder::default() + .build() + .await + .expect("failed to build interpreter"); + if let Err(e) = run(args.get(1).cloned(), &interpreter).await { println!("Error: {}", e); } - let json_results = results.lock().await.to_json(); + let json_results = interpreter.results().lock().await.to_json(); println!("{}", json_results); - #[cfg(target_os = "windows")] - let mut kill = Command::new("taskkill") - .arg("/PID") - .arg(&child.id().to_string()) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .arg("/F") - .spawn() - .expect("error sending driver kill"); + interpreter.close().await; + + // #[cfg(target_os = "windows")] + // let mut kill = Command::new("taskkill") + // .arg("/PID") + // .arg(&child.id().to_string()) + // .stdout(std::process::Stdio::null()) + // .stderr(std::process::Stdio::null()) + // .arg("/F") + // .spawn() + // .expect("error sending driver kill"); - #[cfg(not(target_os = "windows"))] - let mut kill = Command::new("kill") - .args(["-s", "TERM", &child.id().to_string()]) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .spawn() - .expect("error sending driver kill"); + // #[cfg(not(target_os = "windows"))] + // let mut kill = Command::new("kill") + // .args(["-s", "TERM", &child.id().to_string()]) + // .stdout(std::process::Stdio::null()) + // .stderr(std::process::Stdio::null()) + // .spawn() + // .expect("error sending driver kill"); - kill.wait().expect("error waiting for driver kill"); + // kill.wait().expect("error waiting for driver kill"); } diff --git a/src/repl.rs b/src/repl.rs index df5a77f..bc13534 100644 --- a/src/repl.rs +++ b/src/repl.rs @@ -2,22 +2,24 @@ use std::sync::Arc; use futures::lock::Mutex; use rustyline::{error::ReadlineError, Editor}; -use scout_interpreter::{builder::InterpreterBuilder, env::Env, eval::ScrapeResultsPtr}; +use scout_interpreter::{ + builder::InterpreterBuilder, env::Env, eval::ScrapeResultsPtr, Interpreter, +}; const PROMPT: &str = ">> "; pub async fn run_repl( - results: ScrapeResultsPtr, - env: Env, + // results: ScrapeResultsPtr, + interpreter: &Interpreter, // env: Env, ) -> Result<(), Box> { let mut rl = Editor::<()>::new(); - let env = Arc::new(Mutex::new(env)); + // let env = Arc::new(Mutex::new(env)); - let interpreter = InterpreterBuilder::default() - .with_env(env.clone()) - .with_results(results.clone()) - .build() - .await?; + // let interpreter = InterpreterBuilder::default() + // .with_env(env.clone()) + // .with_results(results.clone()) + // .build() + // .await?; if rl.load_history("history.txt").is_err() { println!("No previous history."); } @@ -49,6 +51,6 @@ pub async fn run_repl( } } rl.save_history("history.txt").unwrap(); - interpreter.finalize().await; + // interpreter.close().await; Ok(()) }