Skip to content

Commit

Permalink
Added monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed May 30, 2022
1 parent d08bfb9 commit a9cfc1f
Show file tree
Hide file tree
Showing 20 changed files with 247 additions and 48 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ my-app-insights = { branch = "main", git = "https://github.com/MyJetTools/my-app
my-http-server = { branch = "0.2.3", git = "https://github.com/MyJetTools/my-http-server.git" }
my-http-server-controllers = { branch = "0.2.3", git = "https://github.com/MyJetTools/my-http-server-controllers.git" }
my-http-server-swagger = { branch = "0.2.3", git = "https://github.com/MyJetTools/my-http-server-swagger.git" }
my-http-server-app-insights = { branch = "0.2.3", git = "https://github.com/MyJetTools/my-http-server-app-insights.git" }
my-tcp-sockets = { branch = "0.1.1", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
my-logger = { branch = "0.1.1", git = "https://github.com/MyJetTools/my-logger.git" }
my-json = { branch = "main", git = "https://github.com/MyJetTools/my-json.git" }
Expand Down
2 changes: 1 addition & 1 deletion JavaScript/HtmlSubscribersGenerator.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion JavaScript/HtmlSubscribersGenerator.js.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions TypeScript/HtmlSubscribersGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

class HtmlSubscribersGenerator {




public static generateHtml(data: IInitializedStatus): string {

return '<h3>Connected Nodes</h3>'
Expand Down Expand Up @@ -42,7 +39,7 @@ class HtmlSubscribersGenerator {
let total_partitions = 0;
let total_records = 0;
let total_indexed_records = 0;
for (let table of tables.sort(itm => itm.name ? 1 : -1)) {
for (let table of tables.sort((a, b) => a.name > b.name ? 1 : -1)) {

let style = ' style="color:green" ';

Expand Down
2 changes: 2 additions & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ pub mod global_states;
pub mod logs;
mod metrics;
mod persist_history_duration;
mod request_metrics;

pub use app_ctx::{AppContext, APP_VERSION, DEFAULT_PERSIST_PERIOD};
pub use event_dispatcher::{EventsDispatcher, EventsDispatcherProduction, SyncEventsReader};
pub use metrics::PrometheusMetrics;
pub use persist_history_duration::PersistHistoryDuration;
pub use request_metrics::{RequestMetric, RequestMetrics};
53 changes: 53 additions & 0 deletions src/app/request_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::time::Duration;

use rust_extensions::date_time::DateTimeAsMicroseconds;
use tokio::sync::Mutex;

#[derive(Debug, Clone)]
pub struct RequestMetric {
pub moment: DateTimeAsMicroseconds,
pub name: String,
pub duration: Duration,
pub status_code: u16,
pub result_size: usize,
}

pub struct RequestMetrics {
data: Mutex<Vec<RequestMetric>>,
}

impl RequestMetrics {
pub fn new() -> Self {
Self {
data: Mutex::new(Vec::new()),
}
}

pub async fn add_metric(
&self,
name: String,
duration: Duration,
status_code: u16,
result_size: usize,
) {
let metric = RequestMetric {
moment: DateTimeAsMicroseconds::now(),
name: name,
duration,
status_code,
result_size,
};

let mut data = self.data.lock().await;
data.push(metric);

while data.len() > 100 {
data.remove(0);
}
}

pub async fn get_metrics(&self) -> Vec<RequestMetric> {
let data = self.data.lock().await;
data.clone()
}
}
4 changes: 4 additions & 0 deletions src/db/db_table/db_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use rust_extensions::{
use tokio::sync::{Mutex, RwLock};

use crate::{
app::RequestMetrics,
db::{
db_snapshots::{DbPartitionSnapshot, DbTableSnapshot},
DbRow,
Expand All @@ -34,6 +35,8 @@ pub struct DbTable {

pub common_persist_thread: AtomicBool,
pub dedicated_thread: Mutex<Option<MyTimer>>,

pub request_metrics: RequestMetrics,
}

pub struct DbTableMetrics {
Expand All @@ -58,6 +61,7 @@ impl DbTable {
last_update_time: AtomicDateTimeAsMicroseconds::new(created),
common_persist_thread: AtomicBool::new(true),
dedicated_thread: Mutex::new(None),
request_metrics: RequestMetrics::new(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/db_operations/gc/keep_max_partitions_amount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
//TODO - Use Method from TableData
pub async fn keep_max_partitions_amount(
app: &AppContext,
db_table: Arc<DbTable>,
db_table: &Arc<DbTable>,
max_partitions_amount: usize,
event_src: EventSource,
persist_moment: DateTimeAsMicroseconds,
Expand Down
6 changes: 5 additions & 1 deletion src/http/controllers/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use my_http_server_controllers::controllers::ControllersMiddleware;

use crate::app::AppContext;

pub fn build(app: Arc<AppContext>) -> ControllersMiddleware {
pub fn build(app: &Arc<AppContext>) -> ControllersMiddleware {
let mut result = ControllersMiddleware::new();

let api_controller = super::api::ApiController::new();
Expand Down Expand Up @@ -191,5 +191,9 @@ pub fn build(app: Arc<AppContext>) -> ControllersMiddleware {

result.register_post_action(Arc::new(force_persist_action));

result.register_get_action(Arc::new(super::status_controller::RequestsAction::new(
app.clone(),
)));

result
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn handle_request(

crate::db_operations::gc::keep_max_partitions_amount(
action.app.as_ref(),
db_table,
&db_table,
http_input.max_partitions_amount,
event_src,
http_input.sync_period.get_sync_moment(),
Expand Down
11 changes: 0 additions & 11 deletions src/http/controllers/metrics.rs

This file was deleted.

1 change: 1 addition & 0 deletions src/http/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod data_reader_controller;
pub mod gc_controller;
pub mod home_controller;
pub mod persist_controller;
pub mod request_metrics_writer;

pub mod logs_controller;
pub mod multipart;
Expand Down
93 changes: 93 additions & 0 deletions src/http/controllers/request_metrics_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::sync::Arc;

use my_http_server::{
HttpContext, HttpFailResult, HttpOkResult, HttpOutput, HttpServerMiddleware,
HttpServerRequestFlow,
};
use rust_extensions::StopWatch;

use crate::{app::AppContext, db::DbTable};

pub struct WriteMetricContext {
db_table: Arc<DbTable>,
stop_watch: StopWatch,
}

pub struct RequestMetricsWriter {
app: Arc<AppContext>,
}

impl RequestMetricsWriter {
pub fn new(app: Arc<AppContext>) -> Self {
Self { app }
}

async fn get_metrics_context(&self, ctx: &mut HttpContext) -> Option<WriteMetricContext> {
if let Ok(qs) = ctx.request.get_query_string() {
if let Some(table_name) = qs.get_optional("tableName") {
if let Ok(table_name) = table_name.as_string() {
if let Some(db_table) = self.app.db.get_table(table_name.as_str()).await {
let mut result = WriteMetricContext {
db_table,
stop_watch: StopWatch::new(),
};

result.stop_watch.start();
return result.into();
}
}
}
}

None
}
}

#[async_trait::async_trait]
impl HttpServerMiddleware for RequestMetricsWriter {
async fn handle_request(
&self,
ctx: &mut HttpContext,
get_next: &mut HttpServerRequestFlow,
) -> Result<HttpOkResult, HttpFailResult> {
let metrics_context = self.get_metrics_context(ctx).await;
let result = get_next.next(ctx).await;

if let Some(mut metrics_context) = metrics_context {
metrics_context.stop_watch.pause();

let (result_code, result_size) = match &result {
Ok(result) => (
result.output.get_status_code(),
get_content_size(&result.output),
),
Err(err) => (err.status_code, 0),
};

metrics_context
.db_table
.request_metrics
.add_metric(
format!("[{}]{}", ctx.request.get_method(), ctx.request.get_path(),),
metrics_context.stop_watch.duration(),
result_code,
result_size,
)
.await;
}

result
}
}

fn get_content_size(src: &HttpOutput) -> usize {
match src {
HttpOutput::Empty => 0,
HttpOutput::Content {
headers: _,
content_type: _,
content,
} => content.len(),
HttpOutput::Redirect { url: _ } => 0,
}
}
2 changes: 2 additions & 0 deletions src/http/controllers/status_controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod models;
mod non_initialized;
mod requests_action;
mod status;
pub use status::StatusController;
mod status_bar_model;
pub use requests_action::RequestsAction;
29 changes: 28 additions & 1 deletion src/http/controllers/status_controller/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::app::AppContext;
use crate::app::{AppContext, RequestMetric};
use my_http_server_swagger::*;
use rust_extensions::date_time::DateTimeAsMicroseconds;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -151,3 +151,30 @@ async fn get_readers(app: &AppContext) -> (Vec<ReaderModel>, usize, usize) {

(result, tcp_count, http_count)
}

#[derive(MyHttpInput)]
pub struct RequestActionInputContract {
#[http_query(name = "tableName"; description = "Name of a table")]
pub table_name: String,
}

#[derive(Serialize, Deserialize, Debug, MyHttpObjectStructure)]
pub struct RequestContract {
pub moment: String,
pub action: String,
pub duration: String,
pub status_code: u16,
pub size: usize,
}

impl RequestContract {
pub fn from(src: RequestMetric) -> Self {
Self {
moment: src.moment.to_rfc3339(),
action: src.name,
duration: format!("{:?}", src.duration),
status_code: src.status_code,
size: src.result_size,
}
}
}
44 changes: 44 additions & 0 deletions src/http/controllers/status_controller/requests_action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::sync::Arc;

use my_http_server::{HttpContext, HttpFailResult, HttpOkResult, HttpOutput};

use crate::app::AppContext;

use super::models::{RequestActionInputContract, RequestContract};

#[my_http_server_swagger::http_route(
method: "GET",
route: "/Monitoring/Requests",
input_data: "RequestActionInputContract",
description: "Get Requests statistic",
controller: "Monitoring",
)]
pub struct RequestsAction {
app: Arc<AppContext>,
}

impl RequestsAction {
pub fn new(app: Arc<AppContext>) -> Self {
Self { app }
}
}

async fn handle_request(
action: &RequestsAction,
input_data: RequestActionInputContract,
_ctx: &mut HttpContext,
) -> Result<HttpOkResult, HttpFailResult> {
let db_table =
crate::db_operations::read::table::get(action.app.as_ref(), input_data.table_name.as_str())
.await?;

let src = db_table.request_metrics.get_metrics().await;

let mut result = Vec::with_capacity(src.len());

for metric in db_table.request_metrics.get_metrics().await {
result.push(RequestContract::from(metric));
}

return Ok(HttpOutput::as_json(result).into_ok_result(true));
}
Loading

0 comments on commit a9cfc1f

Please sign in to comment.