Commit

Merge pull request #1 from maxmindlin/feature/metrics

Feature/metrics

maxmindlin authored Mar 9, 2024
2 parents c861738 + 22f30df commit a515821
Showing 8 changed files with 186 additions and 9 deletions.
23 changes: 23 additions & 0 deletions .docker/telegraf/telegraf.conf
@@ -0,0 +1,23 @@
[global_tags]

[agent]
interval = "1s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 200000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = "1s"
debug = true

[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
bucket = "${DOCKER_INFLUXDB_INIT_BUCKET}"
organization = "${DOCKER_INFLUXDB_INIT_ORG}"
token = "${DOCKER_INFLUXDB_INIT_ADMIN_TOKEN}"
timeout = "40s"

[[inputs.socket_listener]]
## Address and port for the socket listener
service_address = "tcp://${TELEGRAFCLIENT_HOST}:${TELEGRAFCLIENT_PORT}"
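For context, Telegraf's socket_listener defaults to parsing InfluxDB line protocol, so anything writing to this socket just sends newline-terminated metric lines. Below is a minimal, hypothetical Rust sketch of such a write; the address comes from the TELEGRAF_ADDR value in .env further down, while the measurement name and field encoding are illustrative assumptions, not taken from this change.

use std::io::Write;
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Address assumed from TELEGRAF_ADDR in .env (tcp://telegraf:8092).
    let mut stream = TcpStream::connect("telegraf:8092")?;
    // Line protocol: measurement,tag=value field=value
    stream.write_all(b"proxy_metrics,domain=example.com,proxy_id=1 response_time=120i,status=200i\n")?;
    Ok(())
}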
3 changes: 3 additions & 0 deletions .env
@@ -3,3 +3,6 @@ POSTGRES_PASSWORD=testuserpwd
POSTGRES_DB=locust
POSTGRES_HOST=postgres-locust
POSTGRES_PORT=5432

TELEGRAF_ADDR=tcp://telegraf:8092

22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -62,6 +62,7 @@ tracing-subscriber = "0.3.0"
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres"] }
cookie = "0.18.0"
warp = "0.3.6"
telegraf = "0.6"

# The profile that 'cargo dist' will build with
[profile.dist]
50 changes: 50 additions & 0 deletions docker-compose.yml
@@ -11,6 +11,8 @@ services:
- 3000:3000
depends_on:
- postgres-locust
- influxdb
- telegraf
# - flyway-locust

postgres-locust:
@@ -48,6 +50,54 @@ services:
depends_on:
- postgres-locust

influxdb:
networks:
- locust-net
image: influxdb:latest
ports:
- 8086:8086
environment:
DOCKER_INFLUXDB_INIT_MODE: ${DOCKER_INFLUXDB_INIT_MODE}
DOCKER_INFLUXDB_INIT_ORG: ${DOCKER_INFLUXDB_INIT_ORG}
DOCKER_INFLUXDB_INIT_BUCKET: ${DOCKER_INFLUXDB_INIT_BUCKET}
DOCKER_INFLUXDB_INIT_USERNAME: ${DOCKER_INFLUXDB_INIT_USERNAME}
DOCKER_INFLUXDB_INIT_PASSWORD: ${DOCKER_INFLUXDB_INIT_PASSWORD}
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${DOCKER_INFLUXDB_INIT_ADMIN_TOKEN}
INFLUXDB_DB: influx

telegraf:
networks:
- locust-net
image: telegraf:latest
hostname: ${TELEGRAFCLIENT_HOST}
ports:
- 8092:8092
volumes:
- ./.docker/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
depends_on:
- influxdb
environment:
DOCKER_INFLUXDB_INIT_MODE: ${DOCKER_INFLUXDB_INIT_MODE}
DOCKER_INFLUXDB_INIT_ORG: ${DOCKER_INFLUXDB_INIT_ORG}
DOCKER_INFLUXDB_INIT_BUCKET: ${DOCKER_INFLUXDB_INIT_BUCKET}
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${DOCKER_INFLUXDB_INIT_ADMIN_TOKEN}
TELEGRAFCLIENT_HOST: ${TELEGRAFCLIENT_HOST}
TELEGRAFCLIENT_PORT: ${TELEGRAFCLIENT_PORT}
INFLUXDB_DB: influx


grafana:
networks:
- locust-net
image: grafana/grafana-enterprise
container_name: grafana
restart: unless-stopped
ports:
- 4000:3000
depends_on:
- influxdb


networks:
locust-net:
external: false
11 changes: 10 additions & 1 deletion src/main.rs
@@ -1,9 +1,11 @@
mod ca;
mod error;
mod metrics;
mod rewind;
mod service;
mod worker;

use crate::metrics::TelegrafClient;
use crate::worker::DBWorker;
use ca::RcgenAuthority;
use futures::Future;
@@ -17,6 +19,7 @@ use rustls_pemfile as pemfile;
use sqlx::PgPool;
use std::{
convert::Infallible,
env,
net::SocketAddr,
sync::{mpsc, Arc},
thread, time,
@@ -92,8 +95,14 @@ async fn main() {
let db_pool_arc = Arc::new(db_pool);
let (tx, rx) = mpsc::channel();

// @TODO: config out metrics client options.
let telegraf_client = match env::var("TELEGRAF_ADDR") {
Ok(addr) => Some(TelegrafClient::new(addr.as_ref())),
Err(_) => None,
};

// @TODO: could probably make a worker pool instead of a single worker.
let worker = DBWorker::new(Arc::clone(&db_pool_arc), rx);
let mut worker = DBWorker::new(Arc::clone(&db_pool_arc), rx, telegraf_client);
thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(worker.start());
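For illustration, a request handler holding a clone of the `tx` end created above could report a completed proxied response as sketched below. The variant fields come from the `DBJob::ProxyResponse` match in src/worker.rs further down, but the concrete field types and the helper itself are assumptions, not part of this diff.

use std::sync::mpsc;

use http::StatusCode;

use crate::worker::DBJob;

// Hypothetical helper; in practice the sender would be cloned into the proxy service.
fn report_proxy_response(tx: &mpsc::Sender<DBJob>) {
    let _ = tx.send(DBJob::ProxyResponse {
        proxy_id: 7,                             // assumed i32, matching ProxyMetric::proxy_id
        status: StatusCode::OK,                  // the worker converts this with status.as_u16()
        response_time: 120,                      // assumed u32, e.g. milliseconds
        domain: Some("example.com".to_string()), // Option<String>; the worker falls back to ""
    });
}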
43 changes: 43 additions & 0 deletions src/metrics/mod.rs
@@ -0,0 +1,43 @@
use telegraf::Metric;

#[derive(Debug)]
pub enum MetricsError {
WriteError(String),
}

pub trait MetricClient {
fn send_proxy_metric(&mut self, metric: &ProxyMetric) -> Result<(), MetricsError>;
}

#[derive(Metric)]
#[measurement = "proxy_metrics"]
pub struct ProxyMetric {
#[telegraf(tag)]
pub domain: String,
#[telegraf(tag)]
pub proxy_id: i32,
pub response_time: u32,
pub status: u16,
}

pub struct TelegrafClient {
client: telegraf::Client,
}

impl TelegrafClient {
pub fn new(addr: &str) -> Self {
Self {
client: telegraf::Client::new(addr).unwrap(),
}
}
}

impl MetricClient for TelegrafClient {
fn send_proxy_metric(&mut self, metric: &ProxyMetric) -> Result<(), MetricsError> {
if let Err(e) = self.client.write(metric) {
return Err(MetricsError::WriteError(e.to_string()));
}

Ok(())
}
}
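A minimal usage sketch of this module on its own, assuming a Telegraf socket listener is reachable at the address from .env (the literal metric values here are illustrative):

use crate::metrics::{MetricClient, ProxyMetric, TelegrafClient};

fn send_example_metric() {
    // Note: TelegrafClient::new unwraps internally, so it panics if the socket is unreachable.
    let mut client = TelegrafClient::new("tcp://telegraf:8092");
    let metric = ProxyMetric {
        domain: "example.com".to_string(),
        proxy_id: 1,
        response_time: 120,
        status: 200,
    };
    if let Err(e) = client.send_proxy_metric(&metric) {
        eprintln!("failed to send proxy metric: {e:?}");
    }
}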
42 changes: 34 additions & 8 deletions src/worker.rs
@@ -1,35 +1,61 @@
use std::sync::{mpsc, Arc};

use http::StatusCode;
use sqlx::PgPool;
use std::sync::{mpsc, Arc};
use tracing::warn;

pub struct DBWorker {
use crate::metrics::{MetricClient, ProxyMetric};

pub struct DBWorker<T> {
pool: Arc<PgPool>,

(GitHub Actions warning, src/worker.rs line 9: field `pool` is never read)
channel: mpsc::Receiver<DBJob>,
metrics_clients: Option<T>,
}

impl DBWorker {
pub fn new(pool: Arc<PgPool>, channel: mpsc::Receiver<DBJob>) -> Self {
Self { pool, channel }
impl<T> DBWorker<T>
where
T: MetricClient,
{
pub fn new(
pool: Arc<PgPool>,
channel: mpsc::Receiver<DBJob>,
metrics_clients: Option<T>,
) -> Self {
Self {
pool,
channel,
metrics_clients,
}
}

pub async fn start(&self) {
pub async fn start(&mut self) {
while let Ok(job) = self.channel.recv() {
self.process_job(job).await;
}

warn!("Error receiving worker job. Exiting");
}

async fn process_job(&self, job: DBJob) {
async fn process_job(&mut self, job: DBJob) {
match job {
DBJob::ProxyResponse {
proxy_id,
status,
response_time,
domain,
} => {
let metric = ProxyMetric {
proxy_id,
domain: domain.unwrap_or("".to_string()),
status: status.as_u16(),
response_time,
};

if let Some(client) = &mut self.metrics_clients {
if let Err(e) = client.send_proxy_metric(&metric) {
warn!("error sending proxy metric: {e:?}");
};
}

// Modify success coefficient in DB
// to keep track of proxy success across domains.
// Coeff will be used to calculate likelihood of
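Because the worker only depends on the `MetricClient` trait, a test double can stand in for Telegraf; a hypothetical no-op implementation (not part of this PR) might look like:

use crate::metrics::{MetricClient, MetricsError, ProxyMetric};

struct NoopMetrics;

impl MetricClient for NoopMetrics {
    fn send_proxy_metric(&mut self, _metric: &ProxyMetric) -> Result<(), MetricsError> {
        // Discard the metric; useful in tests where no Telegraf socket is available.
        Ok(())
    }
}

// e.g. DBWorker::new(pool, rx, Some(NoopMetrics)) in a test harness.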
