Skip to content

Commit 5ce9d2b

Browse files
author
Devdutt Shenoi
authored
feat: DEPRECATE caching (#1035)
Caching will be replaced by hot tier feature
1 parent eea4841 commit 5ce9d2b

24 files changed

+41
-1453
lines changed

src/banner.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919

2020
use crossterm::style::Stylize;
21-
use human_size::SpecificSize;
2221

2322
use crate::about;
2423
use crate::utils::uid::Uid;
@@ -93,7 +92,6 @@ fn status_info(config: &Config, scheme: &str, id: Uid) {
9392
/// Prints information about the `ObjectStorage`.
9493
/// - Mode (`Local drive`, `S3 bucket`)
9594
/// - Staging (temporary landing point for incoming events)
96-
/// - Cache (local cache of data)
9795
/// - Store (path where the data is stored and its latency)
9896
async fn storage_info(config: &Config) {
9997
let storage = config.storage();
@@ -109,20 +107,6 @@ async fn storage_info(config: &Config) {
109107
config.staging_dir().to_string_lossy(),
110108
);
111109

112-
if let Some(path) = &config.parseable.local_cache_path {
113-
let size: SpecificSize<human_size::Gigibyte> =
114-
SpecificSize::new(config.parseable.local_cache_size as f64, human_size::Byte)
115-
.unwrap()
116-
.into();
117-
118-
eprintln!(
119-
"\
120-
{:8}Cache: \"{}\", (size: {})",
121-
"",
122-
path.display(),
123-
size
124-
);
125-
}
126110
if let Some(path) = &config.parseable.hot_tier_storage_path {
127111
eprintln!(
128112
"\

src/cli.rs

Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,9 @@ pub struct Cli {
4545
pub domain_address: Option<Url>,
4646

4747
/// The local staging path is used as a temporary landing point
48-
/// for incoming events and local cache
48+
/// for incoming events
4949
pub local_staging_path: PathBuf,
5050

51-
/// The local cache path is used for speeding up query on latest data
52-
pub local_cache_path: Option<PathBuf>,
53-
54-
/// Size for local cache
55-
pub local_cache_size: u64,
56-
5751
/// Username for the basic authentication on the server
5852
pub username: String,
5953

@@ -96,12 +90,6 @@ pub struct Cli {
9690
/// port use by airplane(flight query service)
9791
pub flight_port: u16,
9892

99-
/// to query cached data
100-
pub query_cache_path: Option<PathBuf>,
101-
102-
/// Size for local cache
103-
pub query_cache_size: u64,
104-
10593
/// CORS behaviour
10694
pub cors: bool,
10795

@@ -129,10 +117,6 @@ impl Cli {
129117
pub const ADDRESS: &'static str = "address";
130118
pub const DOMAIN_URI: &'static str = "origin";
131119
pub const STAGING: &'static str = "local-staging-path";
132-
pub const CACHE: &'static str = "cache-path";
133-
pub const QUERY_CACHE: &'static str = "query-cache-path";
134-
pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
135-
pub const CACHE_SIZE: &'static str = "cache-size";
136120
pub const USERNAME: &'static str = "username";
137121
pub const PASSWORD: &'static str = "password";
138122
pub const CHECK_UPDATE: &'static str = "check-update";
@@ -255,45 +239,7 @@ impl Cli {
255239
.help("Local path on this device to be used as landing point for incoming events")
256240
.next_line_help(true),
257241
)
258-
.arg(
259-
Arg::new(Self::CACHE)
260-
.long(Self::CACHE)
261-
.env("P_CACHE_DIR")
262-
.value_name("DIR")
263-
.value_parser(validation::canonicalize_path)
264-
.help("Local path on this device to be used for caching data")
265-
.next_line_help(true),
266-
)
267-
.arg(
268-
Arg::new(Self::CACHE_SIZE)
269-
.long(Self::CACHE_SIZE)
270-
.env("P_CACHE_SIZE")
271-
.value_name("size")
272-
.default_value("1GiB")
273-
.value_parser(validation::cache_size)
274-
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
275-
.next_line_help(true),
276-
)
277-
.arg(
278-
Arg::new(Self::QUERY_CACHE)
279-
.long(Self::QUERY_CACHE)
280-
.env("P_QUERY_CACHE_DIR")
281-
.value_name("DIR")
282-
.value_parser(validation::canonicalize_path)
283-
.help("Local path on this device to be used for caching data")
284-
.next_line_help(true),
285-
)
286-
.arg(
287-
Arg::new(Self::QUERY_CACHE_SIZE)
288-
.long(Self::QUERY_CACHE_SIZE)
289-
.env("P_QUERY_CACHE_SIZE")
290-
.value_name("size")
291-
.default_value("1GiB")
292-
.value_parser(validation::cache_size)
293-
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
294-
.next_line_help(true),
295-
)
296-
.arg(
242+
.arg(
297243
Arg::new(Self::USERNAME)
298244
.long(Self::USERNAME)
299245
.env("P_USERNAME")
@@ -423,7 +369,7 @@ impl Cli {
423369
.arg(
424370
// RowGroupSize controls the number of rows present in one row group
425371
// More rows = better compression but HIGHER Memory consumption during read/write
426-
// 1048576 is the default value for DataFusion
372+
// 1048576 is the default value for DataFusion
427373
Arg::new(Self::ROW_GROUP_SIZE)
428374
.long(Self::ROW_GROUP_SIZE)
429375
.env("P_PARQUET_ROW_GROUP_SIZE")
@@ -520,8 +466,6 @@ impl FromArgMatches for Cli {
520466
self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
521467
self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();
522468

523-
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
524-
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
525469
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
526470
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
527471
self.trusted_ca_certs_path = m.get_one::<PathBuf>(Self::TRUSTED_CA_CERTS_PATH).cloned();
@@ -541,14 +485,6 @@ impl FromArgMatches for Cli {
541485
.get_one::<PathBuf>(Self::STAGING)
542486
.cloned()
543487
.expect("default value for staging");
544-
self.local_cache_size = m
545-
.get_one::<u64>(Self::CACHE_SIZE)
546-
.cloned()
547-
.expect("default value for cache size");
548-
self.query_cache_size = m
549-
.get_one(Self::QUERY_CACHE_SIZE)
550-
.cloned()
551-
.expect("default value for query cache size");
552488
self.username = m
553489
.get_one::<String>(Self::USERNAME)
554490
.cloned()

src/handlers/airplane.rs

Lines changed: 4 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,13 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
3434
use tonic_web::GrpcWebLayer;
3535

3636
use crate::handlers::http::cluster::get_ingestor_info;
37-
38-
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
39-
use crate::metrics::QUERY_EXECUTE_TIME;
40-
use crate::option::CONFIG;
41-
42-
use crate::handlers::livetail::cross_origin_config;
43-
4437
use crate::handlers::http::query::{
45-
authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
38+
authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
4639
};
40+
use crate::handlers::livetail::cross_origin_config;
41+
use crate::metrics::QUERY_EXECUTE_TIME;
42+
use crate::option::CONFIG;
4743
use crate::query::{TableScanVisitor, QUERY_SESSION};
48-
use crate::querycache::QueryCacheManager;
4944
use crate::utils::arrow::flight::{
5045
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
5146
send_to_ingester,
@@ -64,8 +59,6 @@ use crate::metadata::STREAM_INFO;
6459
use crate::rbac;
6560
use crate::rbac::Users;
6661

67-
use super::http::query::get_results_from_cache;
68-
6962
#[derive(Clone, Debug)]
7063
pub struct AirServiceImpl {}
7164

@@ -156,46 +149,11 @@ impl FlightService for AirServiceImpl {
156149

157150
let streams = visitor.into_inner();
158151

159-
let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
160-
.await
161-
.unwrap_or(None);
162-
163-
let cache_results = req
164-
.metadata()
165-
.get(CACHE_RESULTS_HEADER_KEY)
166-
.and_then(|value| value.to_str().ok()); // I dont think we need to own this.
167-
168-
let show_cached = req
169-
.metadata()
170-
.get(CACHE_VIEW_HEADER_KEY)
171-
.and_then(|value| value.to_str().ok());
172-
173-
let user_id = req
174-
.metadata()
175-
.get(USER_ID_HEADER_KEY)
176-
.and_then(|value| value.to_str().ok());
177152
let stream_name = streams
178153
.first()
179154
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
180155
.to_owned();
181156

182-
// send the cached results
183-
if let Ok(cache_results) = get_results_from_cache(
184-
show_cached,
185-
query_cache_manager,
186-
&stream_name,
187-
user_id,
188-
&ticket.start_time,
189-
&ticket.end_time,
190-
&ticket.query,
191-
ticket.send_null,
192-
ticket.fields,
193-
)
194-
.await
195-
{
196-
return cache_results.into_flight();
197-
}
198-
199157
update_schema_when_distributed(streams)
200158
.await
201159
.map_err(|err| Status::internal(err.to_string()))?;
@@ -258,21 +216,6 @@ impl FlightService for AirServiceImpl {
258216
.await
259217
.map_err(|err| Status::internal(err.to_string()))?;
260218

261-
if let Err(err) = put_results_in_cache(
262-
cache_results,
263-
user_id,
264-
query_cache_manager,
265-
&stream_name,
266-
&records,
267-
query.start.to_rfc3339(),
268-
query.end.to_rfc3339(),
269-
ticket.query,
270-
)
271-
.await
272-
{
273-
error!("{}", err);
274-
};
275-
276219
/*
277220
* INFO: No returning the schema with the data.
278221
* kept it in case it needs to be sent in the future.

src/handlers/http/cache.rs

Lines changed: 0 additions & 95 deletions
This file was deleted.

src/handlers/http/ingest.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use crate::event::{
2727
};
2828
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
2929
use crate::handlers::STREAM_NAME_HEADER_KEY;
30-
use crate::localcache::CacheError;
3130
use crate::metadata::error::stream_info::MetadataError;
3231
use crate::metadata::STREAM_INFO;
3332
use crate::option::{Mode, CONFIG};
@@ -236,8 +235,6 @@ pub enum PostError {
236235
#[error("Error: {0}")]
237236
DashboardError(#[from] DashboardError),
238237
#[error("Error: {0}")]
239-
CacheError(#[from] CacheError),
240-
#[error("Error: {0}")]
241238
StreamError(#[from] StreamError),
242239
}
243240

@@ -259,7 +256,6 @@ impl actix_web::ResponseError for PostError {
259256
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
260257
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
261258
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
262-
PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
263259
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
264260
}
265261
}

0 commit comments

Comments
 (0)