Skip to content

Commit 794c8f1

Browse files
parmesantnitisht
andauthored
feat: backend for Correlation (#1052)
--------- Co-authored-by: Nitish Tiwari <[email protected]>
1 parent a1a9073 commit 794c8f1

File tree

12 files changed

+622
-7
lines changed

12 files changed

+622
-7
lines changed

src/correlation/correlation_utils.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use itertools::Itertools;
20+
21+
use crate::rbac::{
22+
map::SessionKey,
23+
role::{Action, Permission},
24+
Users,
25+
};
26+
27+
use super::{CorrelationError, TableConfig};
28+
29+
pub async fn user_auth_for_query(
30+
session_key: &SessionKey,
31+
table_configs: &[TableConfig],
32+
) -> Result<(), CorrelationError> {
33+
let tables = table_configs.iter().map(|t| &t.table_name).collect_vec();
34+
let permissions = Users.get_permissions(session_key);
35+
36+
for table_name in tables {
37+
let mut authorized = false;
38+
39+
// in permission check if user can run query on the stream.
40+
// also while iterating add any filter tags for this stream
41+
for permission in permissions.iter() {
42+
match permission {
43+
Permission::Stream(Action::All, _) => {
44+
authorized = true;
45+
break;
46+
}
47+
Permission::StreamWithTag(Action::Query, ref stream, _)
48+
if stream == table_name || stream == "*" =>
49+
{
50+
authorized = true;
51+
}
52+
_ => (),
53+
}
54+
}
55+
56+
if !authorized {
57+
return Err(CorrelationError::Unauthorized);
58+
}
59+
}
60+
61+
Ok(())
62+
}

src/correlation/mod.rs

Lines changed: 314 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,314 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::collections::HashSet;
20+
21+
use actix_web::http::header::ContentType;
22+
use chrono::Utc;
23+
use correlation_utils::user_auth_for_query;
24+
use datafusion::error::DataFusionError;
25+
use http::StatusCode;
26+
use itertools::Itertools;
27+
use once_cell::sync::Lazy;
28+
use serde::{Deserialize, Serialize};
29+
use serde_json::Error as SerdeError;
30+
use tokio::sync::RwLock;
31+
use tracing::{error, trace, warn};
32+
33+
use crate::{
34+
handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
35+
storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
36+
};
37+
38+
pub mod correlation_utils;
39+
40+
pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
41+
42+
#[derive(Debug, Default)]
43+
pub struct Correlation(RwLock<Vec<CorrelationConfig>>);
44+
45+
impl Correlation {
46+
pub async fn load(&self) -> Result<(), CorrelationError> {
47+
// lead correlations from storage
48+
let store = CONFIG.storage().get_object_store();
49+
let all_correlations = store.get_correlations().await.unwrap_or_default();
50+
51+
let mut correlations = vec![];
52+
for corr in all_correlations {
53+
if corr.is_empty() {
54+
continue;
55+
}
56+
57+
let correlation: CorrelationConfig = match serde_json::from_slice(&corr) {
58+
Ok(c) => c,
59+
Err(e) => {
60+
error!("Unable to load correlation- {e}");
61+
continue;
62+
}
63+
};
64+
65+
correlations.push(correlation);
66+
}
67+
68+
let mut s = self.0.write().await;
69+
s.append(&mut correlations.clone());
70+
Ok(())
71+
}
72+
73+
pub async fn list_correlations_for_user(
74+
&self,
75+
session_key: &SessionKey,
76+
) -> Result<Vec<CorrelationConfig>, CorrelationError> {
77+
let correlations = self.0.read().await.iter().cloned().collect_vec();
78+
79+
let mut user_correlations = vec![];
80+
for c in correlations {
81+
if user_auth_for_query(session_key, &c.table_configs)
82+
.await
83+
.is_ok()
84+
{
85+
user_correlations.push(c);
86+
}
87+
}
88+
Ok(user_correlations)
89+
}
90+
91+
pub async fn get_correlation_by_id(
92+
&self,
93+
correlation_id: &str,
94+
) -> Result<CorrelationConfig, CorrelationError> {
95+
let read = self.0.read().await;
96+
let correlation = read.iter().find(|c| c.id == correlation_id).cloned();
97+
98+
if let Some(c) = correlation {
99+
Ok(c)
100+
} else {
101+
Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!(
102+
"Unable to find correlation with ID- {correlation_id}"
103+
))))
104+
}
105+
}
106+
107+
pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> {
108+
// save to memory
109+
let mut s = self.0.write().await;
110+
s.retain(|c| c.id != correlation.id);
111+
s.push(correlation.clone());
112+
Ok(())
113+
}
114+
115+
pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> {
116+
// now delete from memory
117+
let read_access = self.0.read().await;
118+
119+
let index = read_access
120+
.iter()
121+
.enumerate()
122+
.find(|(_, c)| c.id == correlation_id)
123+
.to_owned();
124+
125+
if let Some((index, _)) = index {
126+
// drop the read access in order to get exclusive write access
127+
drop(read_access);
128+
self.0.write().await.remove(index);
129+
trace!("removed correlation from memory");
130+
} else {
131+
warn!("Correlation ID- {correlation_id} not found in memory!");
132+
}
133+
Ok(())
134+
}
135+
}
136+
137+
#[derive(Debug, Clone, Serialize, Deserialize)]
138+
#[serde(rename_all = "camelCase")]
139+
pub enum CorrelationVersion {
140+
V1,
141+
}
142+
143+
#[derive(Debug, Clone, Serialize, Deserialize)]
144+
#[serde(rename_all = "camelCase")]
145+
pub struct CorrelationConfig {
146+
pub version: CorrelationVersion,
147+
pub id: String,
148+
pub table_configs: Vec<TableConfig>,
149+
pub join_config: JoinConfig,
150+
pub filter: Option<FilterQuery>,
151+
pub start_time: Option<String>,
152+
pub end_time: Option<String>,
153+
}
154+
155+
impl CorrelationConfig {}
156+
157+
#[derive(Debug, Clone, Serialize, Deserialize)]
158+
#[serde(rename_all = "camelCase")]
159+
pub struct CorrelationRequest {
160+
pub version: CorrelationVersion,
161+
pub table_configs: Vec<TableConfig>,
162+
pub join_config: JoinConfig,
163+
pub filter: Option<FilterQuery>,
164+
pub start_time: Option<String>,
165+
pub end_time: Option<String>,
166+
}
167+
168+
impl From<CorrelationRequest> for CorrelationConfig {
169+
fn from(val: CorrelationRequest) -> Self {
170+
Self {
171+
version: val.version,
172+
id: get_hash(Utc::now().timestamp_micros().to_string().as_str()),
173+
table_configs: val.table_configs,
174+
join_config: val.join_config,
175+
filter: val.filter,
176+
start_time: val.start_time,
177+
end_time: val.end_time,
178+
}
179+
}
180+
}
181+
182+
impl CorrelationRequest {
183+
pub fn generate_correlation_config(self, id: String) -> CorrelationConfig {
184+
CorrelationConfig {
185+
version: self.version,
186+
id,
187+
table_configs: self.table_configs,
188+
join_config: self.join_config,
189+
filter: self.filter,
190+
start_time: self.start_time,
191+
end_time: self.end_time,
192+
}
193+
}
194+
195+
/// This function will validate the TableConfigs, JoinConfig, and user auth
196+
pub async fn validate(&self, session_key: &SessionKey) -> Result<(), CorrelationError> {
197+
let ctx = &QUERY_SESSION;
198+
199+
let h1: HashSet<&String> = self.table_configs.iter().map(|t| &t.table_name).collect();
200+
let h2: HashSet<&String> = self
201+
.join_config
202+
.join_conditions
203+
.iter()
204+
.map(|j| &j.table_name)
205+
.collect();
206+
207+
// check if table config tables are the same
208+
if h1.len() != 2 {
209+
return Err(CorrelationError::Metadata(
210+
"Must provide config for two unique tables",
211+
));
212+
}
213+
214+
// check that the tables mentioned in join config are
215+
// the same as those in table config
216+
if h1 != h2 {
217+
return Err(CorrelationError::Metadata(
218+
"Must provide same tables for join config and table config",
219+
));
220+
}
221+
222+
// check if user has access to table
223+
user_auth_for_query(session_key, &self.table_configs).await?;
224+
225+
// to validate table config, we need to check whether the mentioned fields
226+
// are present in the table or not
227+
for table_config in self.table_configs.iter() {
228+
// table config check
229+
let df = ctx.table(&table_config.table_name).await?;
230+
231+
let mut selected_fields = table_config
232+
.selected_fields
233+
.iter()
234+
.map(|c| c.as_str())
235+
.collect_vec();
236+
237+
// unwrap because we have determined that the tables in table config are the same as those in join config
238+
let condition = self
239+
.join_config
240+
.join_conditions
241+
.iter()
242+
.find(|j| j.table_name == table_config.table_name)
243+
.unwrap();
244+
let join_field = condition.field.as_str();
245+
246+
if !selected_fields.contains(&join_field) {
247+
selected_fields.push(join_field);
248+
}
249+
250+
// if this errors out then the table config is incorrect or join config is incorrect
251+
df.select_columns(selected_fields.as_slice())?;
252+
}
253+
254+
Ok(())
255+
}
256+
}
257+
258+
#[derive(Debug, thiserror::Error)]
259+
pub enum CorrelationError {
260+
#[error("Failed to connect to storage: {0}")]
261+
ObjectStorage(#[from] ObjectStorageError),
262+
#[error("Serde Error: {0}")]
263+
Serde(#[from] SerdeError),
264+
#[error("Cannot perform this operation: {0}")]
265+
Metadata(&'static str),
266+
#[error("User does not exist")]
267+
UserDoesNotExist(#[from] RBACError),
268+
#[error("Error: {0}")]
269+
AnyhowError(#[from] anyhow::Error),
270+
#[error("Unauthorized")]
271+
Unauthorized,
272+
#[error("DataFusion Error: {0}")]
273+
DataFusion(#[from] DataFusionError),
274+
}
275+
276+
impl actix_web::ResponseError for CorrelationError {
277+
fn status_code(&self) -> http::StatusCode {
278+
match self {
279+
Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR,
280+
Self::Serde(_) => StatusCode::BAD_REQUEST,
281+
Self::Metadata(_) => StatusCode::BAD_REQUEST,
282+
Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND,
283+
Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
284+
Self::Unauthorized => StatusCode::BAD_REQUEST,
285+
Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
286+
}
287+
}
288+
289+
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
290+
actix_web::HttpResponse::build(self.status_code())
291+
.insert_header(ContentType::plaintext())
292+
.body(self.to_string())
293+
}
294+
}
295+
296+
#[derive(Debug, Clone, Serialize, Deserialize)]
297+
#[serde(rename_all = "camelCase")]
298+
pub struct TableConfig {
299+
pub selected_fields: Vec<String>,
300+
pub table_name: String,
301+
}
302+
303+
#[derive(Debug, Clone, Serialize, Deserialize)]
304+
#[serde(rename_all = "camelCase")]
305+
pub struct JoinCondition {
306+
pub table_name: String,
307+
pub field: String,
308+
}
309+
310+
#[derive(Debug, Clone, Serialize, Deserialize)]
311+
#[serde(rename_all = "camelCase")]
312+
pub struct JoinConfig {
313+
pub join_conditions: Vec<JoinCondition>,
314+
}

0 commit comments

Comments
 (0)