Skip to content

Commit

Permalink
http output
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmindlin committed Aug 6, 2024
1 parent e9d04a3 commit 55b563d
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 625 deletions.
608 changes: 20 additions & 588 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion scout-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ actix-web = "4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.203", features = ["derive"] }
toml = "0.8.16"
lapin = "2.5.0"
lapin = { version = "2.5.0", default-features = false, features = ["native-tls"] }
futures-lite = "2.3.0"
serde_json = "1.0"
tracing = "0.1.40"
reqwest = "0.12.5"
28 changes: 26 additions & 2 deletions scout-worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,34 @@ pub struct Config {
/// Output destinations that can be enabled in the worker configuration.
///
/// Both entries are optional; an output is only set up when its entry is
/// present.
#[derive(Debug, Default, Deserialize)]
pub struct ConfigOutputs {
/// RabbitMQ output settings, if configured.
pub rmq: Option<ConfigRMQ>,
/// HTTP output settings, if configured.
pub http: Option<ConfigOutputHttp>,
}

#[derive(Debug, Default, Deserialize)]
pub struct ConfigInputs {
pub http: Option<ConfigInputsHttp>,
pub http: Option<ConfigInputHttp>,
pub rmq: Option<ConfigRMQ>,
}

#[derive(Debug, Deserialize)]
pub struct ConfigInputsHttp {
pub struct ConfigInputHttp {
pub addr: String,
pub port: usize,
}

/// HTTP methods that can be selected for the HTTP output.
///
/// The variant names are the upper-case HTTP verbs, which is also the text
/// produced by this type's `Display` implementation.
// NOTE(review): with the derived `Deserialize` and no rename attribute, the
// configuration value presumably has to match the variant name exactly
// (e.g. "POST") — confirm against the config files in use.
#[derive(Debug, Deserialize)]
pub enum OutputMethods {
POST,
PUT,
PATCH,
}

/// Settings for the HTTP output.
#[derive(Debug, Deserialize)]
pub struct ConfigOutputHttp {
/// URL that outgoing requests are sent to.
pub endpoint: String,
/// HTTP method used for outgoing requests.
pub method: OutputMethods,
}

#[derive(Debug, Deserialize)]
pub struct ConfigRMQ {
pub addr: String,
Expand All @@ -45,3 +59,13 @@ impl Config {
toml::from_str(&content).map_err(|e| WorkerError::ConfigError(e.to_string()))
}
}

/// Formats the method as its upper-case HTTP verb (`POST`, `PUT` or `PATCH`).
impl std::fmt::Display for OutputMethods {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the verb first so there is a single write site.
        let verb = match self {
            Self::POST => "POST",
            Self::PUT => "PUT",
            Self::PATCH => "PATCH",
        };
        f.write_str(verb)
    }
}
2 changes: 2 additions & 0 deletions scout-worker/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod sender;
pub mod server;
35 changes: 35 additions & 0 deletions scout-worker/src/http/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use reqwest::Method;

use crate::config::OutputMethods;

/// HTTP output that sends payloads to a fixed endpoint with a fixed method.
pub struct Sender {
/// Client used for every request made by this sender.
client: reqwest::Client,
/// HTTP method applied to each outgoing request.
method: Method,
/// Target URL of each outgoing request.
endpoint: String,
}

impl Sender {
    /// Creates a sender that issues `method` requests against `endpoint`.
    ///
    /// # Errors
    ///
    /// Returns an error if the textual form of `method` is not accepted as an
    /// HTTP method.
    pub fn new(
        method: &OutputMethods,
        endpoint: String,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let method = Method::from_bytes(method.to_string().as_bytes())?;
        Ok(Self {
            client: reqwest::Client::new(),
            method,
            endpoint,
        })
    }

    /// Sends `payload` as the request body to the configured endpoint.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, or if the
    /// server replies with a 4xx/5xx status.
    pub async fn send(&self, payload: &str) -> Result<(), reqwest::Error> {
        self.client
            .request(self.method.clone(), &self.endpoint)
            .body(payload.to_owned())
            .send()
            .await?
            // A completed request is not necessarily a successful one: without
            // this check a 4xx/5xx reply would be reported to the caller as
            // `Ok(())` and the failed delivery would go unnoticed.
            .error_for_status()?;

        Ok(())
    }
}
43 changes: 35 additions & 8 deletions scout-worker/src/http.rs → scout-worker/src/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
use std::{
fs,
io::{self, ErrorKind},
sync::Arc,
};

use actix_web::{get, http::StatusCode, post, web, App, HttpResponse, HttpServer, Responder};
use actix_web::{
get,
http::StatusCode,
post,
web::{self, Data},
App, HttpResponse, HttpServer, Responder,
};
use scout_interpreter::builder::InterpreterBuilder;
use tracing::info;

use crate::{config::ConfigInputsHttp, models::incoming};
use crate::{config::ConfigInputHttp, models::incoming, Output};

#[post("/")]
async fn crawl(body: web::Json<incoming::Incoming>) -> impl Responder {
async fn crawl(
outputs: Data<Arc<Vec<Output>>>,
body: web::Json<incoming::Incoming>,
) -> impl Responder {
match fs::read_to_string(&body.file) {
Ok(content) => {
let interpreter = InterpreterBuilder::default().build().await.unwrap();
Expand All @@ -21,6 +32,12 @@ async fn crawl(body: web::Json<incoming::Incoming>) -> impl Responder {
let payload = res.lock().await.to_json();
interpreter.close().await;

for output in outputs.iter() {
if let Err(e) = output.send(&payload).await {
println!("error sending to output: {e}");
}
}

HttpResponse::build(StatusCode::OK)
.content_type("application/json")
.body(payload)
Expand All @@ -37,9 +54,19 @@ async fn health() -> impl Responder {
"OK"
}

pub async fn start_http_consumer(config: &ConfigInputsHttp) -> Result<(), io::Error> {
HttpServer::new(move || App::new().service(crawl).service(health))
.bind((config.addr.as_str(), config.port as u16))?
.run()
.await
pub async fn start_http_consumer(
config: &ConfigInputHttp,
outputs: Arc<Vec<Output>>,
) -> Result<(), io::Error> {
info!("starting HTTP server on {}:{}", config.addr, config.port);
let data = Data::new(outputs.clone());
HttpServer::new(move || {
App::new()
.app_data(data.clone())
.service(crawl)
.service(health)
})
.bind((config.addr.as_str(), config.port as u16))?
.run()
.await
}
41 changes: 22 additions & 19 deletions scout-worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,56 @@
use std::sync::Arc;

use config::Config;
use http::sender::Sender;
use output::Output;
use rmq::producer::Producer;
use tracing::error;

mod config;
mod http;
mod models;
mod output;
mod rmq;

pub enum WorkerError {
ConfigError(String),
}

pub enum Output {
RMQ(rmq::producer::Producer),
}

impl Output {
pub async fn send(&self, payload: String) -> Result<(), Box<dyn std::error::Error>> {
match self {
Output::RMQ(p) => {
p.send(payload).await?;
Ok(())
}
}
}
}

async fn start(config: Config) -> Result<(), Box<dyn std::error::Error>> {
let mut outputs = Vec::new();
if let Some(outputs_config) = config.outputs {
if let Some(rmq) = outputs_config.rmq {
let rmq_out = Producer::new(&rmq).await?;
outputs.push(Output::RMQ(rmq_out));
}

if let Some(http) = outputs_config.http {
let http_out = Sender::new(&http.method, http.endpoint)?;
outputs.push(Output::HTTP(http_out));
}
}

let aoutputs = Arc::new(outputs);
if let Some(http_config) = config.inputs.http {
http::start_http_consumer(&http_config).await?;
http::server::start_http_consumer(&http_config, aoutputs.clone()).await?;
} else if let Some(rmq_config) = config.inputs.rmq {
let mut consumer = rmq::consumer::Consumer::new(&rmq_config).await?;
consumer.start().await.expect("error starting consumer");
rmq::consumer::Consumer::new(&rmq_config, aoutputs.clone())
.await?
.start()
.await?;
}

Ok(())
}

#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

let config = Config::load_file(None).unwrap_or_default();
if let Err(e) = start(config).await {
println!("error starting worker: {e}")
error!("error starting worker: {e}");
}
}
21 changes: 21 additions & 0 deletions scout-worker/src/output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::{http, rmq};

/// A configured destination for result payloads.
pub enum Output {
/// Publishes through a RabbitMQ producer.
RMQ(rmq::producer::Producer),
/// Sends through an HTTP sender.
HTTP(http::sender::Sender),
}

impl Output {
pub async fn send(&self, payload: &str) -> Result<(), Box<dyn std::error::Error>> {
match self {
Output::RMQ(p) => {
p.send(payload).await?;
Ok(())
}
Output::HTTP(sender) => {
sender.send(payload).await?;
Ok(())
}
}
}
}
16 changes: 13 additions & 3 deletions scout-worker/src/rmq/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use scout_interpreter::{
builder::{BuilderError, InterpreterBuilder},
Interpreter, InterpreterError,
};
use std::{fmt::Display, fs, str};
use std::{fmt::Display, fs, str, sync::Arc};
use tracing::{error, info};

use crate::{config::ConfigRMQ, models::incoming};
use crate::{config::ConfigRMQ, models::incoming, Output};

#[derive(Debug)]
pub enum ConsumerError {
Expand All @@ -25,10 +26,11 @@ pub struct Consumer {
chann: Channel,
queue: String,
interpreter: Interpreter,
outputs: Arc<Vec<Output>>,
}

impl Consumer {
pub async fn new(config: &ConfigRMQ) -> Result<Self, ConsumerError> {
pub async fn new(config: &ConfigRMQ, outputs: Arc<Vec<Output>>) -> Result<Self, ConsumerError> {
let conn = Connection::connect(&config.addr, ConnectionProperties::default()).await?;
let chann = conn.create_channel().await?;
let interpreter = InterpreterBuilder::default().build().await?;
Expand Down Expand Up @@ -64,6 +66,7 @@ impl Consumer {
chann,
queue: config.queue.clone(),
interpreter,
outputs,
})
}

Expand All @@ -90,10 +93,17 @@ impl Consumer {
)
.await?;

info!("listening on queue {}", self.queue);
while let Some(delivery) = consumer.next().await {
if let Ok(delivery) = delivery {
info!("processing msg");
match self.process(&delivery.data).await {
Ok(res) => {
for output in self.outputs.iter() {
if let Err(e) = output.send(&res).await {
error!("error sending to output: {e}");
}
}
delivery.ack(BasicAckOptions::default()).await?;
}
Err(_) => {
Expand Down
8 changes: 4 additions & 4 deletions scout-worker/src/rmq/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use lapin::{
types::FieldTable,
BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind,
};
use tracing::info;

use crate::config::ConfigRMQ;

Expand All @@ -15,7 +16,6 @@ pub enum ProducerError {

pub struct Producer {
chann: Channel,
queue: String,
exchange: String,
out_key: String,
}
Expand Down Expand Up @@ -44,19 +44,19 @@ impl Producer {

Ok(Self {
chann,
queue: config.queue.clone(),
exchange: config.exchange.clone(),
out_key: config.routing_key.clone(),
})
}

pub async fn send(&self, payload: String) -> Result<(), ProducerError> {
pub async fn send(&self, payload: &str) -> Result<(), ProducerError> {
info!("publishing message to {}", self.out_key);
self.chann
.basic_publish(
&self.exchange,
&self.out_key,
BasicPublishOptions::default(),
&payload,
&payload.as_bytes(),
BasicProperties::default(),
)
.await?;
Expand Down

0 comments on commit 55b563d

Please sign in to comment.