Skip to content

Commit 04a92b0

Browse files
Devdutt Shenoi and nikhilsinhaparseable authored
feat: audit logging (#1063)
Add remote audit logging with the below schema: ``` { "parseable_server_version": "1.6.3", "parseable_server_deploymentId": "01JGR7PS9K28JRKNS4V6F2WNAK", "audit_version": "V1", "audit_generation_timestamp": "2025-01-04T14:16:28.483+05:30", "audit_id": "01JGR7X443QX5N45B4ZPNN5QEG", "actor_authorizationMethod": "Basic Auth", "actor_remote_host": "127.0.0.1", "actor_username": "admin", "actor_user_agent": "PostmanRuntime/7.43.0", "request_headers_accept": "*/*", "request_headers_accept-encoding": "gzip, deflate, br", "request_headers_connection": "keep-alive", "request_headers_content-length": "100", "request_headers_content-type": "application/json", "request_headers_host": "localhost:8000", "request_headers_postman-token": "6a1bd4ae-0395-4e5c-90df-b7b5105bf0b5", "request_method": "POST", "request_start_time": "2025-01-04T14:16:28.480+05:30", "request_end_time": "2025-01-04T14:16:28.483+05:30", "request_path": "/api/v1/ingest", "request_protocol": "http", "request_body_size": "1234", "response_status_code": 400, "response_error_message": "", "response_body_size": "1234", "stream": "app" } ``` Fixes #765 --------- Signed-off-by: Devdutt Shenoi <[email protected]> Co-authored-by: Nikhil Sinha <[email protected]>
1 parent 71801df commit 04a92b0

File tree

10 files changed

+514
-38
lines changed

10 files changed

+514
-38
lines changed

src/audit.rs

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::{
20+
collections::HashMap,
21+
fmt::{Debug, Display},
22+
};
23+
24+
use crate::{about::current, storage::StorageMetadata};
25+
26+
use super::option::CONFIG;
27+
use chrono::{DateTime, Utc};
28+
use once_cell::sync::Lazy;
29+
use reqwest::Client;
30+
use serde::Serialize;
31+
use serde_json::{json, Value};
32+
use tracing::error;
33+
34+
use ulid::Ulid;
35+
use url::Url;
36+
37+
/// Process-wide audit logger, built lazily by [`AuditLogger::new`] on first access.
/// Holds `None` whenever `AuditLogger::new` returns `None`.
static AUDIT_LOGGER: Lazy<Option<AuditLogger>> = Lazy::new(AuditLogger::new);
38+
39+
// AuditLogger handles sending audit logs to a remote logging system
40+
pub struct AuditLogger {
41+
client: Client,
42+
log_endpoint: Url,
43+
}
44+
45+
impl AuditLogger {
46+
/// Create an audit logger that can be used to capture and push
47+
/// audit logs to the appropriate logging system over HTTP
48+
pub fn new() -> Option<AuditLogger> {
49+
// Try to construct the log endpoint URL by joining the base URL
50+
// with the ingest path, This can fail if the URL is not valid,
51+
// when the base URL is not set or the ingest path is not valid
52+
let log_endpoint = match CONFIG
53+
.parseable
54+
.audit_logger
55+
.as_ref()?
56+
.join("/api/v1/ingest")
57+
{
58+
Ok(url) => url,
59+
Err(err) => {
60+
eprintln!("Couldn't setup audit logger: {err}");
61+
return None;
62+
}
63+
};
64+
65+
Some(AuditLogger {
66+
client: reqwest::Client::new(),
67+
log_endpoint,
68+
})
69+
}
70+
71+
// Sends the audit log to the configured endpoint with proper authentication
72+
async fn send_log(&self, json: Value) {
73+
let mut req = self
74+
.client
75+
.post(self.log_endpoint.as_str())
76+
.json(&json)
77+
.header("x-p-stream", "audit_log");
78+
79+
// Use basic auth if credentials are configured
80+
if let Some(username) = CONFIG.parseable.audit_username.as_ref() {
81+
req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref())
82+
}
83+
84+
match req.send().await {
85+
Ok(r) => {
86+
if let Err(e) = r.error_for_status() {
87+
error!("{e}")
88+
}
89+
}
90+
Err(e) => error!("Failed to send audit event: {}", e),
91+
}
92+
}
93+
}
94+
95+
// Represents the version of the audit log format
96+
#[non_exhaustive]
97+
#[repr(u8)]
98+
#[derive(Debug, Clone, Copy, Serialize, Default)]
99+
pub enum AuditLogVersion {
100+
// NOTE: default should be latest version
101+
#[default]
102+
V1 = 1,
103+
}
104+
105+
#[derive(Serialize, Default)]
106+
pub struct AuditDetails {
107+
pub version: AuditLogVersion,
108+
pub id: Ulid,
109+
pub generated_at: DateTime<Utc>,
110+
}
111+
112+
#[derive(Serialize, Default)]
113+
pub struct ServerDetails {
114+
pub version: String,
115+
pub deployment_id: Ulid,
116+
}
117+
118+
// Contains information about the actor (user) who performed the action
119+
#[derive(Serialize, Default)]
120+
pub struct ActorDetails {
121+
pub remote_host: String,
122+
pub user_agent: String,
123+
pub username: String,
124+
pub authorization_method: String,
125+
}
126+
127+
// Contains details about the HTTP request that was made
128+
#[derive(Serialize, Default)]
129+
pub struct RequestDetails {
130+
pub stream: String,
131+
pub start_time: DateTime<Utc>,
132+
pub end_time: DateTime<Utc>,
133+
pub method: String,
134+
pub path: String,
135+
pub protocol: String,
136+
pub headers: HashMap<String, String>,
137+
}
138+
139+
/// Contains information about the response sent back to the client
140+
#[derive(Default, Serialize)]
141+
pub struct ResponseDetails {
142+
pub status_code: u16,
143+
pub error: Option<String>,
144+
}
145+
146+
/// The main audit log structure that combines all audit information
147+
#[derive(Serialize)]
148+
pub struct AuditLog {
149+
pub audit: AuditDetails,
150+
pub parseable_server: ServerDetails,
151+
pub actor: ActorDetails,
152+
pub request: RequestDetails,
153+
pub response: ResponseDetails,
154+
}
155+
156+
/// Builder pattern implementation for constructing audit logs
157+
pub struct AuditLogBuilder {
158+
// Used to ensure that log is only constructed if the logger is enabled
159+
enabled: bool,
160+
inner: AuditLog,
161+
}
162+
163+
impl Default for AuditLogBuilder {
164+
fn default() -> Self {
165+
AuditLogBuilder {
166+
enabled: AUDIT_LOGGER.is_some(),
167+
inner: AuditLog {
168+
audit: AuditDetails {
169+
version: AuditLogVersion::V1,
170+
id: Ulid::new(),
171+
..Default::default()
172+
},
173+
parseable_server: ServerDetails {
174+
version: current().released_version.to_string(),
175+
deployment_id: StorageMetadata::global().deployment_id,
176+
},
177+
request: RequestDetails {
178+
start_time: Utc::now(),
179+
..Default::default()
180+
},
181+
actor: ActorDetails::default(),
182+
response: ResponseDetails::default(),
183+
},
184+
}
185+
}
186+
}
187+
188+
impl AuditLogBuilder {
189+
/// Sets the remote host for the audit log
190+
pub fn with_host(mut self, host: impl Into<String>) -> Self {
191+
if self.enabled {
192+
self.inner.actor.remote_host = host.into();
193+
}
194+
self
195+
}
196+
197+
/// Sets the username for the audit log
198+
pub fn with_username(mut self, username: impl Into<String>) -> Self {
199+
if self.enabled {
200+
self.inner.actor.username = username.into();
201+
}
202+
self
203+
}
204+
205+
/// Sets the user agent for the audit log
206+
pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
207+
if self.enabled {
208+
self.inner.actor.user_agent = user_agent.into();
209+
}
210+
self
211+
}
212+
213+
/// Sets the authorization method for the audit log
214+
pub fn with_auth_method(mut self, auth_method: impl Into<String>) -> Self {
215+
if self.enabled {
216+
self.inner.actor.authorization_method = auth_method.into();
217+
}
218+
self
219+
}
220+
221+
/// Sets the stream for the request details
222+
pub fn with_stream(mut self, stream: impl Into<String>) -> Self {
223+
if self.enabled {
224+
self.inner.request.stream = stream.into();
225+
}
226+
self
227+
}
228+
229+
/// Sets the request method details
230+
pub fn with_method(mut self, method: impl Into<String>) -> Self {
231+
if self.enabled {
232+
self.inner.request.method = method.into();
233+
}
234+
self
235+
}
236+
237+
/// Sets the request path
238+
pub fn with_path(mut self, path: impl Into<String>) -> Self {
239+
if self.enabled {
240+
self.inner.request.path = path.into();
241+
}
242+
self
243+
}
244+
245+
/// Sets the request protocol
246+
pub fn with_protocol(mut self, protocol: impl Into<String>) -> Self {
247+
if self.enabled {
248+
self.inner.request.protocol = protocol.into();
249+
}
250+
self
251+
}
252+
253+
/// Sets the request headers
254+
pub fn with_headers(mut self, headers: impl IntoIterator<Item = (String, String)>) -> Self {
255+
if self.enabled {
256+
self.inner.request.headers = headers.into_iter().collect();
257+
}
258+
self
259+
}
260+
261+
/// Sets the response status code
262+
pub fn with_status(mut self, status_code: u16) -> Self {
263+
if self.enabled {
264+
self.inner.response.status_code = status_code;
265+
}
266+
self
267+
}
268+
269+
/// Sets the response error if any
270+
pub fn with_error(mut self, err: impl Display) -> Self {
271+
if self.enabled {
272+
let error = err.to_string();
273+
if !error.is_empty() {
274+
self.inner.response.error = Some(error);
275+
}
276+
}
277+
self
278+
}
279+
280+
/// Sends the audit log to the logging server if configured
281+
pub async fn send(self) {
282+
// ensures that we don't progress if logger is not enabled
283+
if !self.enabled {
284+
return;
285+
}
286+
287+
// build the audit log
288+
let AuditLogBuilder {
289+
inner: mut audit_log,
290+
..
291+
} = self;
292+
293+
let now = Utc::now();
294+
audit_log.audit.generated_at = now;
295+
audit_log.request.end_time = now;
296+
297+
AUDIT_LOGGER
298+
.as_ref()
299+
.unwrap()
300+
.send_log(json!(audit_log))
301+
.await
302+
}
303+
}

src/cli.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ pub struct Cli {
116116
pub kafka_client_id: Option<String>,
117117
pub kafka_security_protocol: Option<SslProtocol>,
118118
pub kafka_partitions: Option<String>,
119+
120+
// Audit Logging env vars
121+
pub audit_logger: Option<Url>,
122+
pub audit_username: Option<String>,
123+
pub audit_password: Option<String>,
119124
}
120125

121126
impl Cli {
@@ -165,6 +170,10 @@ impl Cli {
165170
pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
166171
pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";
167172

173+
pub const AUDIT_LOGGER: &'static str = "audit-logger";
174+
pub const AUDIT_USERNAME: &'static str = "audit-username";
175+
pub const AUDIT_PASSWORD: &'static str = "audit-password";
176+
168177
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
169178
self.local_staging_path.join(stream_name)
170179
}
@@ -219,6 +228,29 @@ impl Cli {
219228
.env("P_KAFKA_PARTITIONS")
220229
.value_name("STRING")
221230
.help("Kafka partitions"),
231+
)
232+
.arg(
233+
Arg::new(Self::AUDIT_LOGGER)
234+
.long(Self::AUDIT_LOGGER)
235+
.env("P_AUDIT_LOGGER")
236+
.value_name("URL")
237+
.required(false)
238+
.value_parser(validation::url)
239+
.help("Audit logger endpoint"),
240+
)
241+
.arg(
242+
Arg::new(Self::AUDIT_USERNAME)
243+
.long(Self::AUDIT_USERNAME)
244+
.env("P_AUDIT_USERNAME")
245+
.value_name("STRING")
246+
.help("Audit logger username"),
247+
)
248+
.arg(
249+
Arg::new(Self::AUDIT_PASSWORD)
250+
.long(Self::AUDIT_PASSWORD)
251+
.env("P_AUDIT_PASSWORD")
252+
.value_name("STRING")
253+
.help("Audit logger password"),
222254
)
223255
.arg(
224256
Arg::new(Self::TRINO_ENDPOINT)
@@ -536,6 +568,10 @@ impl FromArgMatches for Cli {
536568
.cloned();
537569
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
538570

571+
self.audit_logger = m.get_one::<Url>(Self::AUDIT_LOGGER).cloned();
572+
self.audit_username = m.get_one::<String>(Self::AUDIT_USERNAME).cloned();
573+
self.audit_password = m.get_one::<String>(Self::AUDIT_PASSWORD).cloned();
574+
539575
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
540576
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
541577
self.trusted_ca_certs_path = m.get_one::<PathBuf>(Self::TRUSTED_CA_CERTS_PATH).cloned();

0 commit comments

Comments
 (0)