Skip to content

Commit

Permalink
Merge pull request #109 from nightly-labs/connection-stats
Browse files Browse the repository at this point in the history
Connection stats
  • Loading branch information
Giems authored Mar 4, 2024
2 parents 162aa69 + 6f66dca commit 841d542
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 33 deletions.
14 changes: 6 additions & 8 deletions database/migrations/0009_connection_events.sql
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
CREATE TABLE connection_events (
event_id BIGSERIAL PRIMARY KEY,
event_id BIGSERIAL NOT NULL,
app_id TEXT NOT NULL, -- Always references the related app, whether an app or client connection
session_id TEXT NOT NULL,
connection_id TEXT, -- NULL for clients, unique for app connections
entity_id TEXT NOT NULL, -- client_profile_id or app_id
entity_type entity_type_enum NOT NULL, -- 'client' or 'app'
connection_id TEXT, -- Unique for each connection instance, NULL for clients, UNIQUE per app connection
entity_id TEXT NOT NULL, -- The ID of the connecting entity (could be the same app_id or client_profile_id)
entity_type entity_type_enum NOT NULL, -- Distinguishes between 'client' and 'app'
network TEXT NOT NULL,
connected_at TIMESTAMPTZ NOT NULL,
disconnected_at TIMESTAMPTZ
);

CREATE INDEX idx_connection_events_session ON connection_events(session_id);
CREATE INDEX idx_connection_events_entity ON connection_events(entity_id, entity_type);
);
8 changes: 6 additions & 2 deletions database/migrations/0011_create_hypertables.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
SELECT
create_hypertable('sessions', 'session_open_timestamp');
create_hypertable('connection_events', 'connected_at');

SELECT
create_hypertable('requests', 'creation_timestamp');

SELECT
create_hypertable('requests', 'creation_timestamp');
create_hypertable('sessions', 'session_open_timestamp');

4 changes: 4 additions & 0 deletions database/migrations/0012_requests_stats.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ ALTER MATERIALIZED VIEW hourly_requests_stats_per_app
set
(timescaledb.materialized_only = false);



----------------- Daily requests stats per app -----------------
--- View
CREATE MATERIALIZED VIEW daily_requests_stats_per_app WITH (timescaledb.continuous) AS
Expand Down Expand Up @@ -63,6 +65,8 @@ ALTER MATERIALIZED VIEW daily_requests_stats_per_app
set
(timescaledb.materialized_only = false);



----------------- Monthly requests per app -----------------
--- View
CREATE MATERIALIZED VIEW monthly_requests_stats_per_app WITH (timescaledb.continuous) AS
Expand Down
90 changes: 90 additions & 0 deletions database/migrations/0014_connection_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
----------------- Hourly connection stats per app -----------------
--- View
-- Continuous aggregate over raw connection_events, bucketed by hour and split
-- per (app_id, network). WITH NO DATA defers the initial materialization to
-- the refresh policy below (or a manual refresh).
-- NOTE(review): the 'App'/'Client' string literals must match the labels of
-- entity_type_enum exactly (case-sensitive) -- confirm against migration 0009.
CREATE MATERIALIZED VIEW hourly_connection_stats_per_app_and_network WITH (timescaledb.continuous) AS
SELECT
    app_id,
    network,
    time_bucket('1 hour', connected_at) AS hourly_bucket,
    COUNT(*) FILTER (WHERE entity_type = 'App') :: BIGINT AS hourly_app_connection_count,
    COUNT(*) FILTER (WHERE entity_type = 'Client') :: BIGINT AS hourly_clients_connection_count
FROM
    connection_events
GROUP BY
    app_id,
    network,
    hourly_bucket WITH NO DATA;

--- Refresh policy
-- Every hour, re-materialize the window [now - 2 days, now - 1 hour); the
-- most recent hour is left to real-time aggregation below.
SELECT
    add_continuous_aggregate_policy('hourly_connection_stats_per_app_and_network',
        start_offset => INTERVAL '2 day',
        end_offset => INTERVAL '1 hour',
        schedule_interval => INTERVAL '1 hour'
);

--- Real time aggregation
-- materialized_only = false: queries merge materialized buckets with
-- not-yet-materialized raw rows, so the newest hour is still visible.
ALTER MATERIALIZED VIEW hourly_connection_stats_per_app_and_network
set
(timescaledb.materialized_only = false);



----------------- Daily connection stats per app -----------------
--- View
-- Rolls the hourly aggregate up into daily buckets (a hierarchical continuous
-- aggregate -- NOTE(review): aggregates on top of aggregates require
-- TimescaleDB 2.9+; confirm the deployed version).
CREATE MATERIALIZED VIEW daily_connection_stats_per_app_and_network WITH (timescaledb.continuous) AS
SELECT
    app_id,
    network,
    time_bucket('1 day', hourly_bucket) AS daily_bucket,
    SUM(hourly_app_connection_count) :: BIGINT AS daily_app_connection_count,
    SUM(hourly_clients_connection_count) :: BIGINT AS daily_clients_connection_count
FROM
    hourly_connection_stats_per_app_and_network
GROUP BY
    app_id,
    network,
    daily_bucket WITH NO DATA;

--- Refresh policy
-- Daily re-materialization of the trailing month, excluding the current day.
SELECT
    add_continuous_aggregate_policy('daily_connection_stats_per_app_and_network',
        start_offset => INTERVAL '1 month',
        end_offset => INTERVAL '1 day',
        schedule_interval => INTERVAL '1 day'
);

--- Real time aggregation
ALTER MATERIALIZED VIEW daily_connection_stats_per_app_and_network
set
(timescaledb.materialized_only = false);



----------------- Monthly connection per app -----------------
--- View
-- Rolls the daily aggregate up into monthly buckets.
CREATE MATERIALIZED VIEW monthly_connection_stats_per_app_and_network WITH (timescaledb.continuous) AS
SELECT
    app_id,
    network,
    time_bucket('1 month', daily_bucket) AS monthly_bucket,
    SUM(daily_app_connection_count) :: BIGINT AS monthly_app_connection_count,
    SUM(daily_clients_connection_count) :: BIGINT AS monthly_clients_connection_count
FROM
    daily_connection_stats_per_app_and_network
GROUP BY
    app_id,
    network,
    monthly_bucket WITH NO DATA;

--- Refresh policy
-- Monthly re-materialization of the trailing year, excluding the current month.
SELECT
    add_continuous_aggregate_policy('monthly_connection_stats_per_app_and_network',
        start_offset => INTERVAL '1 year',
        end_offset => INTERVAL '1 month',
        schedule_interval => INTERVAL '1 month'
);

--- Real time aggregation
ALTER MATERIALIZED VIEW monthly_connection_stats_per_app_and_network
set
(timescaledb.materialized_only = false);
166 changes: 166 additions & 0 deletions database/src/aggregated_views_queries/connections_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
use crate::{
db::Db,
structs::{filter_requests::ConnectionStats, time_filters::TimeFilter},
tables::utils::{format_view_keys, format_view_name},
};
use sqlx::Error;

/// Base name of the connection-stats continuous-aggregate views; the concrete
/// view is selected by prepending a bucket prefix ("hourly"/"daily"/"monthly").
pub const CONNECTIONS_STATS_BASE_VIEW_NAME: &str = "connection_stats_per_app_and_network";

/// Column keys of the connection-stats views, paired with a flag that
/// presumably marks keys carrying the bucket prefix in the view's column name
/// (e.g. "hourly_bucket") — confirm against `format_view_keys`.
/// Note: `&'static` was redundant here — `const` items already have a
/// `'static` lifetime (clippy `redundant_static_lifetimes`).
pub const CONNECTIONS_STATS_BASE_KEYS: [(&str, bool); 5] = [
    ("app_id", false),
    ("network", false),
    ("bucket", true),
    ("app_connection_count", true),
    ("clients_connection_count", true),
];

impl Db {
    /// Fetches per-network connection statistics for `app_id` from the
    /// continuous-aggregate view matching the filter's bucket size.
    ///
    /// * `network` — `Some` restricts results to that network; `None` returns
    ///   rows for all networks.
    /// * Returns rows ordered by time bucket, newest first.
    ///
    /// # Errors
    /// Returns `Error::WorkerCrashed` for an unsupported bucket size
    /// (placeholder until a custom error type exists), or any sqlx error.
    pub async fn get_connections_stats_by_app_id(
        &self,
        app_id: &str,
        network: Option<&str>,
        filter: TimeFilter,
    ) -> Result<Vec<ConnectionStats>, Error> {
        let start_date = filter.to_date();
        let bucket_size = filter.bucket_size();

        // Correctly selecting the view based on the bucket_size
        let prefix = match bucket_size {
            "1 hour" => "hourly",
            "1 day" => "daily",
            "1 month" => "monthly",
            // TODO for now return WorkerCrashed but later create custom error
            _ => return Err(Error::WorkerCrashed),
        };

        let formatted_keys = format_view_keys(prefix, &CONNECTIONS_STATS_BASE_KEYS);
        let formatted_view_name = format_view_name(prefix, CONNECTIONS_STATS_BASE_VIEW_NAME);
        let date_filter_key = CONNECTIONS_STATS_BASE_KEYS[2].0;
        let filter = format!("{prefix}_{date_filter_key}");

        // SECURITY: bind `network` as a query parameter ($3) instead of
        // interpolating it into the SQL text — the previous
        // `format!("AND network = '{network}'")` was an injection vector for
        // attacker-controlled network strings.
        let network_filter = if network.is_some() {
            "AND network = $3"
        } else {
            ""
        };

        let query = format!(
            "SELECT {formatted_keys}
            FROM {formatted_view_name}
            WHERE app_id = $1 AND {filter} >= $2 {network_filter}
            ORDER BY {filter} DESC",
        );

        let mut typed_query = sqlx::query_as::<_, ConnectionStats>(&query)
            .bind(app_id)
            .bind(start_date);
        if let Some(network) = network {
            typed_query = typed_query.bind(network);
        }
        typed_query.fetch_all(&self.connection_pool).await
    }
}

#[cfg(test)]
mod tests {

    use crate::{structs::time_filters::TimeFilter, tables::sessions::table_struct::DbNcSession};
    use sqlx::types::chrono::Utc;

    /// Integration test — requires a live database with the TimescaleDB
    /// continuous aggregates from migration 0014, and truncates every table
    /// before running. Verifies per-network app/client connection counts.
    #[tokio::test]
    async fn test_connections_all_networks() {
        let db = super::Db::connect_to_the_pool().await;
        db.truncate_all_tables().await.unwrap();

        // Create test team instance
        let team_id = "test_team_id".to_string();
        let app_id = "test_app_id".to_string();

        db.setup_test_team(&team_id, &app_id, Utc::now())
            .await
            .unwrap();

        let networks = vec![
            "test_network_1",
            "test_network_2",
            "test_network_3",
            "test_network_4",
            "test_network_5",
        ];
        // For every network create one persistent session; each session yields
        // 3 app connection events in total (1 via handle_new_session plus 2
        // explicit ones below) and `i` client connections, where `i` is the
        // network's index in `networks`.
        for (i, network) in networks.iter().enumerate() {
            let session_id = format!("session_{app_id}_{i}");

            let session = DbNcSession {
                session_id: session_id.clone(),
                app_id: app_id.clone(),
                app_metadata: "test_metadata".to_string(),
                app_ip_address: "".to_string(),
                persistent: true,
                network: network.to_string(),
                client_profile_id: None,
                client: None,
                session_open_timestamp: Utc::now(),
                session_close_timestamp: None,
            };

            db.handle_new_session(&session, &format!("connection_id_{app_id}_{i}").to_string())
                .await
                .unwrap();

            // Each time a session is created, means that app has been connected, create 2 more connections
            let mut tx = db.connection_pool.begin().await.unwrap();
            db.create_new_connection_event_by_app(
                &mut tx,
                &session_id,
                &format!("connection_id_{app_id}_{i}_1").to_string(),
                &app_id,
                &network.to_string(),
            )
            .await
            .unwrap();

            db.create_new_connection_event_by_app(
                &mut tx,
                &session_id,
                &format!("connection_id_{app_id}_{i}_2").to_string(),
                &app_id,
                &network.to_string(),
            )
            .await
            .unwrap();

            // `i` client connections for the i-th network (0 for the first).
            for j in 0..i {
                db.create_new_connection_by_client(
                    &mut tx,
                    &app_id,
                    &session_id,
                    j as i64,
                    &network.to_string(),
                )
                .await
                .unwrap();
            }

            tx.commit().await.unwrap();
        }

        // Manually refresh the continuous aggregates
        // (the scheduled refresh policies won't have fired inside a test run).
        db.refresh_continuous_aggregates(vec![
            "hourly_connection_stats_per_app_and_network".to_string(),
            "daily_connection_stats_per_app_and_network".to_string(),
            "monthly_connection_stats_per_app_and_network".to_string(),
        ])
        .await
        .unwrap();

        // Get stats for all networks (network filter = None)
        let stats = db
            .get_connections_stats_by_app_id(&app_id, None, TimeFilter::LastMonth)
            .await
            .unwrap();

        // Expect exactly 3 app connections per network and `i` client connections.
        for (i, network) in networks.iter().enumerate() {
            let network_stats = stats.iter().find(|s| s.network == *network).unwrap();
            assert_eq!(network_stats.app_connection_count, 3);
            assert_eq!(network_stats.clients_connection_count, i as i64);
        }
    }
}
1 change: 1 addition & 0 deletions database/src/aggregated_views_queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Query helpers that read from the TimescaleDB continuous-aggregate
// (materialized) views, one module per stats family.
pub mod connections_stats;
pub mod requests_stats;
pub mod session_average_time;
pub mod sessions_stats;
14 changes: 8 additions & 6 deletions database/src/aggregated_views_queries/sessions_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,12 @@ mod tests {
session_close_timestamp: None,
};

db.handle_new_session(&session, &"connection_id".to_string())
.await
.unwrap();
db.handle_new_session(
&session,
&format!("connection_id_{}_{}", app_id, i).to_string(),
)
.await
.unwrap();
db.close_session(&session.session_id, session_end)
.await
.unwrap();
Expand All @@ -116,9 +119,8 @@ mod tests {
.await
.unwrap();

println!("{:?}", stats);
// assert_eq!(stats.len(), 1);
// assert_eq!(stats[0].sessions_opened, num_sessions as i64);
assert_eq!(stats.len(), 1);
assert_eq!(stats[0].sessions_opened, num_sessions as i64);
}

#[tokio::test]
Expand Down
9 changes: 9 additions & 0 deletions database/src/structs/filter_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,12 @@ pub struct SessionAvgTime {
pub bucket: DateTime<Utc>,
pub average_duration_seconds: f64,
}

#[derive(Debug, sqlx::FromRow)]
pub struct ConnectionStats {
pub app_id: String,
pub bucket: DateTime<Utc>,
pub network: String,
pub app_connection_count: i64,
pub clients_connection_count: i64,
}
17 changes: 14 additions & 3 deletions database/src/tables/connection_events/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,20 @@ impl Db {
&self,
app_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
let query = format!(
"SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE entity_id = $1 AND entity_type = $2"
);
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE app_id = $1");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
.bind(&app_id)
.fetch_all(&self.connection_pool)
.await;
}

pub async fn get_connection_events_by_app(
&self,
app_id: &String,
) -> Result<Vec<ConnectionEvent>, sqlx::Error> {
let query = format!("SELECT * FROM {CONNECTION_EVENTS_TABLE_NAME} WHERE entity_id = $1 AND entity_type = $2");
let typed_query = query_as::<_, ConnectionEvent>(&query);

return typed_query
Expand Down
Loading

0 comments on commit 841d542

Please sign in to comment.