
file size limit? #237

Open · wants to merge 12 commits into base: master
6 changes: 6 additions & 0 deletions admin/src/lib.rs
@@ -34,6 +34,7 @@ pub fn run() -> Result<(), String> {
let mut options = parity_db::Options::with_columns(db_path.as_path(), 0);
options.columns = metadata.columns;
options.salt = Some(metadata.salt);
options.max_file_size = metadata.max_file_size;
options
} else {
let mut options = parity_db::Options::with_columns(db_path.as_path(), nb_column);
@@ -47,6 +48,7 @@ pub fn run() -> Result<(), String> {
options.sync_wal = !cli.shared().no_sync;
options.sync_data = !cli.shared().no_sync;
options.stats = cli.shared().with_stats;
options.max_file_size = cli.shared().max_file_size;
log::debug!("Options: {:?}, {:?}", cli, options);
match cli.subcommand {
SubCommand::Stats(stat) => {
@@ -185,6 +187,10 @@ pub struct Shared {
#[clap(long)]
pub with_stats: bool,

/// If defined, use multiple files with a size limit. May affect performance.
#[clap(long)]
pub max_file_size: Option<usize>,

/// Indicates the number of columns when using
/// a new or temporary db; defaults to one.
#[clap(long)]
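
Note for context: with clap's derive behaviour, the new field surfaces as a `--max-file-size` long flag. A minimal, self-contained sketch of the declaration and parsing (a stand-in struct mirroring the attribute style in this diff; the binary name and example value are assumptions):

```rust
use clap::Parser; // assumes clap with the "derive" feature, as used by this crate

#[derive(Parser, Debug)]
struct Shared {
    /// If defined, use multiple files with a size limit. May affect performance.
    #[clap(long)]
    max_file_size: Option<usize>,
}

fn main() {
    // e.g. `admin --max-file-size 1073741824`
    let shared = Shared::parse();
    println!("max_file_size = {:?}", shared.max_file_size);
}
```
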
80 changes: 61 additions & 19 deletions src/column.rs
@@ -121,6 +121,7 @@ pub struct HashColumn {
salt: Salt,
stats: ColumnStats,
compression: Compress,
max_file_size: Option<usize>,
db_version: u32,
}

@@ -355,7 +356,16 @@ impl Column {
let column_options = &metadata.columns[col as usize];
let db_version = metadata.version;
let value = (0..SIZE_TIERS)
.map(|i| Self::open_table(arc_path.clone(), col, i as u8, column_options, db_version))
.map(|i| {
Self::open_table(
arc_path.clone(),
col,
i as u8,
column_options,
db_version,
metadata.max_file_size,
)
})
.collect::<Result<_>>()?;

if column_options.btree_index {
@@ -371,10 +381,11 @@ impl Column {
tier: u8,
options: &ColumnOptions,
db_version: u32,
max_file_size: Option<usize>,
) -> Result<ValueTable> {
let id = ValueTableId::new(col, tier);
let entry_size = SIZES.get(tier as usize).cloned();
ValueTable::open(path, id, entry_size, options, db_version)
ValueTable::open(path, id, entry_size, options, db_version, max_file_size)
}

pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
@@ -433,14 +444,16 @@ impl HashColumn {
options: &Options,
metadata: &Metadata,
) -> Result<HashColumn> {
let (index, mut reindexing, stats) = Self::open_index(&options.path, col)?;
let max_size = options.max_file_size;
let (index, mut reindexing, stats) = Self::open_index(&options.path, col, max_size)?;
let collect_stats = options.stats;
let path = &options.path;
let max_file_size = options.max_file_size;
let col_options = &metadata.columns[col as usize];
let db_version = metadata.version;
let (ref_count, ref_count_cache) = if col_options.multitree && !col_options.append_only {
(
Some(Self::open_ref_count(&options.path, col, &mut reindexing)?),
Some(Self::open_ref_count(&options.path, col, &mut reindexing, max_size)?),
Some(RwLock::new(Default::default())),
)
} else {
@@ -452,6 +465,7 @@ impl HashColumn {
reindex: RwLock::new(Reindex { queue: reindexing, progress: AtomicU64::new(0) }),
ref_count_cache,
path: path.into(),
max_file_size,
preimage: col_options.preimage,
uniform_keys: col_options.uniform,
ref_counted: col_options.ref_counted,
@@ -528,13 +542,14 @@ impl HashColumn {
fn open_index(
path: &std::path::Path,
col: ColId,
max_size: Option<usize>,
) -> Result<(IndexTable, VecDeque<ReindexEntry>, ColumnStats)> {
let mut reindexing = VecDeque::new();
let mut top = None;
let mut stats = ColumnStats::empty();
for bits in (MIN_INDEX_BITS..65).rev() {
let id = IndexTableId::new(col, bits);
if let Some(table) = IndexTable::open_existing(path, id)? {
if let Some(table) = IndexTable::open_existing(path, id, max_size)? {
if top.is_none() {
stats = table.load_stats()?;
log::trace!(target: "parity-db", "Opened main index {}", table.id);
@@ -547,7 +562,7 @@ impl HashColumn {
}
let table = match top {
Some(table) => table,
None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS)),
None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS), max_size),
};
Ok((table, reindexing, stats))
}
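
For reviewers: `open_index` walks candidate tables from the largest bit size down; the first one found becomes the live index and every smaller leftover is queued for reindexing. A simplified, runnable sketch of just that selection rule (the `u8` table stand-in, the probe closure, and the `MIN_INDEX_BITS` value are illustrative assumptions, not parity-db's real types):

```rust
use std::collections::VecDeque;

const MIN_INDEX_BITS: u8 = 16; // assumed value, for illustration only

// `open_existing` stands in for IndexTable::open_existing.
fn select_index(open_existing: impl Fn(u8) -> Option<u8>) -> (Option<u8>, VecDeque<u8>) {
    let mut top = None;
    let mut reindexing = VecDeque::new();
    // Walk from the largest candidate (64 bits) down to the smallest.
    for bits in (MIN_INDEX_BITS..65).rev() {
        if let Some(table) = open_existing(bits) {
            if top.is_none() {
                top = Some(table); // largest existing table is the live one
            } else {
                reindexing.push_back(table); // smaller tables still hold entries to migrate
            }
        }
    }
    (top, reindexing)
}

fn main() {
    // Tables exist at 18 and 16 bits: 18 becomes live, 16 is queued for reindex.
    let (top, queue) = select_index(|bits| (bits == 18 || bits == 16).then_some(bits));
    assert_eq!(top, Some(18));
    assert_eq!(queue, VecDeque::from([16]));
}
```
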
@@ -556,11 +571,12 @@ impl HashColumn {
path: &std::path::Path,
col: ColId,
reindexing: &mut VecDeque<ReindexEntry>,
max_size: Option<usize>,
) -> Result<RefCountTable> {
let mut top = None;
for bits in (MIN_REF_COUNT_BITS..65).rev() {
let id = RefCountTableId::new(col, bits);
if let Some(table) = RefCountTable::open_existing(path, id)? {
if let Some(table) = RefCountTable::open_existing(path, id, max_size)? {
if top.is_none() {
log::trace!(target: "parity-db", "Opened main ref count {}", table.id);
top = Some(table);
@@ -572,7 +588,11 @@ impl HashColumn {
}
let table = match top {
Some(table) => table,
None => RefCountTable::create_new(path, RefCountTableId::new(col, MIN_REF_COUNT_BITS)),
None => RefCountTable::create_new(
path,
RefCountTableId::new(col, MIN_REF_COUNT_BITS),
max_size,
),
};
Ok(table)
}
@@ -581,6 +601,7 @@ impl HashColumn {
tables: RwLockUpgradableReadGuard<'a, Tables>,
reindex: RwLockUpgradableReadGuard<'b, Reindex>,
path: &std::path::Path,
max_file_size: Option<usize>,
) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
@@ -592,7 +613,7 @@ impl HashColumn {
// Start reindex
let new_index_id =
IndexTableId::new(tables.index.id.col(), tables.index.id.index_bits() + 1);
let new_table = IndexTable::create_new(path, new_index_id);
let new_table = IndexTable::create_new(path, new_index_id, max_file_size);
let old_table = std::mem::replace(&mut tables.index, new_table);
reindex.queue.push_back(ReindexEntry::Index(old_table));
(
@@ -629,7 +650,8 @@ impl HashColumn {
tables.index.write_insert_plan(key, address, None, log)?
{
log::debug!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
(tables, reindex) =
Self::trigger_reindex(tables, reindex, self.path.as_path(), self.max_file_size);
outcome = PlanOutcome::NeedReindex;
}
Ok(outcome)
@@ -804,7 +826,8 @@ impl HashColumn {
tables.index.write_insert_plan(key, address, None, log)?
{
log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
(tables, reindex) =
Self::trigger_reindex(tables, reindex, self.path.as_path(), self.max_file_size);
outcome = PlanOutcome::NeedReindex;
}
Ok((outcome, tables, reindex))
@@ -814,6 +837,7 @@ impl HashColumn {
tables: RwLockUpgradableReadGuard<'a, Tables>,
reindex: RwLockUpgradableReadGuard<'b, Reindex>,
path: &std::path::Path,
max_size: Option<usize>,
) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
@@ -827,7 +851,8 @@ impl HashColumn {
tables.get_ref_count().id.col(),
tables.get_ref_count().id.index_bits() + 1,
);
let new_table = Some(RefCountTable::create_new(path, new_id));
let new_table = Some(RefCountTable::create_new(path, new_id, max_size));
let old_table = std::mem::replace(&mut tables.ref_count, new_table);
reindex.queue.push_back(ReindexEntry::RefCount(old_table.unwrap()));
(
@@ -879,8 +904,12 @@ impl HashColumn {
tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
{
log::debug!(target: "parity-db", "{}: Ref count chunk full {} when reindexing", tables.get_ref_count().id, address);
(tables, reindex) =
Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
(tables, reindex) = Self::trigger_ref_count_reindex(
tables,
reindex,
self.path.as_path(),
self.max_file_size,
);
outcome = PlanOutcome::NeedReindex;
}
Ok(outcome)
@@ -980,8 +1009,12 @@ impl HashColumn {
tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
{
log::debug!(target: "parity-db", "{}: Ref count chunk full {}", tables.get_ref_count().id, address);
(tables, reindex) =
Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
(tables, reindex) = Self::trigger_ref_count_reindex(
tables,
reindex,
self.path.as_path(),
self.max_file_size,
);
outcome = PlanOutcome::NeedReindex;
}
let (test_ref_count, _test_sub_index) = tables.get_ref_count().get(address, log)?.unwrap();
@@ -1379,7 +1412,12 @@ impl HashColumn {
"Missing table {}, starting reindex",
record.table,
);
let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
let lock = Self::trigger_reindex(
tables,
reindex,
self.path.as_path(),
self.max_file_size,
);
std::mem::drop(lock);
return self.validate_plan(LogAction::InsertIndex(record), log)
}
@@ -1410,8 +1448,12 @@ impl HashColumn {
"Missing ref count {}, starting reindex",
record.table,
);
let lock =
Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
let lock = Self::trigger_ref_count_reindex(
tables,
reindex,
self.path.as_path(),
self.max_file_size,
);
std::mem::drop(lock);
return self.validate_plan(LogAction::InsertRefCount(record), log)
}
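
Taken together, the column changes thread one parameter from `Options` down to every table constructor, and the reindex triggers above all forward it when they grow a table. A condensed sketch of that replace-and-queue growth step (stand-in types, not parity-db's; only the shape is taken from the diff):

```rust
use std::collections::VecDeque;

// Stand-ins for the real parity-db types; only the shape is illustrative.
struct IndexTable {
    index_bits: u8,
    max_file_size: Option<usize>,
}

impl IndexTable {
    fn create_new(index_bits: u8, max_file_size: Option<usize>) -> Self {
        IndexTable { index_bits, max_file_size }
    }
}

struct Tables {
    index: IndexTable,
}

// When the live index is full, swap in a table with one more index bit
// (double the chunks) and queue the old one so its entries can migrate.
fn trigger_reindex(
    tables: &mut Tables,
    queue: &mut VecDeque<IndexTable>,
    max_file_size: Option<usize>,
) {
    let new_table = IndexTable::create_new(tables.index.index_bits + 1, max_file_size);
    let old_table = std::mem::replace(&mut tables.index, new_table);
    queue.push_back(old_table);
}

fn main() {
    let limit = Some(1usize << 30);
    let mut tables = Tables { index: IndexTable::create_new(16, limit) };
    let mut queue = VecDeque::new();
    trigger_reindex(&mut tables, &mut queue, limit);
    assert_eq!(tables.index.index_bits, 17);
    assert_eq!(tables.index.max_file_size, limit);
    assert_eq!(queue.len(), 1);
}
```

Growing via `index_bits + 1` keeps the migration incremental: the old table stays readable on the queue until its entries have been moved over.
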
1 change: 1 addition & 0 deletions src/db.rs
@@ -2337,6 +2337,7 @@ mod tests {
sync_wal: true,
sync_data: true,
stats: true,
max_file_size: None,
salt: None,
columns: (0..num_columns).map(|_| Default::default()).collect(),
compression_threshold: HashMap::new(),
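
End to end, a caller would opt in through the public options struct, matching the `max_file_size: None` default added to the test above. A hedged usage sketch (assumes the new field is public like its neighbours and that the limit is in bytes, with `None` keeping today's single-file behaviour; `Options::with_columns`, `Db::open_or_create`, and `commit` are existing parity-db APIs):

```rust
use parity_db::{Db, Options};

fn main() -> Result<(), parity_db::Error> {
    let path = std::path::Path::new("/tmp/example-db");
    let mut options = Options::with_columns(path, 1);
    // Assumption: the limit is in bytes; None (the default) keeps a single
    // unbounded file per table, as before this PR.
    options.max_file_size = Some(1 << 30); // split files at ~1 GiB

    let db = Db::open_or_create(&options)?;
    // Column 0, one key-value insert.
    db.commit(vec![(0, b"key".to_vec(), Some(b"value".to_vec()))])?;
    Ok(())
}
```
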