Skip to content

Commit

Permalink
fix: debug tokio scheduling problem (openobserve#2594)
Browse files Browse the repository at this point in the history
  • Loading branch information
hengfeiyang authored Jan 27, 2024
1 parent 9124635 commit 4a623e6
Show file tree
Hide file tree
Showing 14 changed files with 30 additions and 73 deletions.
10 changes: 3 additions & 7 deletions src/common/infra/file_list/dynamo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::common::{
errors::{Error, Result},
},
meta::stream::{PartitionTimeLevel, StreamStats},
utils::time::BASE_TIME,
};

pub struct DynamoFileList {
Expand Down Expand Up @@ -275,12 +274,9 @@ impl super::FileList for DynamoFileList {
time_level: PartitionTimeLevel,
time_range: (i64, i64),
) -> Result<Vec<(String, FileMeta)>> {
let (mut time_start, mut time_end) = time_range;
if time_start == 0 {
time_start = BASE_TIME.timestamp_micros();
}
if time_end == 0 {
time_end = Utc::now().timestamp_micros();
let (time_start, time_end) = time_range;
if time_start == 0 && time_end == 0 {
return Ok(Vec::new());
}

let t1: DateTime<Utc> = Utc.timestamp_nanos(time_start * 1000);
Expand Down
5 changes: 4 additions & 1 deletion src/common/infra/file_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use config::{
use once_cell::sync::Lazy;

use crate::common::{
infra::errors::Result,
infra::errors::{Error, Result},
meta::{
meta_store::MetaStore,
stream::{PartitionTimeLevel, StreamStats},
Expand Down Expand Up @@ -171,6 +171,9 @@ pub async fn query(
time_level: PartitionTimeLevel,
time_range: (i64, i64),
) -> Result<Vec<(String, FileMeta)>> {
if time_range.0 > time_range.1 || time_range.0 == 0 || time_range.1 == 0 {
return Err(Error::Message("[file_list] invalid time range".to_string()));
}
CLIENT
.query(org_id, stream_type, stream_name, time_level, time_range)
.await
Expand Down
7 changes: 3 additions & 4 deletions src/common/infra/file_list/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use ahash::HashMap;
use async_trait::async_trait;
use chrono::Utc;
use config::{
meta::stream::{FileKey, FileMeta, StreamType},
utils::parquet::parse_file_key_columns,
Expand Down Expand Up @@ -344,9 +343,9 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
_time_level: PartitionTimeLevel,
time_range: (i64, i64),
) -> Result<Vec<(String, FileMeta)>> {
let (time_start, mut time_end) = time_range;
if time_end == 0 {
time_end = Utc::now().timestamp_micros();
let (time_start, time_end) = time_range;
if time_start == 0 && time_end == 0 {
return Ok(Vec::new());
}

let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
Expand Down
7 changes: 3 additions & 4 deletions src/common/infra/file_list/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use ahash::HashMap;
use async_trait::async_trait;
use chrono::Utc;
use config::{
meta::stream::{FileKey, FileMeta, StreamType},
utils::parquet::parse_file_key_columns,
Expand Down Expand Up @@ -349,9 +348,9 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
_time_level: PartitionTimeLevel,
time_range: (i64, i64),
) -> Result<Vec<(String, FileMeta)>> {
let (time_start, mut time_end) = time_range;
if time_end == 0 {
time_end = Utc::now().timestamp_micros();
let (time_start, time_end) = time_range;
if time_start == 0 && time_end == 0 {
return Ok(Vec::new());
}

let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
Expand Down
7 changes: 3 additions & 4 deletions src/common/infra/file_list/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use ahash::HashMap;
use async_trait::async_trait;
use chrono::Utc;
use config::{
meta::stream::{FileKey, FileMeta, StreamType},
utils::parquet::parse_file_key_columns,
Expand Down Expand Up @@ -354,9 +353,9 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
_time_level: PartitionTimeLevel,
time_range: (i64, i64),
) -> Result<Vec<(String, FileMeta)>> {
let (time_start, mut time_end) = time_range;
if time_end == 0 {
time_end = Utc::now().timestamp_micros();
let (time_start, time_end) = time_range;
if time_start == 0 && time_end == 0 {
return Ok(Vec::new());
}

let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
Expand Down
6 changes: 1 addition & 5 deletions src/common/meta/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,17 +345,13 @@ impl<'a> TryFrom<Timerange<'a>> for Option<(i64, i64)> {
0
}
};
let mut time_max = {
let time_max = {
if !time_max.is_empty() {
time_max.iter().max().unwrap().to_owned()
} else {
0
}
};
if time_min > 0 && time_max == 0 {
time_max = chrono::Utc::now().timestamp_micros();
}

Ok(Some((time_min, time_max)))
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/handler/http/request/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,11 +677,11 @@ async fn values_v1(
if start_time == 0 {
return Ok(MetaHttpResponse::bad_request("start_time is empty"));
}
let mut end_time = query
let end_time = query
.get("end_time")
.map_or(0, |v| v.parse::<i64>().unwrap_or(0));
if end_time == 0 {
end_time = chrono::Utc::now().timestamp_micros();
return Ok(MetaHttpResponse::bad_request("end_time is empty"));
}

let timeout = query
Expand Down Expand Up @@ -861,11 +861,11 @@ async fn values_v2(
if start_time == 0 {
return Ok(MetaHttpResponse::bad_request("start_time is empty"));
}
let mut end_time = query
let end_time = query
.get("end_time")
.map_or(0, |v| v.parse::<i64>().unwrap_or(0));
if end_time == 0 {
end_time = chrono::Utc::now().timestamp_micros();
return Ok(MetaHttpResponse::bad_request("end_time is empty"));
}

let timeout = query
Expand Down Expand Up @@ -981,7 +981,7 @@ async fn values_v2(
Ok(HttpResponse::Ok().json(resp))
}

/// SearchStreamData
/// SearchStreamPartition
#[utoipa::path(
context_path = "/api",
tag = "Search",
Expand Down
2 changes: 1 addition & 1 deletion src/handler/http/request/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub async fn get_latest_traces(
.get("end_time")
.map_or(0, |v| v.parse::<i64>().unwrap_or(0));
if end_time == 0 {
end_time = chrono::Utc::now().timestamp_micros();
return Ok(MetaHttpResponse::bad_request("end_time is empty"));
}

let timeout = query
Expand Down
3 changes: 1 addition & 2 deletions src/ingester/src/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ pub(crate) async fn persist() -> Result<()> {
let mut tasks = Vec::with_capacity(paths.len());
let semaphore = Arc::new(Semaphore::new(CONFIG.limit.file_move_thread_num));
for path in paths {
let semaphore = semaphore.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
let task: task::JoinHandle<Result<Option<(PathBuf, i64, usize)>>> =
task::spawn(async move {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let r = IMMUTABLES.read().await;
let Some(immutable) = r.get(&path) else {
drop(permit);
Expand Down
7 changes: 2 additions & 5 deletions src/service/db/file_list/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,10 @@ async fn process_file(file: &str) -> Result<Vec<FileKey>, anyhow::Error> {
Ok(records)
}

pub async fn cache_time_range(time_min: i64, mut time_max: i64) -> Result<(), anyhow::Error> {
if time_min == 0 {
pub async fn cache_time_range(time_min: i64, time_max: i64) -> Result<(), anyhow::Error> {
if time_min == 0 || time_max == 0 {
return Ok(());
}
if time_max == 0 {
time_max = Utc::now().timestamp_micros();
}
let mut cur_time = time_min;
while cur_time <= time_max {
let offset_time: DateTime<Utc> = Utc.timestamp_nanos(cur_time * 1000);
Expand Down
2 changes: 1 addition & 1 deletion src/service/search/datafusion/storage/tmpfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl ObjectStore for Tmpfs {
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
log::info!("head: {}", location);
// log::info!("head: {}", location);
let last_modified = Utc::now();
let bytes = self.get_bytes(location).await?;
Ok(ObjectMeta {
Expand Down
24 changes: 2 additions & 22 deletions src/service/search/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::{
utils::{flatten, json, str::find},
},
handler::grpc::cluster_rpc,
service::{db, file_list, format_partition_key, stream},
service::{file_list, format_partition_key, stream},
};

pub(crate) mod datafusion;
Expand Down Expand Up @@ -153,26 +153,6 @@ pub async fn search_partition(
Ok(resp)
}

async fn get_times(sql: &sql::Sql, stream_type: StreamType) -> (i64, i64) {
let (mut time_min, mut time_max) = sql.meta.time_range.unwrap();
if time_min == 0 {
// get created_at from schema
let schema = db::schema::get(&sql.org_id, &sql.stream_name, stream_type)
.await
.unwrap_or_else(|_| Schema::empty());
if schema != Schema::empty() {
time_min = schema
.metadata
.get("created_at")
.map_or(0, |v| v.parse::<i64>().unwrap_or(0));
}
}
if time_max == 0 {
time_max = chrono::Utc::now().timestamp_micros();
}
(time_min, time_max)
}

#[tracing::instrument(skip(sql), fields(session_id = ?_session_id, org_id = sql.org_id, stream_name = sql.stream_name))]
async fn get_file_list(
_session_id: &str,
Expand All @@ -185,7 +165,7 @@ async fn get_file_list(
.unwrap_or_default()
.len()
<= 1;
let (time_min, time_max) = get_times(sql, stream_type).await;
let (time_min, time_max) = sql.meta.time_range.unwrap();
let file_list = match file_list::query(
&sql.org_id,
&sql.stream_name,
Expand Down
5 changes: 1 addition & 4 deletions src/service/search/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl Display for SqlMode {
impl Sql {
pub async fn new(req: &cluster_rpc::SearchRequest) -> Result<Sql, Error> {
let req_query = req.query.as_ref().unwrap();
let mut req_time_range = (req_query.start_time, req_query.end_time);
let req_time_range = (req_query.start_time, req_query.end_time);
let org_id = req.org_id.clone();
let stream_type = StreamType::from(req.stream_type.as_str());

Expand Down Expand Up @@ -255,9 +255,6 @@ impl Sql {
// Hack time_range for sql
let meta_time_range_is_empty = meta.time_range.is_none() || meta.time_range == Some((0, 0));
if meta_time_range_is_empty && (req_time_range.0 > 0 || req_time_range.1 > 0) {
if req_time_range.1 == 0 {
req_time_range.1 = chrono::Utc::now().timestamp_micros();
}
meta.time_range = Some(req_time_range); // update meta
};
if let Some(time_range) = meta.time_range {
Expand Down
8 changes: 0 additions & 8 deletions src/service/usage/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,6 @@ pub async fn publish_stats() -> Result<(), anyhow::Error> {
}
}
}
// set cache expiry
// let expiry_ts = chrono::Utc::now()
// + chrono::Duration::minutes(
// (CONFIG.limit.calculate_stats_interval - 2)
// .try_into()
// .unwrap(),
// );
// set_cache_expiry(expiry_ts.timestamp_micros()).await;
Ok(())
}

Expand Down

0 comments on commit 4a623e6

Please sign in to comment.