Skip to content

Commit 2b6739c

Browse files
authored
feat: Add Users API for Dashboards, caching and filters
* add the dashboards api * add: filters api * add cli args for query caching * misc changes * update errors * refactor: improve time parsing logic in query handler * impl query result caching * update querying with cache * chore: clean up * impl query caching * add ability to store filters and dashboards in memory * fix: bug if user_id is not provided * misc add license headers * cleanup rough edges * update arrow flight server to perform query cache * fix: users root dir excluded when listing streams * change the key for cache * add cache endpoints * update .gitignore to exclude directory called cache .gitignore was excluding anything that matched the pattern `cache*` causing the cache module to not be included * fix bug if user id is not provided and query caching is enabled Request hung up if the user_id was not provided in the header of the query request and query caching was enabled. * add value check for the show_cached and cache_results headers * clean up for better readability * fix: the issue with caching data * fix: issue creating multiple cache files
1 parent 87461f8 commit 2b6739c

38 files changed

+1722
-113
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ env-file
1313
parseable
1414
parseable_*
1515
parseable-env-secret
16-
cache*
16+
cache
1717

server/Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ build = "build.rs"
99

1010
[dependencies]
1111
### apache arrow/datafusion dependencies
12+
# arrow = "51.0.0"
1213
arrow-schema = { version = "51.0.0", features = ["serde"] }
1314
arrow-array = { version = "51.0.0" }
1415
arrow-json = "51.0.0"
1516
arrow-ipc = { version = "51.0.0", features = ["zstd"] }
1617
arrow-select = "51.0.0"
1718
datafusion = "37.1.0"
18-
object_store = { version = "0.9.1", features = ["cloud", "aws"] }
19+
object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
1920
parquet = "51.0.0"
2021
arrow-flight = { version = "51.0.0", features = [ "tls" ] }
2122
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
@@ -72,11 +73,11 @@ relative-path = { version = "1.7", features = ["serde"] }
7273
reqwest = { version = "0.11.27", default_features = false, features = [
7374
"rustls-tls",
7475
"json",
75-
] }
76-
rustls = "0.22.4"
76+
] } # cannot update cause rustls is not latest `see rustls`
77+
rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet
7778
rustls-pemfile = "2.1.2"
7879
semver = "1.0"
79-
serde = { version = "1.0", features = ["rc"] }
80+
serde = { version = "1.0", features = ["rc", "derive"] }
8081
serde_json = "1.0"
8182
static-files = "0.2"
8283
sysinfo = "0.30.11"

server/src/catalog.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ async fn create_manifest(
230230
.ok_or(IOError::new(
231231
ErrorKind::Other,
232232
"Failed to create upper bound for manifest",
233-
))
234-
.map_err(ObjectStorageError::IoError)?,
233+
))?,
235234
)
236235
.and_utc();
237236

server/src/cli.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ pub struct Cli {
9292

9393
/// port use by airplane(flight query service)
9494
pub flight_port: u16,
95+
96+
/// to query cached data
97+
pub query_cache_path: Option<PathBuf>,
98+
99+
/// Size for local cache
100+
pub query_cache_size: u64,
95101
}
96102

97103
impl Cli {
@@ -102,6 +108,8 @@ impl Cli {
102108
pub const DOMAIN_URI: &'static str = "origin";
103109
pub const STAGING: &'static str = "local-staging-path";
104110
pub const CACHE: &'static str = "cache-path";
111+
pub const QUERY_CACHE: &'static str = "query-cache-path";
112+
pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
105113
pub const CACHE_SIZE: &'static str = "cache-size";
106114
pub const USERNAME: &'static str = "username";
107115
pub const PASSWORD: &'static str = "password";
@@ -191,6 +199,25 @@ impl Cli {
191199
.next_line_help(true),
192200
)
193201

202+
.arg(
203+
Arg::new(Self::QUERY_CACHE)
204+
.long(Self::QUERY_CACHE)
205+
.env("P_QUERY_CACHE_DIR")
206+
.value_name("DIR")
207+
.value_parser(validation::canonicalize_path)
208+
.help("Local path on this device to be used for caching data")
209+
.next_line_help(true),
210+
)
211+
.arg(
212+
Arg::new(Self::QUERY_CACHE_SIZE)
213+
.long(Self::QUERY_CACHE_SIZE)
214+
.env("P_QUERY_CACHE_SIZE")
215+
.value_name("size")
216+
.default_value("1GiB")
217+
.value_parser(validation::cache_size)
218+
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
219+
.next_line_help(true),
220+
)
194221
.arg(
195222
Arg::new(Self::USERNAME)
196223
.long(Self::USERNAME)
@@ -372,6 +399,7 @@ impl FromArgMatches for Cli {
372399

373400
fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
374401
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
402+
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
375403
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
376404
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
377405
self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
@@ -394,6 +422,10 @@ impl FromArgMatches for Cli {
394422
.get_one::<u64>(Self::CACHE_SIZE)
395423
.cloned()
396424
.expect("default value for cache size");
425+
self.query_cache_size = m
426+
.get_one(Self::QUERY_CACHE_SIZE)
427+
.cloned()
428+
.expect("default value for query cache size");
397429
self.username = m
398430
.get_one::<String>(Self::USERNAME)
399431
.cloned()

server/src/event/writer/file_writer.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,7 @@ impl FileWriter {
4949
) -> Result<(), StreamWriterError> {
5050
match self.get_mut(schema_key) {
5151
Some(writer) => {
52-
writer
53-
.writer
54-
.write(record)
55-
.map_err(StreamWriterError::Writer)?;
52+
writer.writer.write(record)?;
5653
}
5754
// entry is not present thus we create it
5855
None => {
@@ -100,8 +97,6 @@ fn init_new_stream_writer_file(
10097
let mut stream_writer = StreamWriter::try_new(file, &record.schema())
10198
.expect("File and RecordBatch both are checked");
10299

103-
stream_writer
104-
.write(record)
105-
.map_err(StreamWriterError::Writer)?;
100+
stream_writer.write(record)?;
106101
Ok((path, stream_writer))
107102
}

server/src/handlers.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ pub mod livetail;
2323
const PREFIX_TAGS: &str = "x-p-tag-";
2424
const PREFIX_META: &str = "x-p-meta-";
2525
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
26+
const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results";
27+
const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached";
28+
const USER_ID_HEADER_KEY: &str = "x-p-user-id";
2629
const LOG_SOURCE_KEY: &str = "x-p-log-source";
2730
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
2831
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";

server/src/handlers/airplane.rs

Lines changed: 92 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
119
use arrow_array::RecordBatch;
2-
use arrow_flight::encode::FlightDataEncoderBuilder;
320
use arrow_flight::flight_service_server::FlightServiceServer;
421
use arrow_flight::PollInfo;
522
use arrow_schema::ArrowError;
623

724
use datafusion::common::tree_node::TreeNode;
825
use serde_json::json;
926
use std::net::SocketAddr;
10-
use std::sync::Arc;
1127
use std::time::Instant;
1228
use tonic::codec::CompressionEncoding;
1329

@@ -16,34 +32,38 @@ use futures_util::{Future, TryFutureExt};
1632
use tonic::transport::{Identity, Server, ServerTlsConfig};
1733
use tonic_web::GrpcWebLayer;
1834

19-
use crate::event::commit_schema;
2035
use crate::handlers::http::cluster::get_ingestor_info;
21-
use crate::handlers::http::fetch_schema;
2236

37+
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
2338
use crate::metrics::QUERY_EXECUTE_TIME;
24-
use crate::option::{Mode, CONFIG};
39+
use crate::option::CONFIG;
2540

2641
use crate::handlers::livetail::cross_origin_config;
2742

28-
use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
43+
use crate::handlers::http::query::{
44+
authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
45+
};
2946
use crate::query::{TableScanVisitor, QUERY_SESSION};
30-
use crate::storage::object_storage::commit_schema_to_storage;
47+
use crate::querycache::QueryCacheManager;
3148
use crate::utils::arrow::flight::{
32-
append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester,
49+
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
50+
send_to_ingester,
3351
};
3452
use arrow_flight::{
3553
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
3654
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
3755
SchemaResult, Ticket,
3856
};
3957
use arrow_ipc::writer::IpcWriteOptions;
40-
use futures::{stream, TryStreamExt};
58+
use futures::stream;
4159
use tonic::{Request, Response, Status, Streaming};
4260

4361
use crate::handlers::livetail::extract_session_key;
4462
use crate::metadata::STREAM_INFO;
4563
use crate::rbac::Users;
4664

65+
use super::http::query::get_results_from_cache;
66+
4767
#[derive(Clone, Debug)]
4868
pub struct AirServiceImpl {}
4969

@@ -112,7 +132,7 @@ impl FlightService for AirServiceImpl {
112132
async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
113133
let key = extract_session_key(req.metadata())?;
114134

115-
let ticket = get_query_from_ticket(req)?;
135+
let ticket = get_query_from_ticket(&req)?;
116136

117137
log::info!("query requested to airplane: {:?}", ticket);
118138

@@ -132,32 +152,57 @@ impl FlightService for AirServiceImpl {
132152
let mut visitor = TableScanVisitor::default();
133153
let _ = raw_logical_plan.visit(&mut visitor);
134154

135-
let tables = visitor.into_inner();
136-
137-
if CONFIG.parseable.mode == Mode::Query {
138-
// using http to get the schema. may update to use flight later
139-
for table in tables {
140-
if let Ok(new_schema) = fetch_schema(&table).await {
141-
// commit schema merges the schema internally and updates the schema in storage.
142-
commit_schema_to_storage(&table, new_schema.clone())
143-
.await
144-
.map_err(|err| Status::internal(err.to_string()))?;
145-
commit_schema(&table, Arc::new(new_schema))
146-
.map_err(|err| Status::internal(err.to_string()))?;
147-
}
148-
}
155+
let streams = visitor.into_inner();
156+
157+
let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
158+
.await
159+
.unwrap_or(None);
160+
161+
let cache_results = req
162+
.metadata()
163+
.get(CACHE_RESULTS_HEADER_KEY)
164+
.and_then(|value| value.to_str().ok()); // I dont think we need to own this.
165+
166+
let show_cached = req
167+
.metadata()
168+
.get(CACHE_VIEW_HEADER_KEY)
169+
.and_then(|value| value.to_str().ok());
170+
171+
let user_id = req
172+
.metadata()
173+
.get(USER_ID_HEADER_KEY)
174+
.and_then(|value| value.to_str().ok());
175+
let stream_name = streams
176+
.first()
177+
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
178+
.to_owned();
179+
180+
// send the cached results
181+
if let Ok(cache_results) = get_results_from_cache(
182+
show_cached,
183+
query_cache_manager,
184+
&stream_name,
185+
user_id,
186+
&ticket.start_time,
187+
&ticket.end_time,
188+
&ticket.query,
189+
ticket.send_null,
190+
ticket.fields,
191+
)
192+
.await
193+
{
194+
return cache_results.into_flight();
149195
}
150196

197+
update_schema_when_distributed(streams)
198+
.await
199+
.map_err(|err| Status::internal(err.to_string()))?;
200+
151201
// map payload to query
152202
let mut query = into_query(&ticket, &session_state)
153203
.await
154204
.map_err(|_| Status::internal("Failed to parse query"))?;
155205

156-
// if table name is not present it is a Malformed Query
157-
let stream_name = query
158-
.first_table_name()
159-
.ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
160-
161206
let event =
162207
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
163208
let sql = format!("select * from {}", &stream_name);
@@ -192,11 +237,26 @@ impl FlightService for AirServiceImpl {
192237
Status::permission_denied("User Does not have permission to access this")
193238
})?;
194239
let time = Instant::now();
195-
let (results, _) = query
240+
let (records, _) = query
196241
.execute(stream_name.clone())
197242
.await
198243
.map_err(|err| Status::internal(err.to_string()))?;
199244

245+
if let Err(err) = put_results_in_cache(
246+
cache_results,
247+
user_id,
248+
query_cache_manager,
249+
&stream_name,
250+
&records,
251+
query.start.to_rfc3339(),
252+
query.end.to_rfc3339(),
253+
ticket.query,
254+
)
255+
.await
256+
{
257+
log::error!("{}", err);
258+
};
259+
200260
/*
201261
* INFO: No returning the schema with the data.
202262
* kept it in case it needs to be sent in the future.
@@ -208,18 +268,7 @@ impl FlightService for AirServiceImpl {
208268
.collect::<Vec<_>>();
209269
let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
210270
*/
211-
let input_stream = futures::stream::iter(results.into_iter().map(Ok));
212-
let write_options = IpcWriteOptions::default()
213-
.try_with_compression(Some(arrow_ipc::CompressionType(1)))
214-
.map_err(|err| Status::failed_precondition(err.to_string()))?;
215-
216-
let flight_data_stream = FlightDataEncoderBuilder::new()
217-
.with_max_flight_data_size(usize::MAX)
218-
.with_options(write_options)
219-
// .with_schema(schema.into())
220-
.build(input_stream);
221-
222-
let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
271+
let out = into_flight_data(records);
223272

224273
if let Some(event) = event {
225274
event.clear(&stream_name);
@@ -230,9 +279,7 @@ impl FlightService for AirServiceImpl {
230279
.with_label_values(&[&format!("flight-query-{}", stream_name)])
231280
.observe(time);
232281

233-
Ok(Response::new(
234-
Box::pin(flight_data_stream) as Self::DoGetStream
235-
))
282+
out
236283
}
237284

238285
async fn do_put(

server/src/handlers/http.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::option::CONFIG;
2626
use self::{cluster::get_ingestor_info, query::Query};
2727

2828
pub(crate) mod about;
29+
mod cache;
2930
pub mod cluster;
3031
pub(crate) mod health_check;
3132
pub(crate) mod ingest;
@@ -39,7 +40,7 @@ mod otel;
3940
pub(crate) mod query;
4041
pub(crate) mod rbac;
4142
pub(crate) mod role;
42-
43+
pub mod users;
4344
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
4445
pub const API_BASE_PATH: &str = "api";
4546
pub const API_VERSION: &str = "v1";

0 commit comments

Comments
 (0)