Skip to content

Commit 2a2d64f

Browse files
fix: saved filters load issue on server restart (#957)
issue: server checks the version of all filters and migrates if version is v1 if v2 (current version), it again calculates hash of the user_id from json and puts the json back in storage and load to memory fix: if v2, load to memory
1 parent f488970 commit 2a2d64f

File tree

6 files changed

+164
-126
lines changed

6 files changed

+164
-126
lines changed

server/src/storage/azure_blob.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
4141
use crate::metrics::storage::StorageMetrics;
4242
use object_store::limit::LimitStore;
4343
use object_store::path::Path as StorePath;
44-
use std::collections::BTreeMap;
44+
use std::collections::{BTreeMap, HashMap};
4545
use std::sync::Arc;
4646
use std::time::{Duration, Instant};
4747

@@ -650,8 +650,10 @@ impl ObjectStorage for BlobStore {
650650
.collect::<Vec<_>>())
651651
}
652652

653-
async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
654-
let mut dashboards = vec![];
653+
async fn get_all_dashboards(
654+
&self,
655+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
656+
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
655657
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
656658
let resp = self
657659
.client
@@ -677,13 +679,19 @@ impl ObjectStorage for BlobStore {
677679
Box::new(|file_name| file_name.ends_with(".json")),
678680
)
679681
.await?;
680-
dashboards.extend(dashboard_bytes);
682+
683+
dashboards
684+
.entry(dashboards_path)
685+
.or_default()
686+
.extend(dashboard_bytes);
681687
}
682688
Ok(dashboards)
683689
}
684690

685-
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
686-
let mut filters = vec![];
691+
async fn get_all_saved_filters(
692+
&self,
693+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
694+
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
687695
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
688696
let resp = self
689697
.client
@@ -720,7 +728,10 @@ impl ObjectStorage for BlobStore {
720728
Box::new(|file_name| file_name.ends_with(".json")),
721729
)
722730
.await?;
723-
filters.extend(filter_bytes);
731+
filters
732+
.entry(filters_path)
733+
.or_default()
734+
.extend(filter_bytes);
724735
}
725736
}
726737
Ok(filters)

server/src/storage/localfs.rs

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use std::{
20-
collections::BTreeMap,
20+
collections::{BTreeMap, HashMap},
2121
path::{Path, PathBuf},
2222
sync::Arc,
2323
time::Instant,
@@ -351,8 +351,10 @@ impl ObjectStorage for LocalFS {
351351
Ok(dirs)
352352
}
353353

354-
async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
355-
let mut dashboards = vec![];
354+
async fn get_all_dashboards(
355+
&self,
356+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
357+
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
356358
let users_root_path = self.root.join(USERS_ROOT_DIR);
357359
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
358360
let users: Vec<DirEntry> = directories.try_collect().await?;
@@ -364,15 +366,25 @@ impl ObjectStorage for LocalFS {
364366
let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?);
365367
let dashboards_files: Vec<DirEntry> = directories.try_collect().await?;
366368
for dashboard in dashboards_files {
367-
let file = fs::read(dashboard.path()).await?;
368-
dashboards.push(file.into());
369+
let dashboard_absolute_path = dashboard.path();
370+
let file = fs::read(dashboard_absolute_path.clone()).await?;
371+
let dashboard_relative_path = dashboard_absolute_path
372+
.strip_prefix(self.root.as_path())
373+
.unwrap();
374+
375+
dashboards
376+
.entry(RelativePathBuf::from_path(dashboard_relative_path).unwrap())
377+
.or_default()
378+
.push(file.into());
369379
}
370380
}
371381
Ok(dashboards)
372382
}
373383

374-
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
375-
let mut filters = vec![];
384+
async fn get_all_saved_filters(
385+
&self,
386+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
387+
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
376388
let users_root_path = self.root.join(USERS_ROOT_DIR);
377389
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
378390
let users: Vec<DirEntry> = directories.try_collect().await?;
@@ -394,8 +406,16 @@ impl ObjectStorage for LocalFS {
394406
let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
395407
let filters_files: Vec<DirEntry> = directories.try_collect().await?;
396408
for filter in filters_files {
397-
let file = fs::read(filter.path()).await?;
398-
filters.push(file.into());
409+
let filter_absolute_path = filter.path();
410+
let file = fs::read(filter_absolute_path.clone()).await?;
411+
let filter_relative_path = filter_absolute_path
412+
.strip_prefix(self.root.as_path())
413+
.unwrap();
414+
415+
filters
416+
.entry(RelativePathBuf::from_path(filter_relative_path).unwrap())
417+
.or_default()
418+
.push(file.into());
399419
}
400420
}
401421
}

server/src/storage/object_storage.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,12 @@ pub trait ObjectStorage: Sync + 'static {
8585
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
8686
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
8787
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
88-
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
89-
async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
88+
async fn get_all_saved_filters(
89+
&self,
90+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
91+
async fn get_all_dashboards(
92+
&self,
93+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
9094
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
9195
async fn list_manifest_files(
9296
&self,

server/src/storage/s3.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ use std::path::Path as StdPath;
3737
use std::sync::Arc;
3838
use std::time::{Duration, Instant};
3939

40-
use crate::handlers::http::users::USERS_ROOT_DIR;
41-
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
42-
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
43-
4440
use super::metrics_layer::MetricLayer;
4541
use super::object_storage::parseable_json_path;
4642
use super::{
4743
ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
4844
};
45+
use crate::handlers::http::users::USERS_ROOT_DIR;
46+
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
47+
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
48+
use std::collections::HashMap;
4949

5050
#[allow(dead_code)]
5151
// in bytes
@@ -689,8 +689,10 @@ impl ObjectStorage for S3 {
689689
.collect::<Vec<_>>())
690690
}
691691

692-
async fn get_all_dashboards(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
693-
let mut dashboards = vec![];
692+
async fn get_all_dashboards(
693+
&self,
694+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
695+
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
694696
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
695697
let resp = self
696698
.client
@@ -716,13 +718,19 @@ impl ObjectStorage for S3 {
716718
Box::new(|file_name| file_name.ends_with(".json")),
717719
)
718720
.await?;
719-
dashboards.extend(dashboard_bytes);
721+
722+
dashboards
723+
.entry(dashboards_path)
724+
.or_default()
725+
.extend(dashboard_bytes);
720726
}
721727
Ok(dashboards)
722728
}
723729

724-
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
725-
let mut filters = vec![];
730+
async fn get_all_saved_filters(
731+
&self,
732+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
733+
let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
726734
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
727735
let resp = self
728736
.client
@@ -759,7 +767,10 @@ impl ObjectStorage for S3 {
759767
Box::new(|file_name| file_name.ends_with(".json")),
760768
)
761769
.await?;
762-
filters.extend(filter_bytes);
770+
filters
771+
.entry(filters_path)
772+
.or_default()
773+
.extend(filter_bytes);
763774
}
764775
}
765776
Ok(filters)

server/src/users/dashboards.rs

Lines changed: 51 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -114,64 +114,61 @@ impl Dashboards {
114114
pub async fn load(&self) -> anyhow::Result<()> {
115115
let mut this = vec![];
116116
let store = CONFIG.storage().get_object_store();
117-
let dashboards = store.get_all_dashboards().await.unwrap_or_default();
118-
for dashboard in dashboards {
119-
if dashboard.is_empty() {
120-
continue;
121-
}
122-
let mut dashboard_value = serde_json::from_slice::<serde_json::Value>(&dashboard)?;
123-
if let Some(meta) = dashboard_value.clone().as_object() {
124-
let version = meta.get("version").and_then(|version| version.as_str());
125-
let dashboard_id = meta
126-
.get("dashboard_id")
127-
.and_then(|dashboard_id| dashboard_id.as_str());
128-
match version {
129-
Some("v1") => {
130-
dashboard_value = migrate_v1_v2(dashboard_value);
131-
dashboard_value = migrate_v2_v3(dashboard_value);
132-
let user_id = dashboard_value
133-
.as_object()
134-
.unwrap()
135-
.get("user_id")
136-
.and_then(|user_id| user_id.as_str());
137-
let path = dashboard_path(
138-
user_id.unwrap(),
139-
&format!("{}.json", dashboard_id.unwrap()),
140-
);
141-
let dashboard_bytes = to_bytes(&dashboard_value);
142-
store.put_object(&path, dashboard_bytes.clone()).await?;
143-
if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard_bytes)
144-
{
145-
this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id);
146-
this.push(dashboard);
147-
}
148-
}
149-
Some("v2") => {
150-
dashboard_value = migrate_v2_v3(dashboard_value);
151-
let user_id = dashboard_value
152-
.as_object()
153-
.unwrap()
154-
.get("user_id")
155-
.and_then(|user_id| user_id.as_str());
156-
let path = dashboard_path(
157-
user_id.unwrap(),
158-
&format!("{}.json", dashboard_id.unwrap()),
159-
);
160-
let dashboard_bytes = to_bytes(&dashboard_value);
161-
store.put_object(&path, dashboard_bytes.clone()).await?;
162-
if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard_bytes)
163-
{
164-
this.retain(|d| d.dashboard_id != dashboard.dashboard_id);
165-
this.push(dashboard);
117+
let all_dashboards = store.get_all_dashboards().await.unwrap_or_default();
118+
for (dashboard_relative_path, dashboards) in all_dashboards {
119+
for dashboard in dashboards {
120+
if dashboard.is_empty() {
121+
continue;
122+
}
123+
let mut dashboard_value = serde_json::from_slice::<serde_json::Value>(&dashboard)?;
124+
if let Some(meta) = dashboard_value.clone().as_object() {
125+
let version = meta.get("version").and_then(|version| version.as_str());
126+
let dashboard_id = meta
127+
.get("dashboard_id")
128+
.and_then(|dashboard_id| dashboard_id.as_str());
129+
match version {
130+
Some("v1") => {
131+
//delete older version of the dashboard
132+
store.delete_object(&dashboard_relative_path).await?;
133+
134+
dashboard_value = migrate_v1_v2(dashboard_value);
135+
dashboard_value = migrate_v2_v3(dashboard_value);
136+
let user_id = dashboard_value
137+
.as_object()
138+
.unwrap()
139+
.get("user_id")
140+
.and_then(|user_id| user_id.as_str());
141+
let path = dashboard_path(
142+
user_id.unwrap(),
143+
&format!("{}.json", dashboard_id.unwrap()),
144+
);
145+
let dashboard_bytes = to_bytes(&dashboard_value);
146+
store.put_object(&path, dashboard_bytes.clone()).await?;
166147
}
167-
}
168-
_ => {
169-
if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&dashboard) {
170-
this.retain(|d| d.dashboard_id != dashboard.dashboard_id);
171-
this.push(dashboard);
148+
Some("v2") => {
149+
//delete older version of the dashboard
150+
store.delete_object(&dashboard_relative_path).await?;
151+
152+
dashboard_value = migrate_v2_v3(dashboard_value);
153+
let user_id = dashboard_value
154+
.as_object()
155+
.unwrap()
156+
.get("user_id")
157+
.and_then(|user_id| user_id.as_str());
158+
let path = dashboard_path(
159+
user_id.unwrap(),
160+
&format!("{}.json", dashboard_id.unwrap()),
161+
);
162+
let dashboard_bytes = to_bytes(&dashboard_value);
163+
store.put_object(&path, dashboard_bytes.clone()).await?;
172164
}
165+
_ => {}
173166
}
174167
}
168+
if let Ok(dashboard) = serde_json::from_value::<Dashboard>(dashboard_value) {
169+
this.retain(|d: &Dashboard| d.dashboard_id != dashboard.dashboard_id);
170+
this.push(dashboard);
171+
}
175172
}
176173
}
177174

0 commit comments

Comments
 (0)