Skip to content

Commit f1af947

Browse files
Devdutt Shenoi and nitisht authored
refactor: utils/time parsing (#1024)
`TimeRange` is a new type that parses a pair of strings into start and end `DateTime` values and stores them, so that a time range can be passed around elsewhere in the code. Currently it is used mainly in the query path.

Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 386a662 commit f1af947

File tree

5 files changed

+207
-75
lines changed

5 files changed

+207
-75
lines changed

src/handlers/airplane.rs

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::utils::arrow::flight::{
4545
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
4646
send_to_ingester,
4747
};
48+
use crate::utils::time::TimeRange;
4849
use arrow_flight::{
4950
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
5051
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
@@ -143,6 +144,8 @@ impl FlightService for AirServiceImpl {
143144
Status::internal("Failed to create logical plan")
144145
})?;
145146

147+
let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
148+
.map_err(|e| Status::internal(e.to_string()))?;
146149
// create a visitor to extract the table name
147150
let mut visitor = TableScanVisitor::default();
148151
let _ = raw_logical_plan.visit(&mut visitor);
@@ -159,38 +162,40 @@ impl FlightService for AirServiceImpl {
159162
.map_err(|err| Status::internal(err.to_string()))?;
160163

161164
// map payload to query
162-
let mut query = into_query(&ticket, &session_state)
165+
let mut query = into_query(&ticket, &session_state, time_range)
163166
.await
164167
.map_err(|_| Status::internal("Failed to parse query"))?;
165168

166-
let event =
167-
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
168-
let sql = format!("select * from {}", &stream_name);
169-
let start_time = ticket.start_time.clone();
170-
let end_time = ticket.end_time.clone();
171-
let out_ticket = json!({
172-
"query": sql,
173-
"startTime": start_time,
174-
"endTime": end_time
175-
})
176-
.to_string();
177-
178-
let ingester_metadatas = get_ingestor_info()
179-
.await
180-
.map_err(|err| Status::failed_precondition(err.to_string()))?;
181-
let mut minute_result: Vec<RecordBatch> = vec![];
182-
183-
for im in ingester_metadatas {
184-
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
185-
minute_result.append(&mut batches);
186-
}
169+
let event = if send_to_ingester(
170+
query.time_range.start.timestamp_millis(),
171+
query.time_range.end.timestamp_millis(),
172+
) {
173+
let sql = format!("select * from {}", &stream_name);
174+
let start_time = ticket.start_time.clone();
175+
let end_time = ticket.end_time.clone();
176+
let out_ticket = json!({
177+
"query": sql,
178+
"startTime": start_time,
179+
"endTime": end_time
180+
})
181+
.to_string();
182+
183+
let ingester_metadatas = get_ingestor_info()
184+
.await
185+
.map_err(|err| Status::failed_precondition(err.to_string()))?;
186+
let mut minute_result: Vec<RecordBatch> = vec![];
187+
188+
for im in ingester_metadatas {
189+
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
190+
minute_result.append(&mut batches);
187191
}
188-
let mr = minute_result.iter().collect::<Vec<_>>();
189-
let event = append_temporary_events(&stream_name, mr).await?;
190-
Some(event)
191-
} else {
192-
None
193-
};
192+
}
193+
let mr = minute_result.iter().collect::<Vec<_>>();
194+
let event = append_temporary_events(&stream_name, mr).await?;
195+
Some(event)
196+
} else {
197+
None
198+
};
194199

195200
// try authorize
196201
match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) {

src/handlers/http/query.rs

Lines changed: 10 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use crate::response::QueryResponse;
4747
use crate::storage::object_storage::commit_schema_to_storage;
4848
use crate::storage::ObjectStorageError;
4949
use crate::utils::actix::extract_session_key_from_req;
50+
use crate::utils::time::{TimeParseError, TimeRange};
5051

5152
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
5253

@@ -80,13 +81,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
8081
.await?
8182
}
8283
};
84+
85+
let time_range =
86+
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
87+
8388
// create a visitor to extract the table name
8489
let mut visitor = TableScanVisitor::default();
8590
let _ = raw_logical_plan.visit(&mut visitor);
8691

8792
let tables = visitor.into_inner();
8893
update_schema_when_distributed(tables).await?;
89-
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;
94+
let mut query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
9095

9196
let creds = extract_session_key_from_req(&req)?;
9297
let permissions = Users.get_permissions(&creds);
@@ -218,6 +223,7 @@ impl FromRequest for Query {
218223
pub async fn into_query(
219224
query: &Query,
220225
session_state: &SessionState,
226+
time_range: TimeRange,
221227
) -> Result<LogicalQuery, QueryError> {
222228
if query.query.is_empty() {
223229
return Err(QueryError::EmptyQuery);
@@ -231,42 +237,13 @@ pub async fn into_query(
231237
return Err(QueryError::EmptyEndTime);
232238
}
233239

234-
let (start, end) = parse_human_time(&query.start_time, &query.end_time)?;
235-
236-
if start.timestamp() > end.timestamp() {
237-
return Err(QueryError::StartTimeAfterEndTime);
238-
}
239-
240240
Ok(crate::query::Query {
241241
raw_logical_plan: session_state.create_logical_plan(&query.query).await?,
242-
start,
243-
end,
242+
time_range,
244243
filter_tag: query.filter_tags.clone(),
245244
})
246245
}
247246

248-
fn parse_human_time(
249-
start_time: &str,
250-
end_time: &str,
251-
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
252-
let start: DateTime<Utc>;
253-
let end: DateTime<Utc>;
254-
255-
if end_time == "now" {
256-
end = Utc::now();
257-
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
258-
} else {
259-
start = DateTime::parse_from_rfc3339(start_time)
260-
.map_err(|_| QueryError::StartTimeParse)?
261-
.into();
262-
end = DateTime::parse_from_rfc3339(end_time)
263-
.map_err(|_| QueryError::EndTimeParse)?
264-
.into();
265-
};
266-
267-
Ok((start, end))
268-
}
269-
270247
/// unused for now, might need it in the future
271248
#[allow(unused)]
272249
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
@@ -312,16 +289,8 @@ pub enum QueryError {
312289
EmptyStartTime,
313290
#[error("End time cannot be empty")]
314291
EmptyEndTime,
315-
#[error("Could not parse start time correctly")]
316-
StartTimeParse,
317-
#[error("Could not parse end time correctly")]
318-
EndTimeParse,
319-
#[error("While generating times for 'now' failed to parse duration")]
320-
NotValidDuration(#[from] humantime::DurationError),
321-
#[error("Parsed duration out of range")]
322-
OutOfRange(#[from] chrono::OutOfRangeError),
323-
#[error("Start time cannot be greater than the end time")]
324-
StartTimeAfterEndTime,
292+
#[error("Error while parsing provided time range: {0}")]
293+
TimeParse(#[from] TimeParseError),
325294
#[error("Unauthorized")]
326295
Unauthorized,
327296
#[error("Datafusion Error: {0}")]

src/query/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use crate::event;
4646
use crate::metadata::STREAM_INFO;
4747
use crate::option::CONFIG;
4848
use crate::storage::{ObjectStorageProvider, StorageDir};
49+
use crate::utils::time::TimeRange;
4950

5051
pub static QUERY_SESSION: Lazy<SessionContext> =
5152
Lazy::new(|| Query::create_session_context(CONFIG.storage()));
@@ -54,8 +55,7 @@ pub static QUERY_SESSION: Lazy<SessionContext> =
5455
#[derive(Debug)]
5556
pub struct Query {
5657
pub raw_logical_plan: LogicalPlan,
57-
pub start: DateTime<Utc>,
58-
pub end: DateTime<Utc>,
58+
pub time_range: TimeRange,
5959
pub filter_tag: Option<Vec<String>>,
6060
}
6161

@@ -164,8 +164,8 @@ impl Query {
164164
LogicalPlan::Explain(plan) => {
165165
let transformed = transform(
166166
plan.plan.as_ref().clone(),
167-
self.start.naive_utc(),
168-
self.end.naive_utc(),
167+
self.time_range.start.naive_utc(),
168+
self.time_range.end.naive_utc(),
169169
filters,
170170
time_partition,
171171
);
@@ -182,8 +182,8 @@ impl Query {
182182
x => {
183183
transform(
184184
x,
185-
self.start.naive_utc(),
186-
self.end.naive_utc(),
185+
self.time_range.start.naive_utc(),
186+
self.time_range.end.naive_utc(),
187187
filters,
188188
time_partition,
189189
)

src/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub mod actix;
2020
pub mod arrow;
2121
pub mod header_parsing;
2222
pub mod json;
23+
pub mod time;
2324
pub mod uid;
2425
pub mod update;
2526
use crate::handlers::http::rbac::RBACError;

src/utils/time.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use chrono::{DateTime, Utc};
20+
21+
#[derive(Debug, thiserror::Error)]
22+
pub enum TimeParseError {
23+
#[error("Parsing humantime")]
24+
HumanTime(#[from] humantime::DurationError),
25+
#[error("Out of Range")]
26+
OutOfRange(#[from] chrono::OutOfRangeError),
27+
#[error("Error parsing time: {0}")]
28+
Chrono(#[from] chrono::ParseError),
29+
#[error("Start time cannot be greater than the end time")]
30+
StartTimeAfterEndTime,
31+
}
32+
33+
/// Represents a range of time with a start and end point.
34+
#[derive(Debug)]
35+
pub struct TimeRange {
36+
pub start: DateTime<Utc>,
37+
pub end: DateTime<Utc>,
38+
}
39+
40+
impl TimeRange {
41+
/// Parses human-readable time strings into a `TimeRange` object.
42+
///
43+
/// # Arguments
44+
/// - `start_time`: A string representing the start of the time range. This can either be
45+
/// a human-readable duration (e.g., `"2 hours"`) or an RFC 3339 formatted timestamp.
46+
/// - `end_time`: A string representing the end of the time range. This can either be
47+
/// the keyword `"now"` (to represent the current time) or an RFC 3339 formatted timestamp.
48+
///
49+
/// # Errors
50+
/// - `TimeParseError::StartTimeAfterEndTime`: Returned when the parsed start time is later than the end time.
51+
/// - Any error that might occur during parsing of durations or RFC 3339 timestamps.
52+
///
53+
/// # Example
54+
/// ```ignore
55+
/// let range = TimeRange::parse_human_time("2 hours", "now");
56+
/// let range = TimeRange::parse_human_time("2023-01-01T12:00:00Z", "2023-01-01T15:00:00Z");
57+
/// ```
58+
pub fn parse_human_time(start_time: &str, end_time: &str) -> Result<Self, TimeParseError> {
59+
let start: DateTime<Utc>;
60+
let end: DateTime<Utc>;
61+
62+
if end_time == "now" {
63+
end = Utc::now();
64+
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
65+
} else {
66+
start = DateTime::parse_from_rfc3339(start_time)?.into();
67+
end = DateTime::parse_from_rfc3339(end_time)?.into();
68+
};
69+
70+
if start > end {
71+
return Err(TimeParseError::StartTimeAfterEndTime);
72+
}
73+
74+
Ok(Self { start, end })
75+
}
76+
}
77+
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, SecondsFormat, Utc};

    /// Renders a timestamp the way the fixtures below are written:
    /// whole seconds with a trailing `Z`.
    fn rfc3339_secs(ts: DateTime<Utc>) -> String {
        ts.to_rfc3339_opts(SecondsFormat::Secs, true)
    }

    /// Two well-formed RFC 3339 timestamps round-trip through the parser unchanged.
    #[test]
    fn valid_rfc3339_timestamps() {
        let (from, to) = ("2023-01-01T12:00:00Z", "2023-01-01T13:00:00Z");

        let range = TimeRange::parse_human_time(from, to).unwrap();

        assert_eq!(rfc3339_secs(range.start), from);
        assert_eq!(rfc3339_secs(range.end), to);
    }

    /// With `"now"` as the end, the start is exactly the given duration earlier.
    #[test]
    fn end_time_now_with_valid_duration() {
        let cases = [
            ("1h", Duration::hours(1)),
            ("30 minutes", Duration::minutes(30)),
        ];

        for (lookback, expected_span) in cases {
            let range = TimeRange::parse_human_time(lookback, "now").unwrap();

            assert!(range.end <= Utc::now());
            assert_eq!(range.end - range.start, expected_span);
        }
    }

    /// A start that lies after the end is rejected.
    #[test]
    fn start_time_after_end_time() {
        let outcome = TimeRange::parse_human_time("2023-01-01T14:00:00Z", "2023-01-01T13:00:00Z");

        assert!(matches!(
            outcome,
            Err(TimeParseError::StartTimeAfterEndTime)
        ));
    }

    /// A malformed start timestamp surfaces chrono's parse error.
    #[test]
    fn invalid_start_time_format() {
        let outcome = TimeRange::parse_human_time("not-a-valid-time", "2023-01-01T13:00:00Z");

        assert!(matches!(outcome, Err(TimeParseError::Chrono(_))));
    }

    /// A malformed end timestamp surfaces chrono's parse error.
    #[test]
    fn invalid_end_time_format() {
        let outcome = TimeRange::parse_human_time("2023-01-01T12:00:00Z", "not-a-valid-time");

        assert!(matches!(outcome, Err(TimeParseError::Chrono(_))));
    }

    /// With `"now"` as the end, a malformed duration surfaces humantime's error.
    #[test]
    fn invalid_duration_with_end_time_now() {
        let outcome = TimeRange::parse_human_time("not-a-duration", "now");

        assert!(matches!(outcome, Err(TimeParseError::HumanTime(_))));
    }
}

0 commit comments

Comments
 (0)