Skip to content

Commit

Permalink
feat: auto-migrate old index metadata (#3428)
Browse files Browse the repository at this point in the history
Follow-up to #3377. That PR made
`index_statistics()` error by default. This ended up being a footgun for
some users who rely heavily on that method. So instead of forcing the
user to do the migration themselves, we do it for them. It can be disabled
by setting the `LANCE_AUTO_MIGRATION` environment variable to `false`.
  • Loading branch information
wjones127 authored Feb 1, 2025
1 parent d34fa95 commit c73d717
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 23 deletions.
1 change: 1 addition & 0 deletions rust/lance-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod deletion;
pub mod futures;
pub mod hash;
pub mod mask;
pub mod parse;
pub mod path;
pub mod testing;
pub mod tokio;
Expand Down
11 changes: 11 additions & 0 deletions rust/lance-core/src/utils/parse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

/// Interprets `val` as a boolean flag.
///
/// Returns `true` when `val` equals one of `1`, `true`, `on`, `yes`, or `y`,
/// compared ASCII case-insensitively. Every other input, including the empty
/// string and values with surrounding whitespace, yields `false`.
pub fn str_is_truthy(val: &str) -> bool {
    const TRUTHY: [&str; 5] = ["1", "true", "on", "yes", "y"];
    TRUTHY
        .iter()
        .any(|candidate| val.eq_ignore_ascii_case(candidate))
}
9 changes: 1 addition & 8 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use deepsize::DeepSizeOf;
use futures::{future, stream::BoxStream, StreamExt, TryStreamExt};
use lance_core::utils::parse::str_is_truthy;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use object_store::aws::{
AmazonS3ConfigKey, AwsCredential as ObjectStoreAwsCredential, AwsCredentialProvider,
Expand Down Expand Up @@ -1039,14 +1040,6 @@ fn infer_block_size(scheme: &str) -> usize {
}
}

/// Returns `true` when `val` is one of `1`, `true`, `on`, `yes`, or `y`
/// (ASCII case-insensitive); any other string is treated as `false`.
fn str_is_truthy(val: &str) -> bool {
    let is = |expected: &str| val.eq_ignore_ascii_case(expected);
    is("1") || is("true") || is("on") || is("yes") || is("y")
}

/// Attempt to create a Url from given table location.
///
/// The location could be:
Expand Down
13 changes: 6 additions & 7 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4395,10 +4395,13 @@ mod tests {
let validate_res = dataset.validate().await;
assert!(validate_res.is_err());
assert_eq!(dataset.load_indices().await.unwrap()[0].name, "vector_idx");
assert!(dataset.index_statistics("vector_idx").await.is_err());

// Force a migration
dataset.delete("false").await.unwrap();
// Calling index statistics will force a migration
let stats = dataset.index_statistics("vector_idx").await.unwrap();
let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
assert_eq!(stats["num_indexed_fragments"], 2);

dataset.checkout_latest().await.unwrap();
dataset.validate().await.unwrap();

let indices = dataset.load_indices().await.unwrap();
Expand All @@ -4408,10 +4411,6 @@ mod tests {
}
assert_eq!(get_bitmap(&indices[0]), vec![0]);
assert_eq!(get_bitmap(&indices[1]), vec![1]);

let stats = dataset.index_statistics("vector_idx").await.unwrap();
let stats: serde_json::Value = serde_json::from_str(&stats).unwrap();
assert_eq!(stats["num_indexed_fragments"], 2);
}

#[rstest]
Expand Down
60 changes: 52 additions & 8 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
//!
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arrow_schema::DataType;
use async_trait::async_trait;
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use lance_core::utils::parse::str_is_truthy;
use lance_file::reader::FileReader;
use lance_file::v2;
use lance_file::v2::reader::FileReaderOptions;
Expand Down Expand Up @@ -68,6 +69,17 @@ use self::append::merge_indices;
use self::scalar::build_scalar_index;
use self::vector::{build_vector_index, VectorIndexParams, LANCE_VECTOR_INDEX};

/// Reports whether a dataset should be auto-migrated when corruption is encountered.
///
/// The `LANCE_AUTO_MIGRATION` environment variable is consulted on the first
/// call and the answer is cached in a `OnceLock` for every later call. When
/// the variable cannot be read the result is `true`; otherwise the result is
/// whatever `str_is_truthy` returns for its value.
fn auto_migrate_corruption() -> bool {
    static AUTO_MIGRATE: OnceLock<bool> = OnceLock::new();
    *AUTO_MIGRATE.get_or_init(|| match std::env::var("LANCE_AUTO_MIGRATION") {
        Ok(raw) => str_is_truthy(&raw),
        // No readable value: migration stays enabled.
        Err(_) => true,
    })
}

/// Builds index.
#[async_trait]
pub trait IndexBuilder {
Expand Down Expand Up @@ -590,7 +602,8 @@ impl DatasetIndexExt for Dataset {
let index_type = indices[0].index_type().to_string();

let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?;
let num_indexed_rows_per_delta = indexed_fragments_per_delta

let res = indexed_fragments_per_delta
.iter()
.map(|frags| {
let mut sum = 0;
Expand All @@ -604,18 +617,49 @@ impl DatasetIndexExt for Dataset {
}
Ok(sum)
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>();

async fn migrate_and_recompute(ds: &Dataset, index_name: &str) -> Result<String> {
let mut ds = ds.clone();
log::warn!(
"Detecting out-dated fragment metadata, migrating dataset. \
To disable migration, set LANCE_AUTO_MIGRATION=false"
);
ds.delete("false").await.map_err(|err| {
Error::Execution {
message: format!("Failed to migrate dataset while calculating index statistics. \
To disable migration, set LANCE_AUTO_MIGRATION=false. Original error: {}", err),
location: location!(),
}
})?;
ds.index_statistics(index_name).await
}

let num_indexed_rows_per_delta = match res {
Ok(rows) => rows,
Err(Error::Internal { message, .. })
if auto_migrate_corruption() && message.contains("trigger a single write") =>
{
return migrate_and_recompute(self, index_name).await;
}
Err(e) => return Err(e),
};

let mut fragment_ids = HashSet::new();
for frags in indexed_fragments_per_delta.iter() {
for frag in frags.iter() {
if !fragment_ids.insert(frag.id) {
return Err(Error::Internal {
message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
if auto_migrate_corruption() {
return migrate_and_recompute(self, index_name).await;
} else {
return Err(Error::Internal {
message:
"Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \
and trigger a single write to fix this"
.to_string(),
location: location!(),
});
.to_string(),
location: location!(),
});
}
}
}
}
Expand Down

0 comments on commit c73d717

Please sign in to comment.