diff --git a/src/app/metrics.rs b/src/app/metrics.rs index e1bce35..4445e46 100644 --- a/src/app/metrics.rs +++ b/src/app/metrics.rs @@ -1,6 +1,12 @@ use prometheus::{Encoder, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}; -use crate::{data_readers::DataReaderConnection, db::DbTableMetrics}; +use crate::db::DbTableMetrics; + +#[async_trait::async_trait] +pub trait UpdatePendingToSyncModel { + async fn get_name(&self) -> Option; + fn get_pending_to_sync(&self) -> usize; +} pub struct PrometheusMetrics { registry: Registry, @@ -93,7 +99,10 @@ impl PrometheusMetrics { .set(persist_delay); } - pub async fn update_pending_to_sync(&self, data_reader_connection: &DataReaderConnection) { + pub async fn update_pending_to_sync( + &self, + data_reader_connection: &TUpdatePendingToSyncModel, + ) { let name = data_reader_connection.get_name().await; if name.is_none() { @@ -102,14 +111,17 @@ impl PrometheusMetrics { let name = name.unwrap(); - let pending_to_sync = data_reader_connection.get_pending_to_send(); + let pending_to_sync = data_reader_connection.get_pending_to_sync(); self.pending_to_sync .with_label_values(&[&name]) .set(pending_to_sync as i64); } - pub async fn remove_pending_to_sync(&self, data_reader_connection: &DataReaderConnection) { + pub async fn remove_pending_to_sync( + &self, + data_reader_connection: &TUpdatePendingToSyncModel, + ) { let name = data_reader_connection.get_name().await; if name.is_none() { return; diff --git a/src/app/mod.rs b/src/app/mod.rs index f044d57..886eec9 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -7,5 +7,6 @@ mod request_metrics; pub use app_ctx::{AppContext, APP_VERSION, DEFAULT_PERSIST_PERIOD}; pub use metrics::PrometheusMetrics; +pub use metrics::UpdatePendingToSyncModel; pub use persist_history_duration::PersistHistoryDuration; pub use request_metrics::{RequestMetric, RequestMetrics}; diff --git a/src/background/tcp_connection_send_event_loop.rs b/src/background/tcp_connection_send_event_loop.rs index 643a085..c30495e 100644 --- a/src/background/tcp_connection_send_event_loop.rs +++ b/src/background/tcp_connection_send_event_loop.rs @@ -2,16 +2,18 @@ use std::sync::Arc; use rust_extensions::events_loop::EventsLoopTick; -use crate::data_readers::tcp_connection::TcpConnectionInfo; +use crate::{app::AppContext, data_readers::tcp_connection::TcpConnectionInfo}; pub struct TcpConnectionSendEventLoop { + app: Arc, tcp_connection_info: Arc, } impl TcpConnectionSendEventLoop { - pub fn new(tcp_connection_info: Arc) -> Self { + pub fn new(app: Arc, tcp_connection_info: Arc) -> Self { Self { tcp_connection_info, + app, } } } @@ -20,5 +22,9 @@ impl TcpConnectionSendEventLoop { impl EventsLoopTick<()> for TcpConnectionSendEventLoop { async fn tick(&self, _model: ()) { self.tcp_connection_info.flush_payloads().await; + self.app + .metrics + .update_pending_to_sync(self.tcp_connection_info.as_ref()) + .await; } } diff --git a/src/data_readers/connection.rs b/src/data_readers/connection.rs index 335d75f..7f2c91b 100644 --- a/src/data_readers/connection.rs +++ b/src/data_readers/connection.rs @@ -8,13 +8,6 @@ pub enum DataReaderConnection { } impl DataReaderConnection { - pub fn get_pending_to_send(&self) -> usize { - match self { - DataReaderConnection::Tcp(tcp_info) => tcp_info.get_pending_to_send(), - DataReaderConnection::Http(http_info) => http_info.get_pending_to_send(), - } - } - pub async fn get_name(&self) -> Option { match self { DataReaderConnection::Tcp(tcp_info) => tcp_info.get_name().await, @@ -29,3 +22,17 @@ impl DataReaderConnection { } } } + +#[async_trait::async_trait] +impl crate::app::UpdatePendingToSyncModel for DataReaderConnection { + async fn get_name(&self) -> Option { + self.get_name().await + } + + fn get_pending_to_sync(&self) -> usize { + match self { + DataReaderConnection::Tcp(tcp_info) => tcp_info.get_pending_to_send(), + DataReaderConnection::Http(http_info) => http_info.get_pending_to_send(), + } + } +} diff --git a/src/data_readers/data_readers_list.rs b/src/data_readers/data_readers_list.rs index 89e88c9..476dc27 100644 --- a/src/data_readers/data_readers_list.rs +++ b/src/data_readers/data_readers_list.rs @@ -46,6 +46,7 @@ impl DataReadersList { connection_info .flush_events_loop .register_event_loop(Arc::new(TcpConnectionSendEventLoop::new( + app.clone(), connection_info.clone(), ))) .await; diff --git a/src/data_readers/tcp_connection/tcp_connection_info.rs b/src/data_readers/tcp_connection/tcp_connection_info.rs index 1a26ecc..0ea0798 100644 --- a/src/data_readers/tcp_connection/tcp_connection_info.rs +++ b/src/data_readers/tcp_connection/tcp_connection_info.rs @@ -106,3 +106,14 @@ impl TcpConnectionInfo { .load(std::sync::atomic::Ordering::Relaxed) } } + +#[async_trait::async_trait] +impl crate::app::UpdatePendingToSyncModel for TcpConnectionInfo { + async fn get_name(&self) -> Option { + self.get_name().await + } + + fn get_pending_to_sync(&self) -> usize { + self.get_pending_to_send() + } +}