
Commit 16c97a4

fix analytics for the cluster (#732)
* fix analytics for the cluster
* Add active_ingesters and inactive_ingesters metrics
* updated ingesters' count and event related metrics
1 parent 3680f00 commit 16c97a4

3 files changed: +146 additions, -30 deletions


server/src/analytics.rs

Lines changed: 127 additions & 20 deletions
@@ -18,12 +18,16 @@
  */
 
 use crate::about::{current, platform};
-use crate::option::CONFIG;
+use crate::handlers::http::cluster::utils::check_liveness;
+use crate::handlers::http::{base_path_without_preceding_slash, cluster};
+use crate::option::{Mode, CONFIG};
 use crate::storage;
 use crate::{metadata, stats};
 
+use actix_web::{web, HttpRequest, Responder};
 use chrono::{DateTime, Utc};
 use clokwerk::{AsyncScheduler, Interval};
+use http::header;
 use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
@@ -56,14 +60,21 @@ pub struct Report {
     cpu_count: usize,
     memory_total_bytes: u64,
     platform: String,
-    mode: String,
+    storage_mode: String,
+    server_mode: String,
     version: String,
     commit_hash: String,
+    active_ingesters: u64,
+    inactive_ingesters: u64,
+    stream_count: usize,
+    total_events_count: u64,
+    total_json_bytes: u64,
+    total_parquet_bytes: u64,
     metrics: HashMap<String, Value>,
 }
 
 impl Report {
-    pub fn new() -> Self {
+    pub async fn new() -> Self {
         let mut upt: f64 = 0.0;
         if let Ok(uptime) = uptime_lib::get() {
             upt = uptime.as_secs_f64();
@@ -80,6 +91,7 @@ impl Report {
             cpu_count = info.cpus().len();
             mem_total = info.total_memory();
         }
+        let ingester_metrics = fetch_ingesters_metrics().await;
 
         Self {
             deployment_id: storage::StorageMetadata::global().deployment_id,
@@ -90,10 +102,17 @@ impl Report {
             cpu_count,
             memory_total_bytes: mem_total,
             platform: platform().to_string(),
-            mode: CONFIG.get_storage_mode_string().to_string(),
+            storage_mode: CONFIG.get_storage_mode_string().to_string(),
+            server_mode: CONFIG.parseable.mode.to_string(),
             version: current().released_version.to_string(),
             commit_hash: current().commit_hash,
-            metrics: build_metrics(),
+            active_ingesters: ingester_metrics.0,
+            inactive_ingesters: ingester_metrics.1,
+            stream_count: ingester_metrics.2,
+            total_events_count: ingester_metrics.3,
+            total_json_bytes: ingester_metrics.4,
+            total_parquet_bytes: ingester_metrics.5,
+            metrics: build_metrics().await,
         }
     }
 
@@ -103,6 +122,12 @@ impl Report {
     }
 }
 
+/// build the node metrics for the node ingester endpoint
+pub async fn get_analytics(_: HttpRequest) -> impl Responder {
+    let json = NodeMetrics::build();
+    web::Json(json)
+}
+
 fn total_streams() -> usize {
     metadata::STREAM_INFO.list_streams().len()
 }
@@ -123,25 +148,65 @@ fn total_event_stats() -> (u64, u64, u64) {
     (total_events, total_json_bytes, total_parquet_bytes)
 }
 
-fn build_metrics() -> HashMap<String, Value> {
+async fn fetch_ingesters_metrics() -> (u64, u64, usize, u64, u64, u64) {
+    let event_stats = total_event_stats();
+    let mut node_metrics =
+        NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
+
+    let mut vec = vec![];
+    let mut active_ingesters = 0u64;
+    let mut offline_ingesters = 0u64;
+    if CONFIG.parseable.mode == Mode::Query {
+        // send analytics for ingest servers
+
+        // ingester infos should be valid here, if not some thing is wrong
+        let ingester_infos = cluster::get_ingester_info().await.unwrap();
+
+        for im in ingester_infos {
+            if !check_liveness(&im.domain_name).await {
+                offline_ingesters += 1;
+                continue;
+            }
+
+            let uri = url::Url::parse(&format!(
+                "{}{}/analytics",
+                im.domain_name,
+                base_path_without_preceding_slash()
+            ))
+            .expect("Should be a valid URL");
+
+            let resp = reqwest::Client::new()
+                .get(uri)
+                .header(header::AUTHORIZATION, im.token.clone())
+                .header(header::CONTENT_TYPE, "application/json")
+                .send()
+                .await
+                .unwrap(); // should respond
+
+            let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await.unwrap()).unwrap();
+            vec.push(data);
+            active_ingesters += 1;
+        }
+
+        node_metrics.accumulate(&mut vec);
+    }
+
+    (
+        active_ingesters,
+        offline_ingesters,
+        node_metrics.stream_count,
+        node_metrics.total_events_count,
+        node_metrics.total_json_bytes,
+        node_metrics.total_parquet_bytes,
+    )
+}
+
+async fn build_metrics() -> HashMap<String, Value> {
     // sysinfo refreshed in previous function
     // so no need to refresh again
     let sys = SYS_INFO.lock().unwrap();
 
     let mut metrics = HashMap::new();
-    metrics.insert("stream_count".to_string(), total_streams().into());
-
-    // total_event_stats returns event count, json bytes, parquet bytes in that order
-    metrics.insert(
-        "total_events_count".to_string(),
-        total_event_stats().0.into(),
-    );
-    metrics.insert("total_json_bytes".to_string(), total_event_stats().1.into());
-    metrics.insert(
-        "total_parquet_bytes".to_string(),
-        total_event_stats().2.into(),
-    );
-
     metrics.insert("memory_in_use_bytes".to_string(), sys.used_memory().into());
     metrics.insert("memory_free_bytes".to_string(), sys.free_memory().into());
 
@@ -162,7 +227,7 @@ pub fn init_analytics_scheduler() {
     scheduler
         .every(ANALYTICS_SEND_INTERVAL_SECONDS)
         .run(move || async {
-            Report::new().send().await;
+            Report::new().await.send().await;
         });
 
     tokio::spawn(async move {
@@ -172,3 +237,45 @@ pub fn init_analytics_scheduler() {
         }
     });
 }
+
+#[derive(Serialize, Deserialize, Default, Debug)]
+struct NodeMetrics {
+    stream_count: usize,
+    total_events_count: u64,
+    total_json_bytes: u64,
+    total_parquet_bytes: u64,
+}
+
+impl NodeMetrics {
+    fn build() -> Self {
+        let event_stats = total_event_stats();
+        Self {
+            stream_count: total_streams(),
+            total_events_count: event_stats.0,
+            total_json_bytes: event_stats.1,
+            total_parquet_bytes: event_stats.2,
+        }
+    }
+
+    fn new(
+        stream_count: usize,
+        total_events_count: u64,
+        total_json_bytes: u64,
+        total_parquet_bytes: u64,
+    ) -> Self {
+        Self {
+            stream_count,
+            total_events_count,
+            total_json_bytes,
+            total_parquet_bytes,
+        }
+    }
+
+    fn accumulate(&mut self, other: &mut [NodeMetrics]) {
+        other.iter().for_each(|nm| {
+            self.total_events_count += nm.total_events_count;
+            self.total_json_bytes += nm.total_json_bytes;
+            self.total_parquet_bytes += nm.total_parquet_bytes;
+        });
+    }
+}
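
Taken together, the analytics.rs changes make the query node the single place that assembles the usage report: fetch_ingesters_metrics polls each known ingester's /analytics endpoint, counts live and offline nodes, and folds the per-node event counters into its own totals via NodeMetrics::accumulate. A rough sketch of that accumulation step, with made-up numbers; it relies on the private NodeMetrics type from the diff above, so it only makes sense inside analytics.rs:

// Hypothetical illustration of NodeMetrics::accumulate(); the literal values are invented.
fn example_accumulate() -> NodeMetrics {
    // the query node's own stats, as NodeMetrics::build() would produce them
    let mut totals = NodeMetrics::new(3, 1_000, 50_000, 20_000);
    // responses parsed from two live ingesters' GET /analytics calls
    let mut from_ingesters = vec![
        NodeMetrics::new(3, 4_000, 200_000, 80_000),
        NodeMetrics::new(3, 2_500, 120_000, 45_000),
    ];
    totals.accumulate(&mut from_ingesters);
    // totals now reports 7_500 events, 370_000 JSON bytes and 145_000 parquet bytes;
    // stream_count keeps the local value, since accumulate() only sums the counters.
    totals
}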

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 13 additions & 6 deletions
@@ -138,12 +138,24 @@ impl IngestServer {
                 .service(Server::get_query_factory())
                 .service(Server::get_ingest_factory())
                 .service(Self::logstream_api())
-                .service(Server::get_about_factory()),
+                .service(Server::get_about_factory())
+                .service(Self::analytics_factory()),
         )
         .service(Server::get_liveness_factory())
         .service(Server::get_readiness_factory());
     }
 
+    fn analytics_factory() -> Scope {
+        web::scope("/analytics").service(
+            // GET "/analytics" ==> Get analytics data
+            web::resource("").route(
+                web::get()
+                    .to(analytics::get_analytics)
+                    .authorize(Action::GetAnalytics),
+            ),
+        )
+    }
+
     fn logstream_api() -> Scope {
         web::scope("/logstream")
             .service(
@@ -298,11 +310,6 @@ impl IngestServer {
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync();
 
-        // all internal data structures populated now.
-        // start the analytics scheduler if enabled
-        if CONFIG.parseable.send_analytics {
-            analytics::init_analytics_scheduler();
-        }
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());
         tokio::pin!(app);
         loop {
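
On the ingest server side, the new analytics_factory scope exposes GET /analytics (guarded by Action::GetAnalytics), while the scheduler that used to send the report from every ingester is removed, so only the query node reports for the cluster. A minimal sketch of what a caller of the new endpoint could look like; the /api/v1 base path, the trailing slash on the domain, and the per-ingester token are assumptions for illustration, not part of this diff:

use http::header;

// Hypothetical client call against one ingester's analytics endpoint.
async fn fetch_node_metrics(domain: &str, token: &str) -> reqwest::Result<serde_json::Value> {
    // assumes `domain` ends with '/' and the server's base path is api/v1
    let url = format!("{domain}api/v1/analytics");
    reqwest::Client::new()
        .get(url)
        .header(header::AUTHORIZATION, token)
        .header(header::CONTENT_TYPE, "application/json")
        .send()
        .await?
        .json::<serde_json::Value>()
        .await
}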

server/src/rbac/role.rs

Lines changed: 6 additions & 4 deletions
@@ -49,6 +49,7 @@ pub enum Action {
     ListClusterMetrics,
     DeleteIngester,
     All,
+    GetAnalytics,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -102,7 +103,11 @@ impl RoleBuilder {
                 | Action::CreateStream
                 | Action::DeleteStream
                 | Action::GetStream
-                | Action::ListStream => Permission::Unit(action),
+                | Action::ListStream
+                | Action::ListCluster
+                | Action::ListClusterMetrics
+                | Action::DeleteIngester
+                | Action::GetAnalytics => Permission::Unit(action),
                 Action::Ingest
                 | Action::GetSchema
                 | Action::GetStats
@@ -113,9 +118,6 @@ impl RoleBuilder {
                 | Action::PutAlert
                 | Action::GetAlert
                 | Action::All => Permission::Stream(action, self.stream.clone().unwrap()),
-                Action::ListCluster => Permission::Unit(action),
-                Action::ListClusterMetrics => Permission::Unit(action),
-                Action::DeleteIngester => Permission::Unit(action),
             };
             perms.push(perm);
         }
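
In rbac/role.rs the new GetAnalytics action is grouped with the other cluster-level actions as a unit permission, i.e. it applies to the server as a whole rather than to a named stream. A condensed sketch of the distinction that match draws, using simplified stand-in types rather than the real rbac module:

// Simplified stand-ins for Action/Permission, for illustration only.
enum Action { GetAnalytics, Ingest }
enum Permission { Unit(Action), Stream(Action, String) }

fn classify(action: Action, stream: Option<String>) -> Permission {
    match action {
        // server-wide action: no stream name needed
        Action::GetAnalytics => Permission::Unit(action),
        // stream-scoped action: carries the stream it was granted on
        Action::Ingest => Permission::Stream(action, stream.expect("stream required")),
    }
}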
