Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmindlin committed Jul 28, 2024
1 parent 18a18ba commit d893c99
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 79 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions scout-interpreter/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::process::Command;

Check warning on line 1 in scout-interpreter/src/builder.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `std::process::Command`

use crate::{env::EnvPointer, eval::ScrapeResultsPtr, EnvVars, Interpreter};

#[derive(Debug)]
Expand Down Expand Up @@ -36,11 +38,14 @@ impl InterpreterBuilder {
Some(c) => Ok(c),
None => new_crawler(&env_vars).await,
}?;

let interpreter = Interpreter::new(
self.env.unwrap_or(EnvPointer::default()),
self.results.unwrap_or(ScrapeResultsPtr::default()),
self.env.unwrap_or_default(),
self.results.unwrap_or_default(),
crawler,
crate::GeckDriverProc::new(env_vars.port() as usize),
);

Ok(interpreter)
}
}
Expand Down
67 changes: 64 additions & 3 deletions scout-interpreter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::Arc;
use std::{
process::{Child, Command},
sync::Arc,
};

use env::EnvPointer;
use eval::{eval, EvalError, ScrapeResultsPtr};
Expand Down Expand Up @@ -50,18 +53,43 @@ pub enum InterpreterError {
ParserError(ParseError),
}

pub struct GeckDriverProc(Child);

impl GeckDriverProc {
pub fn new(port: usize) -> Self {
let child = Command::new("geckodriver")
.arg("--port")
.arg(port.to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error spinning up driver process");

// sleep to allow driver to start
std::thread::sleep(std::time::Duration::from_millis(50));
Self(child)
}
}

pub struct Interpreter {
env: EnvPointer,
results: ScrapeResultsPtr,
crawler: fantoccini::Client,
_geckodriver_proc: GeckDriverProc,
}

impl Interpreter {
pub fn new(env: EnvPointer, results: ScrapeResultsPtr, crawler: fantoccini::Client) -> Self {
pub fn new(
env: EnvPointer,
results: ScrapeResultsPtr,
crawler: fantoccini::Client,
geckodriver_proc: GeckDriverProc,
) -> Self {
Self {
env,
results,
crawler,
_geckodriver_proc: geckodriver_proc,
}
}
pub async fn eval(&self, content: &str) -> Result<Arc<Object>, InterpreterError> {
Expand All @@ -79,7 +107,16 @@ impl Interpreter {
}
}

pub async fn finalize(self) {
pub fn results(&self) -> ScrapeResultsPtr {
self.results.clone()
}

pub fn reset(&mut self) {
self.env = EnvPointer::default();
self.results = ScrapeResultsPtr::default();
}

pub async fn close(self) {
let _ = self.crawler.close().await;
}
}
Expand All @@ -89,3 +126,27 @@ impl From<EvalError> for InterpreterError {
InterpreterError::EvalError(value)
}
}

impl Drop for GeckDriverProc {
fn drop(&mut self) {
#[cfg(target_os = "windows")]
let mut kill = Command::new("taskkill")
.arg("/PID")
.arg(&self.0.id().to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.arg("/F")
.spawn()
.expect("error sending driver kill");

#[cfg(not(target_os = "windows"))]
let mut kill = Command::new("kill")
.args(["-s", "TERM", &self.0.id().to_string()])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error sending driver kill");

kill.wait().expect("error waiting for driver kill");
}
}
2 changes: 1 addition & 1 deletion scout-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ actix-web = "4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.203", features = ["derive"] }
toml = "0.8.16"
fantoccini = "0.19.3"

1 change: 1 addition & 0 deletions scout-worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Scout-worker is a processing framework for ScoutLang. It provides multiple input/output types that are driven via configuration.
12 changes: 6 additions & 6 deletions scout-worker/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fs;

use serde::Deserialize;

use crate::WorkerError;
Expand All @@ -20,11 +22,9 @@ pub struct ConfigInputsHttp {

impl Config {
pub fn load_file(path: Option<&str>) -> Result<Self, WorkerError> {
fn load(content: &str) -> Result<Config, WorkerError> {
toml::from_str(&content).map_err(|e| WorkerError::ConfigError(e.to_string()))
}

let path = path.unwrap_or("scout-worker.toml");
load(path)
let path = path.unwrap_or("scout.toml");
let content =
fs::read_to_string(path).map_err(|e| WorkerError::ConfigError(e.to_string()))?;
toml::from_str(&content).map_err(|e| WorkerError::ConfigError(e.to_string()))
}
}
51 changes: 42 additions & 9 deletions scout-worker/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,58 @@
use std::{io, sync::Arc};
use std::{
fs,
io::{self, ErrorKind},
sync::Arc,
};

use actix_web::{post, web, App, HttpServer, Responder};
use scout_interpreter::builder::InterpreterBuilder;
use actix_web::{get, http::StatusCode, post, web, App, HttpResponse, HttpServer, Responder};
use scout_interpreter::Interpreter;
use tokio::sync::Mutex;

use crate::config::ConfigInputsHttp;
use crate::{config::ConfigInputsHttp, models::incoming};

#[post("/crawl")]
async fn crawl(crawler: web::Data<Arc<Mutex<fantoccini::Client>>>) -> impl Responder {
let f = &*crawler.get_ref().lock().await;
format!("hello")
async fn crawl(
body: web::Json<incoming::Incoming>,
interpreter: web::Data<Arc<Mutex<Interpreter>>>,
) -> impl Responder {
match fs::read_to_string(&body.file) {
Ok(content) => {
let interpreter = &mut *interpreter.get_ref().lock().await;

if let Err(e) = interpreter.eval(&content).await {
interpreter.reset();
return HttpResponse::build(StatusCode::INTERNAL_SERVER_ERROR)
.body(format!("interpreter error: {e:?}"));
}
let res = interpreter.results();
let payload = res.lock().await.to_json();

interpreter.reset();
HttpResponse::build(StatusCode::OK)
.content_type("application/json")
.body(payload)
}
Err(e) if e.kind() == ErrorKind::NotFound => HttpResponse::build(StatusCode::BAD_REQUEST)
.body(format!("unknown file: {}", body.file)),
Err(e) => HttpResponse::build(StatusCode::INTERNAL_SERVER_ERROR)
.body(format!("error reading file: {e}")),
}
}

#[get("/health")]
async fn health() -> impl Responder {
"OK"
}

pub async fn start_http_consumer(
config: &ConfigInputsHttp,
crawler: Arc<Mutex<fantoccini::Client>>,
interpeter: Arc<Mutex<Interpreter>>,
) -> Result<(), io::Error> {
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(crawler.clone()))
.app_data(web::Data::new(interpeter.clone()))
.service(crawl)
.service(health)
})
.bind((config.addr.as_str(), config.port as u16))?
.run()
Expand Down
20 changes: 14 additions & 6 deletions scout-worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::sync::Arc;

use config::Config;
use scout_interpreter::{builder::InterpreterBuilder, Interpreter};
use tokio::sync::Mutex;

mod config;
mod http;
Expand All @@ -8,17 +12,21 @@ pub enum WorkerError {
ConfigError(String),
}

async fn start(config: Config) {
async fn start(config: Config, interpeter: Interpreter) {
let inter_ptr = Arc::new(Mutex::new(interpeter));
if let Some(http_config) = config.inputs.http {
// if let Err(e) = http::start_http_consumer(&http_config).await {
// println!("{e:?}");
// }
if let Err(e) = http::start_http_consumer(&http_config, inter_ptr.clone()).await {
println!("{e:?}");
}
}
}

#[tokio::main]
async fn main() {
let config = Config::load_file(None).unwrap_or_default();
println!("{config:?}");
start(config).await;
let interpreter = InterpreterBuilder::default()
.build()
.await
.expect("error building interpreter");
start(config, interpreter).await;
}
89 changes: 48 additions & 41 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,81 @@
use std::{env, fs, process::Command};

Check warning on line 1 in src/main.rs

View workflow job for this annotation

GitHub Actions / test

unused import: `process::Command`

use repl::run_repl;
use scout_interpreter::{builder::InterpreterBuilder, env::Env, eval::ScrapeResultsPtr, EnvVars};
use scout_interpreter::{
builder::InterpreterBuilder, env::Env, eval::ScrapeResultsPtr, EnvVars, Interpreter,

Check warning on line 5 in src/main.rs

View workflow job for this annotation

GitHub Actions / test

unused imports: `EnvVars`, `env::Env`, and `eval::ScrapeResultsPtr`
};

mod repl;

async fn run(
file: Option<String>,
results: ScrapeResultsPtr,
interpreter: &Interpreter,
// results: ScrapeResultsPtr,
) -> Result<(), Box<dyn std::error::Error>> {
let env = Env::default();
// let env = Env::default();
match file {
None => run_repl(results, env).await,
None => run_repl(interpreter).await,
Some(f) => {
let contents = fs::read_to_string(f)?;
let interpret = InterpreterBuilder::default()
.with_results(results)
.build()
.await?;
if let Err(e) = interpret.eval(&contents).await {
// let interpret = InterpreterBuilder::default()
// .with_results(results)
// .build()
// .await?;
if let Err(e) = interpreter.eval(&contents).await {
println!("Interpeter error: {:?}", e);
}

interpret.finalize().await;

Ok(())
}
}
}

#[tokio::main]
async fn main() {
let env_vars = envy::from_env::<EnvVars>().expect("error loading env config");
// let env_vars = envy::from_env::<EnvVars>().expect("error loading env config");
let args: Vec<String> = env::args().collect();

let child = Command::new("geckodriver")
.arg("--port")
.arg(env_vars.port().to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error spinning up driver process");
// let child = Command::new("geckodriver")
// .arg("--port")
// .arg(env_vars.port().to_string())
// .stdout(std::process::Stdio::null())
// .stderr(std::process::Stdio::null())
// .spawn()
// .expect("error spinning up driver process");

// sleep to allow driver to start
std::thread::sleep(std::time::Duration::from_millis(50));
// // sleep to allow driver to start
// std::thread::sleep(std::time::Duration::from_millis(50));

let results = ScrapeResultsPtr::default();
if let Err(e) = run(args.get(1).cloned(), results.clone()).await {
// let results = ScrapeResultsPtr::default();
let interpreter = InterpreterBuilder::default()
.build()
.await
.expect("failed to build interpreter");
if let Err(e) = run(args.get(1).cloned(), &interpreter).await {
println!("Error: {}", e);
}
let json_results = results.lock().await.to_json();
let json_results = interpreter.results().lock().await.to_json();
println!("{}", json_results);

#[cfg(target_os = "windows")]
let mut kill = Command::new("taskkill")
.arg("/PID")
.arg(&child.id().to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.arg("/F")
.spawn()
.expect("error sending driver kill");
interpreter.close().await;

// #[cfg(target_os = "windows")]
// let mut kill = Command::new("taskkill")
// .arg("/PID")
// .arg(&child.id().to_string())
// .stdout(std::process::Stdio::null())
// .stderr(std::process::Stdio::null())
// .arg("/F")
// .spawn()
// .expect("error sending driver kill");

#[cfg(not(target_os = "windows"))]
let mut kill = Command::new("kill")
.args(["-s", "TERM", &child.id().to_string()])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error sending driver kill");
// #[cfg(not(target_os = "windows"))]
// let mut kill = Command::new("kill")
// .args(["-s", "TERM", &child.id().to_string()])
// .stdout(std::process::Stdio::null())
// .stderr(std::process::Stdio::null())
// .spawn()
// .expect("error sending driver kill");

kill.wait().expect("error waiting for driver kill");
// kill.wait().expect("error waiting for driver kill");
}
Loading

0 comments on commit d893c99

Please sign in to comment.