Skip to content

Commit

Permalink
Now we update amount to sync after we flush data to socket as well
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Jun 20, 2022
1 parent 5bf3a60 commit 083ecaa
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 13 deletions.
20 changes: 16 additions & 4 deletions src/app/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use prometheus::{Encoder, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder};

use crate::{data_readers::DataReaderConnection, db::DbTableMetrics};
use crate::db::DbTableMetrics;

#[async_trait::async_trait]
pub trait UpdatePendingToSyncModel {
async fn get_name(&self) -> Option<String>;
fn get_pending_to_sync(&self) -> usize;
}

pub struct PrometheusMetrics {
registry: Registry,
Expand Down Expand Up @@ -93,7 +99,10 @@ impl PrometheusMetrics {
.set(persist_delay);
}

pub async fn update_pending_to_sync(&self, data_reader_connection: &DataReaderConnection) {
pub async fn update_pending_to_sync<TUpdatePendingToSyncModel: UpdatePendingToSyncModel>(
&self,
data_reader_connection: &TUpdatePendingToSyncModel,
) {
let name = data_reader_connection.get_name().await;

if name.is_none() {
Expand All @@ -102,14 +111,17 @@ impl PrometheusMetrics {

let name = name.unwrap();

let pending_to_sync = data_reader_connection.get_pending_to_send();
let pending_to_sync = data_reader_connection.get_pending_to_sync();

self.pending_to_sync
.with_label_values(&[&name])
.set(pending_to_sync as i64);
}

pub async fn remove_pending_to_sync(&self, data_reader_connection: &DataReaderConnection) {
pub async fn remove_pending_to_sync<TUpdatePendingToSyncModel: UpdatePendingToSyncModel>(
&self,
data_reader_connection: &TUpdatePendingToSyncModel,
) {
let name = data_reader_connection.get_name().await;
if name.is_none() {
return;
Expand Down
1 change: 1 addition & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ mod request_metrics;

pub use app_ctx::{AppContext, APP_VERSION, DEFAULT_PERSIST_PERIOD};
pub use metrics::PrometheusMetrics;
pub use metrics::UpdatePendingToSyncModel;
pub use persist_history_duration::PersistHistoryDuration;
pub use request_metrics::{RequestMetric, RequestMetrics};
10 changes: 8 additions & 2 deletions src/background/tcp_connection_send_event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ use std::sync::Arc;

use rust_extensions::events_loop::EventsLoopTick;

use crate::data_readers::tcp_connection::TcpConnectionInfo;
use crate::{app::AppContext, data_readers::tcp_connection::TcpConnectionInfo};

pub struct TcpConnectionSendEventLoop {
app: Arc<AppContext>,
tcp_connection_info: Arc<TcpConnectionInfo>,
}

impl TcpConnectionSendEventLoop {
pub fn new(tcp_connection_info: Arc<TcpConnectionInfo>) -> Self {
pub fn new(app: Arc<AppContext>, tcp_connection_info: Arc<TcpConnectionInfo>) -> Self {
Self {
tcp_connection_info,
app,
}
}
}
Expand All @@ -20,5 +22,9 @@ impl TcpConnectionSendEventLoop {
impl EventsLoopTick<()> for TcpConnectionSendEventLoop {
async fn tick(&self, _model: ()) {
self.tcp_connection_info.flush_payloads().await;
self.app
.metrics
.update_pending_to_sync(self.tcp_connection_info.as_ref())
.await;
}
}
21 changes: 14 additions & 7 deletions src/data_readers/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ pub enum DataReaderConnection {
}

impl DataReaderConnection {
pub fn get_pending_to_send(&self) -> usize {
match self {
DataReaderConnection::Tcp(tcp_info) => tcp_info.get_pending_to_send(),
DataReaderConnection::Http(http_info) => http_info.get_pending_to_send(),
}
}

pub async fn get_name(&self) -> Option<String> {
match self {
DataReaderConnection::Tcp(tcp_info) => tcp_info.get_name().await,
Expand All @@ -29,3 +22,17 @@ impl DataReaderConnection {
}
}
}

#[async_trait::async_trait]
impl crate::app::UpdatePendingToSyncModel for DataReaderConnection {
async fn get_name(&self) -> Option<String> {
self.get_name().await
}

fn get_pending_to_sync(&self) -> usize {
match self {
DataReaderConnection::Tcp(tcp_info) => tcp_info.get_pending_to_send(),
DataReaderConnection::Http(http_info) => http_info.get_pending_to_send(),
}
}
}
1 change: 1 addition & 0 deletions src/data_readers/data_readers_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl DataReadersList {
connection_info
.flush_events_loop
.register_event_loop(Arc::new(TcpConnectionSendEventLoop::new(
app.clone(),
connection_info.clone(),
)))
.await;
Expand Down
11 changes: 11 additions & 0 deletions src/data_readers/tcp_connection/tcp_connection_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,14 @@ impl TcpConnectionInfo {
.load(std::sync::atomic::Ordering::Relaxed)
}
}

#[async_trait::async_trait]
impl crate::app::UpdatePendingToSyncModel for TcpConnectionInfo {
async fn get_name(&self) -> Option<String> {
self.get_name().await
}

fn get_pending_to_sync(&self) -> usize {
self.get_pending_to_send()
}
}

0 comments on commit 083ecaa

Please sign in to comment.