Skip to content

Commit d4dcd0e

Browse files
fix: multiple fixes for distributed deployment mode (#786)
* Subtract the size of each arrow file removed from staging from the staging storage-size metric.
* Fix for GET /cache: return an error when the global cache is not set.
* Fix query results after migrating from a standalone to a distributed deployment.
* Fix retention after migrating from standalone to distributed: the query node should clean up its snapshot upon retention.
* Fix an edge case: if an ingestor was down/unreachable at the time of the retention activity, the ingestor's stream.json did not get updated. When ingestion later happens from that ingestor, local-to-storage sync fails with the error "Manifest found in snapshot but not in object-storage". The change is to create the manifest but not update the snapshot, because the snapshot already has the manifest entry.
1 parent b251e05 commit d4dcd0e

File tree

4 files changed

+100
-84
lines changed

4 files changed

+100
-84
lines changed

server/src/catalog.rs

Lines changed: 82 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@ use std::io::Error as IOError;
3434
pub mod column;
3535
pub mod manifest;
3636
pub mod snapshot;
37-
37+
use crate::storage::ObjectStoreFormat;
3838
pub use manifest::create_from_parquet_file;
39-
4039
pub trait Snapshot {
4140
fn manifests(&self, time_predicates: &[PartialTimeFilter]) -> Vec<ManifestItem>;
4241
}
@@ -97,7 +96,6 @@ pub async fn update_snapshot(
9796
// get current snapshot
9897
let mut meta = storage.get_object_store_format(stream_name).await?;
9998
let manifests = &mut meta.snapshot.manifest_list;
100-
10199
let (lower_bound, _) = get_file_bounds(&change);
102100
let pos = manifests.iter().position(|item| {
103101
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
@@ -120,82 +118,89 @@ pub async fn update_snapshot(
120118
}
121119
}
122120
if ch {
123-
let Some(mut manifest) = storage.get_manifest(&path).await? else {
124-
return Err(ObjectStorageError::UnhandledError(
125-
"Manifest found in snapshot but not in object-storage"
126-
.to_string()
127-
.into(),
128-
));
129-
};
130-
manifest.apply_change(change);
131-
storage.put_manifest(&path, manifest).await?;
132-
} else {
133-
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
134-
let upper_bound = lower_bound
135-
.date_naive()
136-
.and_time(
137-
NaiveTime::from_num_seconds_from_midnight_opt(
138-
23 * 3600 + 59 * 60 + 59,
139-
999_999_999,
140-
)
141-
.ok_or(IOError::new(
142-
ErrorKind::Other,
143-
"Failed to create upper bound for manifest",
144-
))
145-
.map_err(ObjectStorageError::IoError)?,
121+
if let Some(mut manifest) = storage.get_manifest(&path).await? {
122+
manifest.apply_change(change);
123+
storage.put_manifest(&path, manifest).await?;
124+
} else {
125+
//instead of returning an error, create a new manifest (otherwise local to storage sync fails)
126+
//but don't update the snapshot
127+
create_manifest(
128+
lower_bound,
129+
change,
130+
storage.clone(),
131+
stream_name,
132+
false,
133+
meta,
146134
)
147-
.and_utc();
148-
149-
let manifest = Manifest {
150-
files: vec![change],
151-
..Manifest::default()
152-
};
153-
154-
let mainfest_file_name = manifest_path("").to_string();
155-
let path =
156-
partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
157-
storage
158-
.put_object(&path, serde_json::to_vec(&manifest)?.into())
159135
.await?;
160-
let path = storage.absolute_url(&path);
161-
let new_snapshot_entriy = snapshot::ManifestItem {
162-
manifest_path: path.to_string(),
163-
time_lower_bound: lower_bound,
164-
time_upper_bound: upper_bound,
165-
};
166-
manifests.push(new_snapshot_entriy);
167-
storage.put_snapshot(stream_name, meta.snapshot).await?;
136+
}
137+
} else {
138+
create_manifest(
139+
lower_bound,
140+
change,
141+
storage.clone(),
142+
stream_name,
143+
true,
144+
meta,
145+
)
146+
.await?;
168147
}
169148
} else {
170-
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
171-
let upper_bound = lower_bound
172-
.date_naive()
173-
.and_time(
174-
NaiveTime::from_num_seconds_from_midnight_opt(
175-
23 * 3600 + 59 * 60 + 59,
176-
999_999_999,
177-
)
178-
.unwrap(),
179-
)
180-
.and_utc();
149+
create_manifest(
150+
lower_bound,
151+
change,
152+
storage.clone(),
153+
stream_name,
154+
true,
155+
meta,
156+
)
157+
.await?;
158+
}
181159

182-
let manifest = Manifest {
183-
files: vec![change],
184-
..Manifest::default()
185-
};
160+
Ok(())
161+
}
186162

187-
let mainfest_file_name = manifest_path("").to_string();
188-
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
189-
storage
190-
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
191-
.await?;
163+
async fn create_manifest(
164+
lower_bound: DateTime<Utc>,
165+
change: manifest::File,
166+
storage: Arc<dyn ObjectStorage + Send>,
167+
stream_name: &str,
168+
update_snapshot: bool,
169+
mut meta: ObjectStoreFormat,
170+
) -> Result<(), ObjectStorageError> {
171+
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
172+
let upper_bound = lower_bound
173+
.date_naive()
174+
.and_time(
175+
NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
176+
.ok_or(IOError::new(
177+
ErrorKind::Other,
178+
"Failed to create upper bound for manifest",
179+
))
180+
.map_err(ObjectStorageError::IoError)?,
181+
)
182+
.and_utc();
183+
184+
let manifest = Manifest {
185+
files: vec![change],
186+
..Manifest::default()
187+
};
188+
189+
let mainfest_file_name = manifest_path("").to_string();
190+
let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name);
191+
storage
192+
.put_object(&path, serde_json::to_vec(&manifest)?.into())
193+
.await?;
194+
if update_snapshot {
195+
let mut manifests = meta.snapshot.manifest_list;
192196
let path = storage.absolute_url(&path);
193-
let new_snapshot_entriy = snapshot::ManifestItem {
197+
let new_snapshot_entry = snapshot::ManifestItem {
194198
manifest_path: path.to_string(),
195199
time_lower_bound: lower_bound,
196200
time_upper_bound: upper_bound,
197201
};
198-
manifests.push(new_snapshot_entriy);
202+
manifests.push(new_snapshot_entry);
203+
meta.snapshot.manifest_list = manifests;
199204
storage.put_snapshot(stream_name, meta.snapshot).await?;
200205
}
201206

@@ -207,21 +212,17 @@ pub async fn remove_manifest_from_snapshot(
207212
stream_name: &str,
208213
dates: Vec<String>,
209214
) -> Result<Option<String>, ObjectStorageError> {
215+
if !dates.is_empty() {
216+
// get current snapshot
217+
let mut meta = storage.get_object_store_format(stream_name).await?;
218+
let manifests = &mut meta.snapshot.manifest_list;
219+
// Filter out items whose manifest_path contains any of the dates_to_delete
220+
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
221+
storage.put_snapshot(stream_name, meta.snapshot).await?;
222+
}
210223
match CONFIG.parseable.mode {
211224
Mode::All | Mode::Ingest => {
212-
if !dates.is_empty() {
213-
// get current snapshot
214-
let mut meta = storage.get_object_store_format(stream_name).await?;
215-
let manifests = &mut meta.snapshot.manifest_list;
216-
// Filter out items whose manifest_path contains any of the dates_to_delete
217-
manifests
218-
.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
219-
storage.put_snapshot(stream_name, meta.snapshot).await?;
220-
}
221-
222-
let first_event_at = get_first_event(storage.clone(), stream_name, Vec::new()).await?;
223-
224-
Ok(first_event_at)
225+
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
225226
}
226227
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
227228
}

server/src/handlers/http/logstream.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,16 @@ pub async fn put_retention(
338338

339339
pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
340340
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
341+
342+
match CONFIG.parseable.mode {
343+
Mode::Ingest | Mode::All => {
344+
if CONFIG.parseable.local_cache_path.is_none() {
345+
return Err(StreamError::CacheNotEnabled(stream_name));
346+
}
347+
}
348+
_ => {}
349+
}
350+
341351
let cache_enabled = STREAM_INFO.cache_enabled(&stream_name)?;
342352
Ok((web::Json(cache_enabled), StatusCode::OK))
343353
}

server/src/query/stream_schema_provider.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,9 @@ impl TableProvider for StandardTableProvider {
328328
let obs = glob_storage
329329
.get_objects(
330330
Some(&path),
331-
Box::new(|file_name| file_name.starts_with(".ingestor")),
331+
Box::new(|file_name| file_name.ends_with("manifest.json")),
332332
)
333333
.await;
334-
335334
if let Ok(obs) = obs {
336335
for ob in obs {
337336
if let Ok(object_store_format) =

server/src/storage/staging.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,16 @@ pub fn convert_disk_files_to_parquet(
247247
writer.close()?;
248248

249249
for file in files {
250-
if fs::remove_file(file).is_err() {
250+
let file_size = file.metadata().unwrap().len();
251+
let file_type = file.extension().unwrap().to_str().unwrap();
252+
253+
if fs::remove_file(file.clone()).is_err() {
251254
log::error!("Failed to delete file. Unstable state");
252255
process::abort()
253256
}
257+
metrics::STORAGE_SIZE
258+
.with_label_values(&["staging", stream, file_type])
259+
.sub(file_size as i64);
254260
}
255261
}
256262

0 commit comments

Comments
 (0)