Skip to content

Commit

Permalink
Feat/scout worker (#7)
Browse files Browse the repository at this point in the history
* stash

* stash

* stash

* stash:

* stash

* http output

* stash
  • Loading branch information
maxmindlin authored Aug 8, 2024
1 parent 241466d commit 116eb42
Show file tree
Hide file tree
Showing 21 changed files with 1,669 additions and 101 deletions.
1,080 changes: 1,058 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace]
members = ["scout-interpreter", "scout-lexer", "scout-parser"]
members = ["scout-interpreter", "scout-lexer", "scout-parser", "scout-worker"]

# Config for 'cargo dist'
[workspace.metadata.dist]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ The `scout` binary ran with a filename will read and interpret a script file. Wi

Available ENV variables:
- `SCOUT_DEBUG`: Whether or not to open the debug browser. Defaults to `false`.
- `SCOUT_PORT`: Which port to run Scout on. Defaults to `4444`.
- `SCOUT_PORT`: Which port to run Scout on. Defaults to a random open port. Do not set if you intend to run multiple scout instances at once as ports will conflict.
- `SCOUT_PROXY`: An optional URL to proxy requests to. Defaults to none.
- `SCOUT_PATH`: A path to where Scout installs dependencies, like the standard lib. Defaults to `$HOME/scout-lang/`.

Expand Down
1 change: 1 addition & 0 deletions scout-interpreter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ scout-lexer = { version = "0.6.0", path = "../scout-lexer/" }
url = "2.5.2"
reqwest = { version = "0.12", features = ["json", "cookies"] }
envy = "0.4.2"
get-port = "4.0.0"

[dev-dependencies]
test-case = "3.3.1"
19 changes: 14 additions & 5 deletions scout-interpreter/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use get_port::Ops;

use crate::{env::EnvPointer, eval::ScrapeResultsPtr, EnvVars, Interpreter};

#[derive(Debug)]
Expand Down Expand Up @@ -32,20 +34,27 @@ impl InterpreterBuilder {
pub async fn build(self) -> Result<Interpreter, BuilderError> {
let env_vars =
envy::from_env::<EnvVars>().map_err(|e| BuilderError::EnvError(e.to_string()))?;
let port = env_vars
.port()
.unwrap_or_else(|| get_port::tcp::TcpPort::any("127.0.0.1").unwrap() as usize);
let child = crate::GeckDriverProc::new(port);
let crawler = match self.crawler {
Some(c) => Ok(c),
None => new_crawler(&env_vars).await,
None => new_crawler(&env_vars, port).await,
}?;

let interpreter = Interpreter::new(
self.env.unwrap_or(EnvPointer::default()),
self.results.unwrap_or(ScrapeResultsPtr::default()),
self.env.unwrap_or_default(),
self.results.unwrap_or_default(),
crawler,
child,
);

Ok(interpreter)
}
}

async fn new_crawler(env_vars: &EnvVars) -> Result<fantoccini::Client, BuilderError> {
async fn new_crawler(env_vars: &EnvVars, port: usize) -> Result<fantoccini::Client, BuilderError> {
let mut caps = serde_json::map::Map::new();
if !env_vars.scout_debug {
let opts = serde_json::json!({ "args": ["--headless"] });
Expand All @@ -58,7 +67,7 @@ async fn new_crawler(env_vars: &EnvVars) -> Result<fantoccini::Client, BuilderEr
});
caps.insert("proxy".into(), opt);
}
let conn_url = format!("http://localhost:{}", env_vars.scout_port);
let conn_url = format!("http://localhost:{}", port);
let crawler = fantoccini::ClientBuilder::native()
.capabilities(caps)
.connect(&conn_url)
Expand Down
77 changes: 67 additions & 10 deletions scout-interpreter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::Arc;
use std::{
process::{Child, Command},
sync::Arc,
};

use env::EnvPointer;
use eval::{eval, EvalError, ScrapeResultsPtr};
Expand All @@ -19,8 +22,8 @@ pub struct EnvVars {
#[serde(default)]
scout_debug: bool,

#[serde(default = "default_port")]
scout_port: usize,
#[serde(default)]
scout_port: Option<usize>,

#[serde(default)]
scout_proxy: Option<String>,
Expand All @@ -31,7 +34,7 @@ impl EnvVars {
self.scout_debug
}

pub fn port(&self) -> usize {
pub fn port(&self) -> Option<usize> {
self.scout_port
}

Expand All @@ -40,28 +43,49 @@ impl EnvVars {
}
}

fn default_port() -> usize {
4444
}

#[derive(Debug)]
pub enum InterpreterError {
EvalError(EvalError),
ParserError(ParseError),
}

pub struct GeckDriverProc(Child);

impl GeckDriverProc {
pub fn new(port: usize) -> Self {
let child = Command::new("geckodriver")
.arg("--port")
.arg(port.to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error spinning up driver process");

// sleep to allow driver to start
std::thread::sleep(std::time::Duration::from_millis(50));
Self(child)
}
}

pub struct Interpreter {
env: EnvPointer,
results: ScrapeResultsPtr,
crawler: fantoccini::Client,
_geckodriver_proc: GeckDriverProc,
}

impl Interpreter {
pub fn new(env: EnvPointer, results: ScrapeResultsPtr, crawler: fantoccini::Client) -> Self {
pub fn new(
env: EnvPointer,
results: ScrapeResultsPtr,
crawler: fantoccini::Client,
geckodriver_proc: GeckDriverProc,
) -> Self {
Self {
env,
results,
crawler,
_geckodriver_proc: geckodriver_proc,
}
}
pub async fn eval(&self, content: &str) -> Result<Arc<Object>, InterpreterError> {
Expand All @@ -79,7 +103,16 @@ impl Interpreter {
}
}

pub async fn finalize(self) {
pub fn results(&self) -> ScrapeResultsPtr {
self.results.clone()
}

pub fn reset(&mut self) {
self.env = EnvPointer::default();
self.results = ScrapeResultsPtr::default();
}

pub async fn close(self) {
let _ = self.crawler.close().await;
}
}
Expand All @@ -89,3 +122,27 @@ impl From<EvalError> for InterpreterError {
InterpreterError::EvalError(value)
}
}

impl Drop for GeckDriverProc {
fn drop(&mut self) {
#[cfg(target_os = "windows")]
let mut kill = Command::new("taskkill")
.arg("/PID")
.arg(&self.0.id().to_string())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.arg("/F")
.spawn()
.expect("error sending driver kill");

#[cfg(not(target_os = "windows"))]
let mut kill = Command::new("kill")
.args(["-s", "TERM", &self.0.id().to_string()])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("error sending driver kill");

kill.wait().expect("error waiting for driver kill");
}
}
16 changes: 16 additions & 0 deletions scout-worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "scout-worker"
version = "0.1.0"
edition = "2021"

[dependencies]
scout-interpreter = { version = "0.6.0", path = "../scout-interpreter/" }
actix-web = "4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.203", features = ["derive"] }
toml = "0.8.16"
lapin = { version = "2.5.0", default-features = false, features = ["native-tls"] }
futures-lite = "2.3.0"
serde_json = "1.0"
tracing = "0.1.40"
reqwest = "0.12.5"
1 change: 1 addition & 0 deletions scout-worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Scout-worker is a processing framework for ScoutLang. It provides multiple input/output types that are driven via configuration.
71 changes: 71 additions & 0 deletions scout-worker/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::fs;

use serde::Deserialize;

use crate::WorkerError;

const DFEAULT_CONFIG_FILE: &str = "scout.toml";

#[derive(Debug, Default, Deserialize)]
pub struct Config {
pub inputs: ConfigInputs,
pub outputs: Option<ConfigOutputs>,
}

#[derive(Debug, Default, Deserialize)]
pub struct ConfigOutputs {
pub rmq: Option<ConfigRMQ>,
pub http: Option<ConfigOutputHttp>,
}

#[derive(Debug, Default, Deserialize)]
pub struct ConfigInputs {
pub http: Option<ConfigInputHttp>,
pub rmq: Option<ConfigRMQ>,
}

#[derive(Debug, Deserialize)]
pub struct ConfigInputHttp {
pub addr: String,
pub port: usize,
}

#[derive(Debug, Deserialize)]
pub enum OutputMethods {
POST,
PUT,
PATCH,
}

#[derive(Debug, Deserialize)]
pub struct ConfigOutputHttp {
pub endpoint: String,
pub method: OutputMethods,
}

#[derive(Debug, Deserialize)]
pub struct ConfigRMQ {
pub addr: String,
pub queue: String,
pub exchange: String,
pub routing_key: String,
}

impl Config {
pub fn load_file(path: Option<&str>) -> Result<Self, WorkerError> {
let path = path.unwrap_or(DFEAULT_CONFIG_FILE);
let content =
fs::read_to_string(path).map_err(|e| WorkerError::ConfigError(e.to_string()))?;
toml::from_str(&content).map_err(|e| WorkerError::ConfigError(e.to_string()))
}
}

impl std::fmt::Display for OutputMethods {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::PATCH => write!(f, "PATCH"),
Self::POST => write!(f, "POST"),
Self::PUT => write!(f, "PUT"),
}
}
}
2 changes: 2 additions & 0 deletions scout-worker/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod sender;
pub mod server;
37 changes: 37 additions & 0 deletions scout-worker/src/http/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use reqwest::Method;
use tracing::info;

use crate::config::OutputMethods;

pub struct Sender {
client: reqwest::Client,
method: Method,
endpoint: String,
}

impl Sender {
pub fn new(
method: &OutputMethods,
endpoint: String,
) -> Result<Self, Box<dyn std::error::Error>> {
let client = reqwest::Client::new();
let method = Method::from_bytes(method.to_string().as_bytes())?;
Ok(Self {
client,
method,
endpoint,
})
}

pub async fn send(&self, payload: &str) -> Result<(), reqwest::Error> {
info!("sending output to {} {}", self.method, self.endpoint);
let req = self
.client
.request(self.method.clone(), &self.endpoint)
.body(payload.to_owned())
.build()?;
let res = self.client.execute(req).await?;
info!("received response code {}", res.status());
Ok(())
}
}
Loading

0 comments on commit 116eb42

Please sign in to comment.