diff --git a/admin/src/lib.rs b/admin/src/lib.rs
index 9f3673ff..8351ac1a 100644
--- a/admin/src/lib.rs
+++ b/admin/src/lib.rs
@@ -34,6 +34,7 @@ pub fn run() -> Result<(), String> {
 		let mut options = parity_db::Options::with_columns(db_path.as_path(), 0);
 		options.columns = metadata.columns;
 		options.salt = Some(metadata.salt);
+		options.max_file_size = metadata.max_file_size;
 		options
 	} else {
 		let mut options = parity_db::Options::with_columns(db_path.as_path(), nb_column);
@@ -47,6 +48,7 @@ pub fn run() -> Result<(), String> {
 	options.sync_wal = !cli.shared().no_sync;
 	options.sync_data = !cli.shared().no_sync;
 	options.stats = cli.shared().with_stats;
+	options.max_file_size = cli.shared().max_file_size;
 	log::debug!("Options: {:?}, {:?}", cli, options);
 	match cli.subcommand {
 		SubCommand::Stats(stat) => {
@@ -185,6 +187,10 @@ pub struct Shared {
 	#[clap(long)]
 	pub with_stats: bool,
 
+	/// If defined, use multiple files with a size limit (in megabytes). May have an effect on performance.
+	#[clap(long)]
+	pub max_file_size: Option<usize>,
+
 	/// Indicate the number of column, when using
 	/// a new or temporary db, defaults to one.
 	#[clap(long)]
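With `#[clap(long)]`, clap derives the kebab-case flag `--max-file-size` for the new field. A minimal sketch of how the flag parses, assuming clap v3 with the `derive` feature (binary name and values are illustrative, not part of the patch):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Shared {
	/// If defined, use multiple files with a size limit (in megabytes).
	#[clap(long)]
	max_file_size: Option<usize>,
}

fn main() {
	// clap derives the kebab-case flag `--max-file-size` from the field name.
	let shared = Shared::parse_from(["admin", "--max-file-size", "64"]);
	assert_eq!(shared.max_file_size, Some(64));
}
```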
diff --git a/src/column.rs b/src/column.rs
index f50f9e09..b78d8698 100644
--- a/src/column.rs
+++ b/src/column.rs
@@ -121,6 +121,7 @@ pub struct HashColumn {
 	salt: Salt,
 	stats: ColumnStats,
 	compression: Compress,
+	max_file_size: Option<usize>,
 	db_version: u32,
 }
 
@@ -355,7 +356,16 @@ impl Column {
 		let column_options = &metadata.columns[col as usize];
 		let db_version = metadata.version;
 		let value = (0..SIZE_TIERS)
-			.map(|i| Self::open_table(arc_path.clone(), col, i as u8, column_options, db_version))
+			.map(|i| {
+				Self::open_table(
+					arc_path.clone(),
+					col,
+					i as u8,
+					column_options,
+					db_version,
+					metadata.max_file_size,
+				)
+			})
 			.collect::<Result<Vec<_>>>()?;
 
 		if column_options.btree_index {
@@ -371,10 +381,11 @@ impl Column {
 		tier: u8,
 		options: &ColumnOptions,
 		db_version: u32,
+		max_file_size: Option<usize>,
 	) -> Result<ValueTable> {
 		let id = ValueTableId::new(col, tier);
 		let entry_size = SIZES.get(tier as usize).cloned();
-		ValueTable::open(path, id, entry_size, options, db_version)
+		ValueTable::open(path, id, entry_size, options, db_version, max_file_size)
 	}
 
 	pub(crate) fn drop_files(column: ColId, path: PathBuf) -> Result<()> {
@@ -433,14 +444,16 @@ impl HashColumn {
 		options: &Options,
 		metadata: &Metadata,
 	) -> Result<HashColumn> {
-		let (index, mut reindexing, stats) = Self::open_index(&options.path, col)?;
+		let max_size = options.max_file_size;
+		let (index, mut reindexing, stats) = Self::open_index(&options.path, col, max_size)?;
 		let collect_stats = options.stats;
 		let path = &options.path;
+		let max_file_size = options.max_file_size;
 		let col_options = &metadata.columns[col as usize];
 		let db_version = metadata.version;
 		let (ref_count, ref_count_cache) = if col_options.multitree && !col_options.append_only {
 			(
-				Some(Self::open_ref_count(&options.path, col, &mut reindexing)?),
+				Some(Self::open_ref_count(&options.path, col, &mut reindexing, max_size)?),
 				Some(RwLock::new(Default::default())),
 			)
 		} else {
@@ -452,6 +465,7 @@ impl HashColumn {
 			reindex: RwLock::new(Reindex { queue: reindexing, progress: AtomicU64::new(0) }),
 			ref_count_cache,
 			path: path.into(),
+			max_file_size,
 			preimage: col_options.preimage,
 			uniform_keys: col_options.uniform,
 			ref_counted: col_options.ref_counted,
@@ -528,13 +542,14 @@ impl HashColumn {
 	fn open_index(
 		path: &std::path::Path,
 		col: ColId,
+		max_size: Option<usize>,
 	) -> Result<(IndexTable, VecDeque<ReindexEntry>, ColumnStats)> {
 		let mut reindexing = VecDeque::new();
 		let mut top = None;
 		let mut stats = ColumnStats::empty();
 		for bits in (MIN_INDEX_BITS..65).rev() {
 			let id = IndexTableId::new(col, bits);
-			if let Some(table) = IndexTable::open_existing(path, id)? {
+			if let Some(table) = IndexTable::open_existing(path, id, max_size)? {
 				if top.is_none() {
 					stats = table.load_stats()?;
 					log::trace!(target: "parity-db", "Opened main index {}", table.id);
@@ -547,7 +562,7 @@ impl HashColumn {
 		}
 		let table = match top {
 			Some(table) => table,
-			None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS)),
+			None => IndexTable::create_new(path, IndexTableId::new(col, MIN_INDEX_BITS), max_size),
 		};
 		Ok((table, reindexing, stats))
 	}
@@ -556,11 +571,12 @@ fn open_ref_count(
 		path: &std::path::Path,
 		col: ColId,
 		reindexing: &mut VecDeque<ReindexEntry>,
+		max_size: Option<usize>,
 	) -> Result<RefCountTable> {
 		let mut top = None;
 		for bits in (MIN_REF_COUNT_BITS..65).rev() {
 			let id = RefCountTableId::new(col, bits);
-			if let Some(table) = RefCountTable::open_existing(path, id)? {
+			if let Some(table) = RefCountTable::open_existing(path, id, max_size)? {
 				if top.is_none() {
 					log::trace!(target: "parity-db", "Opened main ref count {}", table.id);
 					top = Some(table);
@@ -572,7 +588,11 @@ impl HashColumn {
 		}
 		let table = match top {
 			Some(table) => table,
-			None => RefCountTable::create_new(path, RefCountTableId::new(col, MIN_REF_COUNT_BITS)),
+			None => RefCountTable::create_new(
+				path,
+				RefCountTableId::new(col, MIN_REF_COUNT_BITS),
+				max_size,
+			),
 		};
 		Ok(table)
 	}
@@ -581,6 +601,7 @@ impl HashColumn {
 		tables: RwLockUpgradableReadGuard<'a, Tables>,
 		reindex: RwLockUpgradableReadGuard<'b, Reindex>,
 		path: &std::path::Path,
+		max_file_size: Option<usize>,
 	) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
 		let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
 		let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
@@ -592,7 +613,7 @@ impl HashColumn {
 		// Start reindex
 		let new_index_id =
 			IndexTableId::new(tables.index.id.col(), tables.index.id.index_bits() + 1);
-		let new_table = IndexTable::create_new(path, new_index_id);
+		let new_table = IndexTable::create_new(path, new_index_id, max_file_size);
 		let old_table = std::mem::replace(&mut tables.index, new_table);
 		reindex.queue.push_back(ReindexEntry::Index(old_table));
 		(
@@ -629,7 +650,8 @@ impl HashColumn {
 			tables.index.write_insert_plan(key, address, None, log)?
 		{
 			log::debug!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
-			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
+			(tables, reindex) =
+				Self::trigger_reindex(tables, reindex, self.path.as_path(), self.max_file_size);
 			outcome = PlanOutcome::NeedReindex;
 		}
 		Ok(outcome)
@@ -804,7 +826,8 @@ impl HashColumn {
 			tables.index.write_insert_plan(key, address, None, log)?
 		{
 			log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
-			(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
+			(tables, reindex) =
+				Self::trigger_reindex(tables, reindex, self.path.as_path(), self.max_file_size);
 			outcome = PlanOutcome::NeedReindex;
 		}
 		Ok((outcome, tables, reindex))
@@ -814,6 +837,7 @@ impl HashColumn {
 		tables: RwLockUpgradableReadGuard<'a, Tables>,
 		reindex: RwLockUpgradableReadGuard<'b, Reindex>,
 		path: &std::path::Path,
+		max_size: Option<usize>,
 	) -> (RwLockUpgradableReadGuard<'a, Tables>, RwLockUpgradableReadGuard<'b, Reindex>) {
 		let mut tables = RwLockUpgradableReadGuard::upgrade(tables);
 		let mut reindex = RwLockUpgradableReadGuard::upgrade(reindex);
@@ -827,7 +851,8 @@ impl HashColumn {
 			tables.get_ref_count().id.col(),
 			tables.get_ref_count().id.index_bits() + 1,
 		);
-		let new_table = Some(RefCountTable::create_new(path, new_id));
+		let new_table = Some(RefCountTable::create_new(path, new_id, max_size));
 		let old_table = std::mem::replace(&mut tables.ref_count, new_table);
 		reindex.queue.push_back(ReindexEntry::RefCount(old_table.unwrap()));
 		(
@@ -879,8 +904,12 @@ impl HashColumn {
 			tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
 		{
 			log::debug!(target: "parity-db", "{}: Ref count chunk full {} when reindexing", tables.get_ref_count().id, address);
-			(tables, reindex) =
-				Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
+			(tables, reindex) = Self::trigger_ref_count_reindex(
+				tables,
+				reindex,
+				self.path.as_path(),
+				self.max_file_size,
+			);
 			outcome = PlanOutcome::NeedReindex;
 		}
 		Ok(outcome)
@@ -980,8 +1009,12 @@ impl HashColumn {
 			tables.get_ref_count().write_insert_plan(address, ref_count, None, log)?
 		{
 			log::debug!(target: "parity-db", "{}: Ref count chunk full {}", tables.get_ref_count().id, address);
-			(tables, reindex) =
-				Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
+			(tables, reindex) = Self::trigger_ref_count_reindex(
+				tables,
+				reindex,
+				self.path.as_path(),
+				self.max_file_size,
+			);
 			outcome = PlanOutcome::NeedReindex;
 		}
 		let (test_ref_count, _test_sub_index) = tables.get_ref_count().get(address, log)?.unwrap();
@@ -1379,7 +1412,12 @@ impl HashColumn {
 						"Missing table {}, starting reindex",
 						record.table,
 					);
-					let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
+					let lock = Self::trigger_reindex(
+						tables,
+						reindex,
+						self.path.as_path(),
+						self.max_file_size,
+					);
 					std::mem::drop(lock);
 					return self.validate_plan(LogAction::InsertIndex(record), log)
 				}
@@ -1410,8 +1448,12 @@ impl HashColumn {
 						"Missing ref count {}, starting reindex",
 						record.table,
 					);
-					let lock =
-						Self::trigger_ref_count_reindex(tables, reindex, self.path.as_path());
+					let lock = Self::trigger_ref_count_reindex(
+						tables,
+						reindex,
+						self.path.as_path(),
+						self.max_file_size,
+					);
 					std::mem::drop(lock);
 					return self.validate_plan(LogAction::InsertRefCount(record), log)
 				}
diff --git a/src/db.rs b/src/db.rs
index 55f7b272..dad7ee20 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -2337,6 +2337,7 @@ mod tests {
 			sync_wal: true,
 			sync_data: true,
 			stats: true,
+			max_file_size: None,
 			salt: None,
 			columns: (0..num_columns).map(|_| Default::default()).collect(),
 			compression_threshold: HashMap::new(),
diff --git a/src/file.rs b/src/file.rs
index e07095cf..7c4cd3dd 100644
--- a/src/file.rs
+++ b/src/file.rs
@@ -14,6 +14,15 @@ trait OpenOptionsExt {
 	fn disable_read_ahead(&mut self) -> &mut Self;
 }
 
+#[inline]
+fn offset_to_file_index(offset: u64, max_size: Option<usize>) -> (u64, usize) {
+	if let Some(m) = max_size {
+		(offset % m as u64, offset as usize / m)
+	} else {
+		(offset, 0)
+	}
+}
+
 impl OpenOptionsExt for std::fs::OpenOptions {
 	#[cfg(not(windows))]
 	fn disable_read_ahead(&mut self) -> &mut Self {
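The `offset_to_file_index` helper above is the heart of the multi-file layout for value tables: a logical byte offset is split into an offset within one file and the index of that file. A standalone copy of the helper with a worked example (the values are illustrative):

```rust
fn offset_to_file_index(offset: u64, max_size: Option<usize>) -> (u64, usize) {
	if let Some(m) = max_size {
		(offset % m as u64, offset as usize / m)
	} else {
		(offset, 0)
	}
}

fn main() {
	// With a 4 MiB per-file limit, a logical offset of 9 MiB lands 1 MiB
	// into the third file (file index 2).
	let m = 4 * 1024 * 1024;
	assert_eq!(offset_to_file_index(9 * 1024 * 1024, Some(m)), (1024 * 1024, 2));
	// Without a limit, offsets map straight into file 0.
	assert_eq!(offset_to_file_index(123, None), (123, 0));
}
```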
@@ -65,7 +74,7 @@ pub fn madvise_random(map: &mut memmap2::MmapMut) {
 pub fn madvise_random(_map: &mut memmap2::MmapMut) {}
 
 #[cfg(not(windows))]
-fn mmap(file: &std::fs::File, len: usize) -> Result<memmap2::MmapMut> {
+fn mmap(file: &std::fs::File, len: usize, max_size: Option<usize>) -> Result<memmap2::MmapMut> {
 	#[cfg(not(test))]
 	const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb
 	// Use a different value for tests to work around docker limits on the test machine.
@@ -73,13 +82,14 @@ fn mmap(file: &std::fs::File, len: usize) -> Result<memmap2::MmapMut> {
 	const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb
 
 	let map_len = len + RESERVE_ADDRESS_SPACE;
+	let map_len = max_size.map(|m| std::cmp::min(map_len, m)).unwrap_or(map_len);
 	let mut map = try_io!(unsafe { memmap2::MmapOptions::new().len(map_len).map_mut(file) });
 	madvise_random(&mut map);
 	Ok(map)
 }
 
 #[cfg(windows)]
-fn mmap(file: &std::fs::File, _len: usize) -> Result<memmap2::MmapMut> {
+fn mmap(file: &std::fs::File, _len: usize, _max_size: Option<usize>) -> Result<memmap2::MmapMut> {
 	Ok(try_io!(unsafe { memmap2::MmapOptions::new().map_mut(file) }))
 }
 
@@ -87,84 +97,111 @@ const GROW_SIZE_BYTES: u64 = 256 * 1024;
 
 #[derive(Debug)]
 pub struct TableFile {
-	pub map: RwLock<Option<(memmap2::MmapMut, std::fs::File)>>,
-	pub path: std::path::PathBuf,
+	pub maps: RwLock<Vec<(memmap2::MmapMut, std::fs::File)>>,
+	pub path_base: std::path::PathBuf,
 	pub capacity: AtomicU64,
 	pub id: TableId,
+	pub max_size: Option<usize>,
 }
 
 impl TableFile {
-	pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result<Self> {
-		let mut capacity = 0u64;
-		let map = if std::fs::metadata(&filepath).is_ok() {
-			let file = try_io!(std::fs::OpenOptions::new()
-				.read(true)
-				.write(true)
-				.disable_read_ahead()
-				.open(filepath.as_path()));
-			try_io!(disable_read_ahead(&file));
-			let len = try_io!(file.metadata()).len();
-			if len == 0 {
-				// Preallocate.
-				capacity += GROW_SIZE_BYTES / entry_size as u64;
-				try_io!(file.set_len(GROW_SIZE_BYTES));
-			} else {
-				capacity = len / entry_size as u64;
-			}
-			let map = mmap(&file, len as usize)?;
-			Some((map, file))
+	pub fn open(
+		path_base: std::path::PathBuf,
+		entry_size: u16,
+		id: TableId,
+		max_file_size: Option<usize>,
+	) -> Result<Self> {
+		let max_size = if let Some(m) = max_file_size {
+			let m = m * 1024 * 1024;
+			Some((m / entry_size as usize) * entry_size as usize)
 		} else {
 			None
 		};
+		let mut capacity = 0u64;
+		let mut maps = Vec::new();
+		for i in 0.. {
+			let mut filepath = path_base.clone();
+			filepath.push(id.file_name(max_file_size.is_some().then(|| i)));
+			if std::fs::metadata(&filepath).is_ok() {
+				let file = try_io!(std::fs::OpenOptions::new()
+					.read(true)
+					.write(true)
+					.disable_read_ahead()
+					.open(filepath.as_path()));
+				try_io!(disable_read_ahead(&file));
+				let len = try_io!(file.metadata()).len();
+				if len == 0 {
+					// Preallocate.
+					capacity += GROW_SIZE_BYTES / entry_size as u64;
+					try_io!(file.set_len(GROW_SIZE_BYTES));
+				} else {
+					capacity += len / entry_size as u64;
+				}
+				let map = mmap(&file, len as usize, max_size)?;
+				maps.push((map, file));
+			} else {
+				break;
+			};
+			if max_file_size.is_none() {
+				break;
+			}
+		}
 		Ok(TableFile {
-			path: filepath,
-			map: RwLock::new(map),
+			path_base,
+			maps: RwLock::new(maps),
 			capacity: AtomicU64::new(capacity),
 			id,
+			max_size,
 		})
 	}
 
-	fn create_file(&self) -> Result<std::fs::File> {
+	fn create_file(&self, index: Option<u32>) -> Result<std::fs::File> {
 		log::debug!(target: "parity-db", "Created value table {}", self.id);
+		let mut path = self.path_base.clone();
+		path.push(self.id.file_name(index));
 		let file = try_io!(std::fs::OpenOptions::new()
 			.create(true)
 			.read(true)
 			.write(true)
 			.disable_read_ahead()
-			.open(self.path.as_path()));
+			.open(&path));
 		try_io!(disable_read_ahead(&file));
 		Ok(file)
 	}
 
 	pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
+		let (offset, file_index) = offset_to_file_index(offset, self.max_size);
 		let offset = offset as usize;
-		let map = self.map.read();
-		let (map, _) = map.as_ref().unwrap();
+		let map = self.maps.read();
+		let (map, _) = map.get(file_index).unwrap();
 		buf.copy_from_slice(&map[offset..offset + buf.len()]);
 		Ok(())
 	}
 
 	#[cfg(not(feature = "loom"))]
 	pub fn slice_at(&self, offset: u64, len: usize) -> MappedBytesGuard {
+		let (offset, file_index) = offset_to_file_index(offset, self.max_size);
 		let offset = offset as usize;
-		let map = self.map.read();
+		let map = self.maps.read();
 		parking_lot::RwLockReadGuard::map(map, |map| {
-			let (map, _) = map.as_ref().unwrap();
+			let (map, _) = map.get(file_index).unwrap();
 			&map[offset..offset + len]
 		})
 	}
 
 	#[cfg(feature = "loom")]
 	pub fn slice_at(&self, offset: u64, len: usize) -> MappedBytesGuard {
+		let (offset, file_index) = offset_to_file_index(offset, self.max_size);
 		let offset = offset as usize;
-		let map = self.map.read();
-		let (map, _) = map.as_ref().unwrap();
+		let map = self.maps.read();
+		let (map, _) = map.get(file_index).unwrap();
 		MappedBytesGuard::new(map[offset..offset + len].to_vec())
 	}
 
 	pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
-		let map = self.map.read();
-		let (map, _) = map.as_ref().unwrap();
+		let map = self.maps.read();
+		let (offset, file_index) = offset_to_file_index(offset, self.max_size);
+		let (map, _) = map.get(file_index).unwrap();
 		let offset = offset as usize;
 
 		// Nasty mutable pointer cast. We do ensure that all chunks that are being written are
@@ -179,45 +216,72 @@ impl TableFile {
 	}
 
 	pub fn grow(&self, entry_size: u16) -> Result<()> {
-		let mut map_and_file = self.map.write();
-		let new_len = match map_and_file.as_mut() {
-			None => {
-				let file = self.create_file()?;
-				let len = GROW_SIZE_BYTES;
-				try_io!(file.set_len(len));
-				let map = mmap(&file, 0)?;
-				*map_and_file = Some((map, file));
-				len
-			},
-			Some((map, file)) => {
-				let new_len = try_io!(file.metadata()).len() + GROW_SIZE_BYTES;
-				try_io!(file.set_len(new_len));
-				if map.len() < new_len as usize {
-					let new_map = mmap(&file, new_len as usize)?;
-					let old_map = std::mem::replace(map, new_map);
-					try_io!(old_map.flush());
+		let mut maps = self.maps.write();
+		let num_maps = maps.len();
+		let (current_len, push) = match maps.last() {
+			None => (0, true),
+			Some((_, file)) => {
+				let len = try_io!(file.metadata()).len();
+				if self.max_size == Some(len as usize) {
+					(0, true)
+				} else {
+					(len, false)
 				}
-				new_len
 			},
 		};
-		let capacity = new_len / entry_size as u64;
+		let per_file_capacity = self.max_size.map(|m| m / entry_size as usize).unwrap_or(0);
+		let (new_len, prev_page_capacity) = if push {
+			let file = self.create_file(self.max_size.is_some().then(|| num_maps as u32))?;
+			let new_len = self
+				.max_size
+				.map(|m| std::cmp::min(m as u64, GROW_SIZE_BYTES))
+				.unwrap_or(GROW_SIZE_BYTES);
+			try_io!(file.set_len(new_len));
+			let map = mmap(&file, new_len as usize, self.max_size)?;
+			maps.push((map, file));
+			(new_len, num_maps * per_file_capacity)
+		} else {
+			let (map, file) = maps.last_mut().unwrap();
+			let new_len = current_len + GROW_SIZE_BYTES as u64;
+			let new_len =
+				self.max_size.map(|m| std::cmp::min(m as u64, new_len)).unwrap_or(new_len);
+			try_io!(file.set_len(new_len));
+			{
+				let new_map = mmap(&file, new_len as usize, self.max_size)?;
+				let old_map = std::mem::replace(map, new_map);
+				try_io!(old_map.flush());
+			}
+			if num_maps > 0 {
+				(new_len, (num_maps - 1) * per_file_capacity)
+			} else {
+				(new_len, 0)
+			}
+		};
+		let capacity = new_len / entry_size as u64 + prev_page_capacity as u64;
 		self.capacity.store(capacity, Ordering::Relaxed);
 		Ok(())
 	}
 
 	pub fn flush(&self) -> Result<()> {
-		if let Some((map, _)) = self.map.read().as_ref() {
+		let maps = self.maps.read();
+		for (map, _) in maps.iter() {
 			try_io!(map.flush());
 		}
 		Ok(())
 	}
 
 	pub fn remove(&self) -> Result<()> {
-		let mut map = self.map.write();
-		if let Some((map, file)) = map.take() {
+		let mut maps_lock = self.maps.write();
+		let mut maps = std::mem::take(&mut *maps_lock);
+		let maps_len = maps.len();
+		maps.reverse();
+		for i in 0..maps_len as u32 {
+			let (map, file) = maps.pop().unwrap();
 			drop(map);
 			drop(file);
-			try_io!(std::fs::remove_file(&self.path));
+			let mut path = self.path_base.clone();
+			path.push(self.id.file_name(self.max_size.is_some().then(|| i)));
+			try_io!(std::fs::remove_file(&path));
 		}
 		Ok(())
 	}
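`grow` now either extends the last file by `GROW_SIZE_BYTES` (capped at `max_size`, which `open` first rounds down to a multiple of the entry size) or starts a new file once the last one is full; the stored capacity counts full files plus the tail file. A standalone re-creation of that capacity arithmetic, using the patch's 256 KiB grow step (the function name and all other values are illustrative):

```rust
const GROW_SIZE_BYTES: u64 = 256 * 1024;

// Capacity in entries after a grow: the last file contributes
// `new_len / entry_size`; every full file before it contributes
// `max_size / entry_size`.
fn capacity_after_grow(num_full_files: usize, new_len: u64, entry_size: u16, max_size: usize) -> u64 {
	let per_file_capacity = max_size / entry_size as usize;
	new_len / entry_size as u64 + (num_full_files * per_file_capacity) as u64
}

fn main() {
	// Two full 1 MiB files plus a fresh file grown to 256 KiB, 64-byte entries:
	let cap = capacity_after_grow(2, GROW_SIZE_BYTES, 64, 1024 * 1024);
	assert_eq!(cap, 2 * 16384 + 4096);
}
```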
diff --git a/src/index.rs b/src/index.rs
index 2501a6d7..313517fa 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -30,6 +30,18 @@ pub const ENTRY_BYTES: usize = ENTRY_BITS as usize / 8;
 const EMPTY_CHUNK: Chunk = Chunk([0u8; CHUNK_LEN]);
 const EMPTY_ENTRIES: [Entry; CHUNK_ENTRIES] = [Entry::empty(); CHUNK_ENTRIES];
 
+#[inline]
+fn chunk_to_file_index(mut chunk_index: u64, max_chunks: Option<u64>) -> (u64, usize) {
+	// Account for the meta chunks stored at the start of the first file.
+	chunk_index += (META_SIZE / CHUNK_LEN) as u64;
+
+	if let Some(i) = max_chunks {
+		(chunk_index % i, (chunk_index / i) as usize)
+	} else {
+		(chunk_index, 0)
+	}
+}
+
 #[repr(C, align(8))]
 #[derive(PartialEq, Eq, Clone, Debug)]
 pub struct Chunk(pub [u8; CHUNK_LEN]);
@@ -37,6 +49,9 @@ pub struct Chunk(pub [u8; CHUNK_LEN]);
 #[allow(clippy::assertions_on_constants)]
 const _: () = assert!(META_SIZE >= HEADER_SIZE + stats::TOTAL_SIZE);
 
+#[allow(clippy::assertions_on_constants)]
+const _: () = assert!(META_SIZE % CHUNK_LEN == 0);
+
 #[derive(PartialEq, Eq, Clone, Copy)]
 pub struct Entry(u64);
 
@@ -130,8 +145,9 @@ pub enum PlanOutcome {
 #[derive(Debug)]
 pub struct IndexTable {
 	pub id: TableId,
-	map: RwLock<Option<memmap2::MmapMut>>,
-	path: std::path::PathBuf,
+	map: RwLock<Vec<memmap2::MmapMut>>,
+	path_base: std::path::PathBuf,
+	max_chunks: Option<u64>,
 }
 
 fn total_entries(index_bits: u8) -> u64 {
@@ -142,8 +158,10 @@ fn total_chunks(index_bits: u8) -> u64 {
 	1u64 << index_bits
 }
 
-fn file_size(index_bits: u8) -> u64 {
-	total_entries(index_bits) * 8 + META_SIZE as u64
+fn file_size(index_bits: u8, max_chunks: Option<u64>) -> u64 {
+	let max_size = max_chunks.map(|c| c * CHUNK_LEN as u64);
+	let total = total_entries(index_bits) * 8 + META_SIZE as u64;
+	max_size.map(|m| std::cmp::min(m, total)).unwrap_or(total)
 }
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
@@ -166,8 +184,12 @@ impl TableId {
 		(self.0 & 0xff) as u8
 	}
 
-	pub fn file_name(&self) -> String {
-		format!("index_{:02}_{}", self.col(), self.index_bits())
+	pub fn file_name(&self, file_index: Option<u32>) -> String {
+		if let Some(i) = file_index {
+			format!("index_{:02}_{}_{:08x}", self.col(), self.index_bits(), i)
+		} else {
+			format!("index_{:02}_{}", self.col(), self.index_bits())
+		}
 	}
 
 	pub fn is_file_name(col: ColId, name: &str) -> bool {
@@ -208,31 +230,53 @@ impl std::fmt::Display for TableId {
 }
 
 impl IndexTable {
-	pub fn open_existing(path: &std::path::Path, id: TableId) -> Result<Option<IndexTable>> {
-		let mut path: std::path::PathBuf = path.into();
-		path.push(id.file_name());
-
-		let file = match std::fs::OpenOptions::new().read(true).write(true).open(path.as_path()) {
-			Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
-			Err(e) => return Err(Error::Io(e)),
-			Ok(file) => file,
-		};
+	pub fn open_existing(
+		path: &std::path::Path,
+		id: TableId,
+		max_size: Option<usize>,
+	) -> Result<Option<IndexTable>> {
+		let path_base: std::path::PathBuf = path.into();
+		let mut maps = Vec::new();
+		let max_chunks = max_size.map(|s| (s * 1024 * 1024 / CHUNK_LEN) as u64);
+		for i in 0.. {
+			let mut path = path_base.clone();
+			path.push(id.file_name(max_size.is_some().then(|| i)));
+
+			let file = match std::fs::OpenOptions::new().read(true).write(true).open(path.as_path())
+			{
+				Err(e) if e.kind() == std::io::ErrorKind::NotFound =>
+					if i == 0 {
+						return Ok(None)
+					} else {
+						break
+					},
+				Err(e) => return Err(Error::Io(e)),
+				Ok(file) => file,
+			};
 
-		try_io!(file.set_len(file_size(id.index_bits())));
-		let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
-		madvise_random(&mut map);
-		log::debug!(target: "parity-db", "Opened existing index {}", id);
-		Ok(Some(IndexTable { id, path, map: RwLock::new(Some(map)) }))
+			try_io!(file.set_len(file_size(id.index_bits(), max_chunks)));
+			let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
+			madvise_random(&mut map);
+			log::debug!(target: "parity-db", "Opened existing index {}", id);
+			maps.push(map);
+			if max_size.is_none() {
+				break;
+			}
+		}
+		Ok(Some(IndexTable { id, path_base, map: RwLock::new(maps), max_chunks }))
 	}
 
-	pub fn create_new(path: &std::path::Path, id: TableId) -> IndexTable {
-		let mut path: std::path::PathBuf = path.into();
-		path.push(id.file_name());
-		IndexTable { id, path, map: RwLock::new(None) }
+	pub fn create_new(path: &std::path::Path, id: TableId, max_size: Option<usize>) -> IndexTable {
+		IndexTable {
+			id,
+			path_base: path.into(),
+			map: RwLock::new(Vec::new()),
+			max_chunks: max_size.map(|s| (s * 1024 * 1024 / CHUNK_LEN) as u64),
+		}
 	}
 
 	pub fn load_stats(&self) -> Result<ColumnStats> {
-		if let Some(map) = &*self.map.read() {
+		if let Some(map) = self.map.read().first() {
 			Ok(ColumnStats::from_slice(try_io!(Ok(
 				&map[HEADER_SIZE..HEADER_SIZE + stats::TOTAL_SIZE]
 			))))
@@ -242,7 +286,7 @@ impl IndexTable {
 	}
 
 	pub fn write_stats(&self, stats: &ColumnStats) -> Result<()> {
-		if let Some(map) = &mut *self.map.write() {
+		if let Some(map) = &mut self.map.write().first_mut() {
 			let slice = try_io!(Ok(&mut map[HEADER_SIZE..HEADER_SIZE + stats::TOTAL_SIZE]));
 			stats.to_slice(slice);
 		}
@@ -250,18 +294,19 @@ impl IndexTable {
 	}
 
 	fn chunk_at(index: u64, map: &memmap2::MmapMut) -> Result<&Chunk> {
-		let offset = META_SIZE + index as usize * CHUNK_LEN;
+		let offset = index as usize * CHUNK_LEN;
 		let ptr = unsafe { &*(map[offset..offset + CHUNK_LEN].as_ptr() as *const Chunk) };
 		Ok(try_io!(Ok(ptr)))
 	}
 
 	fn chunk_entries_at(index: u64, map: &memmap2::MmapMut) -> Result<&[Entry; CHUNK_ENTRIES]> {
-		let offset = META_SIZE + index as usize * CHUNK_LEN;
+		let offset = index as usize * CHUNK_LEN;
 		let ptr = unsafe {
 			&*(map[offset..offset + CHUNK_LEN].as_ptr() as *const [Entry; CHUNK_ENTRIES])
 		};
 		Ok(try_io!(Ok(ptr)))
 	}
+
 	#[cfg(target_arch = "x86_64")]
 	fn find_entry(&self, key_prefix: u64, sub_index: usize, chunk: &Chunk) -> (Entry, usize) {
 		self.find_entry_sse2(key_prefix, sub_index, chunk)
@@ -352,7 +397,8 @@ impl IndexTable {
 			return Ok(entry)
 		}
 
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			log::trace!(target: "parity-db", "{}: Querying chunk at {}", self.id, chunk_index);
 			let chunk = Self::chunk_at(chunk_index, map)?;
 			return Ok(self.find_entry(key, sub_index, chunk))
@@ -366,7 +412,8 @@ impl IndexTable {
 		{
 			return Ok(entry)
 		}
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			let chunk = Self::chunk_at(chunk_index, map)?;
 			return Ok(*Self::transmute_chunk(chunk))
 		}
@@ -376,16 +423,21 @@ impl IndexTable {
 	pub fn sorted_entries(&self) -> Result<Vec<Entry>> {
 		log::info!(target: "parity-db", "{}: Loading into memory", self.id);
 		let mut target = Vec::with_capacity(self.id.total_entries() as usize / 2);
-		if let Some(map) = &*self.map.read() {
-			for chunk_index in 0..self.id.total_chunks() {
+		let maps = self.map.read();
+		for chunk_index in 0..self.id.total_chunks() {
+			let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+			if let Some(map) = maps.get(file_index) {
 				let source = Self::chunk_entries_at(chunk_index, map)?;
 				for e in source {
 					if !e.is_empty() {
 						target.push(*e);
 					}
 				}
+			} else {
+				break;
 			}
 		}
+		drop(maps);
 		log::info!(target: "parity-db", "{}: Sorting index", self.id);
 		target.sort_unstable_by(|a, b| {
 			let a = a.address(self.id.index_bits());
@@ -470,7 +522,8 @@ impl IndexTable {
 			return self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
 		}
 
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			let chunk = Self::chunk_at(chunk_index, map)?.clone();
 			return self.plan_insert_chunk(key_prefix, address, chunk, sub_index, log)
 		}
@@ -516,7 +569,8 @@ impl IndexTable {
 			return self.plan_remove_chunk(key_prefix, chunk, sub_index, log)
 		}
 
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			let chunk = Self::chunk_at(chunk_index, map)?.clone();
 			return self.plan_remove_chunk(key_prefix, chunk, sub_index, log)
 		}
@@ -526,23 +580,29 @@ impl IndexTable {
 
 	pub fn enact_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
 		let mut map = self.map.upgradable_read();
-		if map.is_none() {
+		let (chunk_index, file_index) = chunk_to_file_index(index, self.max_chunks);
+		let map_len = map.len();
+		if map_len <= file_index {
 			let mut wmap = RwLockUpgradableReadGuard::upgrade(map);
-			let file = try_io!(std::fs::OpenOptions::new()
-				.write(true)
-				.read(true)
-				.create_new(true)
-				.open(self.path.as_path()));
-			log::debug!(target: "parity-db", "Created new index {}", self.id);
-			try_io!(file.set_len(file_size(self.id.index_bits())));
-			let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
-			madvise_random(&mut mmap);
-			*wmap = Some(mmap);
+			for i in map_len..file_index + 1 {
+				let mut path = self.path_base.clone();
+				path.push(self.id.file_name(self.max_chunks.is_some().then(|| i as u32)));
+				let file = try_io!(std::fs::OpenOptions::new()
+					.write(true)
+					.read(true)
+					.create_new(true)
+					.open(path.as_path()));
+				log::debug!(target: "parity-db", "Created new index {}", self.id);
+				try_io!(file.set_len(file_size(self.id.index_bits(), self.max_chunks)));
+				let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
+				madvise_random(&mut mmap);
+				wmap.push(mmap);
+			}
 			map = RwLockWriteGuard::downgrade_to_upgradable(wmap);
 		}
-		let map = map.as_ref().unwrap();
-		let offset = META_SIZE + index as usize * CHUNK_LEN;
+		let map = map.get(file_index).unwrap();
+		let offset = chunk_index as usize * CHUNK_LEN;
 		// Nasty mutable pointer cast. We do ensure that all chunks that are being written are
 		// accessed through the overlay in other threads.
 		let ptr: *mut u8 = map.as_ptr() as *mut u8;
@@ -594,15 +654,33 @@ impl IndexTable {
 
 	pub fn drop_file(self) -> Result<()> {
 		drop(self.map);
-		try_io!(std::fs::remove_file(self.path.as_path()));
+		for i in 0.. {
+			let mut path = self.path_base.clone();
+			path.push(self.id.file_name(self.max_chunks.is_some().then(|| i)));
+			match std::fs::remove_file(path.as_path()) {
+				Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
+				Err(e) => return Err(Error::Io(e)),
+				Ok(()) => (),
+			};
+			if self.max_chunks.is_none() {
+				break;
+			}
+		}
 		log::debug!(target: "parity-db", "{}: Dropped table", self.id);
 		Ok(())
 	}
 
 	pub fn flush(&self) -> Result<()> {
-		if let Some(map) = &*self.map.read() {
-			// Flush everything except stats.
-			try_io!(map.flush_range(META_SIZE, map.len() - META_SIZE));
+		let maps = self.map.read();
+		let mut first = true;
+		for map in maps.iter() {
+			if first {
+				first = false;
+				// Flush everything except stats.
+				try_io!(map.flush_range(META_SIZE, map.len() - META_SIZE));
+			} else {
+				try_io!(map.flush());
+			}
 		}
 		Ok(())
 	}
@@ -644,8 +722,9 @@ mod test {
 		for index_bits in [16, 18, 20, 22] {
 			let index_table = IndexTable {
 				id: TableId(index_bits.into()),
-				map: RwLock::new(None),
-				path: PathBuf::new(),
+				map: RwLock::new(Vec::new()),
+				path_base: PathBuf::new(),
+				max_chunks: None,
 			};
 
 			let data_address = Address::from_u64((1 << index_bits) - 1);
@@ -674,8 +753,12 @@ mod test {
 
 	#[test]
 	fn test_find_any_entry() {
-		let table =
-			IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
+		let table = IndexTable {
+			id: TableId(18),
+			map: RwLock::new(Vec::new()),
+			path_base: Default::default(),
+			max_chunks: None,
+		};
 		let mut chunk = Chunk([0u8; CHUNK_LEN]);
 		let mut entries = [Entry::empty(); CHUNK_ENTRIES];
 		let mut keys = [0u64; CHUNK_ENTRIES];
@@ -711,8 +794,12 @@ mod test {
 
 	#[test]
 	fn test_find_entry_same_value() {
-		let table =
-			IndexTable { id: TableId(18), map: RwLock::new(None), path: Default::default() };
+		let table = IndexTable {
+			id: TableId(18),
+			map: RwLock::new(Vec::new()),
+			path_base: Default::default(),
+			max_chunks: None,
+		};
 		let mut chunk = Chunk([0u8; CHUNK_LEN]);
 		let key = 0x4242424242424242;
 		let partial_key = Entry::extract_key(key, 18);
@@ -734,8 +821,12 @@ mod test {
 
 	#[test]
 	fn test_find_entry_zero_pk() {
-		let table =
-			IndexTable { id: TableId(16), map: RwLock::new(None), path: Default::default() };
+		let table = IndexTable {
+			id: TableId(16),
+			map: RwLock::new(Vec::new()),
+			path_base: Default::default(),
+			max_chunks: None,
+		};
 		let mut chunk = Chunk([0u8; CHUNK_LEN]);
 		let zero_key = 0x0000000000000000;
 		let entry = Entry::new(Address::new(1, 1), zero_key, 16);
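Index (and ref-count) chunks are remapped the same way as value-table offsets, except that the `META_SIZE / CHUNK_LEN` meta chunks at the front of the first file shift every logical chunk index before the split — hence the new `META_SIZE % CHUNK_LEN == 0` assertion. A standalone sketch of the mapping (the constants here are placeholders, not the crate's actual values):

```rust
const CHUNK_LEN: usize = 512; // assumption, for illustration only
const META_SIZE: usize = 16 * CHUNK_LEN; // assumption: a multiple of CHUNK_LEN

fn chunk_to_file_index(mut chunk_index: u64, max_chunks: Option<u64>) -> (u64, usize) {
	// Account for the meta chunks stored at the start of the first file.
	chunk_index += (META_SIZE / CHUNK_LEN) as u64;
	if let Some(i) = max_chunks {
		(chunk_index % i, (chunk_index / i) as usize)
	} else {
		(chunk_index, 0)
	}
}

fn main() {
	// With 1000 chunks per file, logical chunk 990 shifts past the 16 meta
	// chunks to 1006 and lands as chunk 6 of the second file.
	assert_eq!(chunk_to_file_index(990, Some(1000)), (6, 1));
	// Without a limit there is a single file; only the meta shift applies.
	assert_eq!(chunk_to_file_index(990, None), (1006, 0));
}
```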
diff --git a/src/options.rs b/src/options.rs
index 7fefbf86..44594731 100644
--- a/src/options.rs
+++ b/src/options.rs
@@ -31,6 +31,9 @@ pub struct Options {
 	pub sync_data: bool,
 	/// Collect database statistics. May have effect on performance.
 	pub stats: bool,
+	/// If defined, use multiple files with a size limit. May have an effect on performance.
+	/// The limit is expressed in megabytes.
+	pub max_file_size: Option<usize>,
 	/// Override salt value. If `None` is specified salt is loaded from metadata
 	/// or randomly generated when creating a new database.
 	pub salt: Option<Salt>,
@@ -84,6 +87,8 @@ pub struct Metadata {
 	pub version: u32,
 	/// Column metadata.
 	pub columns: Vec<ColumnOptions>,
+	/// Maximum size for files, in megabytes.
+	pub max_file_size: Option<usize>,
 }
 
 impl ColumnOptions {
@@ -177,6 +182,7 @@ impl Options {
 			sync_data: true,
 			stats: true,
 			salt: None,
+			max_file_size: None,
 			columns: (0..num_columns).map(|_| Default::default()).collect(),
 			compression_threshold: HashMap::new(),
 			#[cfg(any(test, feature = "instrumentation"))]
@@ -217,6 +223,9 @@ impl Options {
 			format!("version={}", version.unwrap_or(CURRENT_VERSION)),
 			format!("salt={}", hex::encode(salt)),
 		];
+		if let Some(i) = self.max_file_size {
+			metadata.push(format!("max_file_size={}", hex::encode((i as u32).to_le_bytes())));
+		}
 		for i in 0..self.columns.len() {
 			metadata.push(format!("col{}={}", i, self.columns[i].as_string()));
 		}
@@ -248,11 +257,22 @@ impl Options {
 				})
 			}
 		}
+		if meta.max_file_size != self.max_file_size {
+			return Err(Error::InvalidConfiguration(format!(
+				"Cannot change max file size from {:?} to {:?}",
+				meta.max_file_size, self.max_file_size
+			)));
+		}
 		Ok(meta)
 	} else if create {
 		let s: Salt = self.salt.unwrap_or_else(|| rand::thread_rng().gen());
 		self.write_metadata(&self.path, &s)?;
-		Ok(Metadata { version: CURRENT_VERSION, columns: self.columns.clone(), salt: s })
+		Ok(Metadata {
+			version: CURRENT_VERSION,
+			columns: self.columns.clone(),
+			salt: s,
+			max_file_size: self.max_file_size,
+		})
 	} else {
 		Err(Error::DatabaseNotFound)
 	}
@@ -272,6 +292,7 @@ impl Options {
 		}
 		let file = std::io::BufReader::new(try_io!(std::fs::File::open(path)));
 		let mut salt = None;
+		let mut max_file_size = None;
 		let mut columns = Vec::new();
 		let mut version = 0;
 		for l in file.lines() {
@@ -288,6 +309,12 @@ impl Options {
 				let mut s = Salt::default();
 				s.copy_from_slice(&salt_slice);
 				salt = Some(s);
+			} else if k == "max_file_size" {
+				let size_slice = hex::decode(v)
+					.map_err(|_| Error::Corruption("Bad max file size string".into()))?;
+				let mut m = [0u8; 4];
+				m.copy_from_slice(&size_slice[..]);
+				max_file_size = Some(u32::from_le_bytes(m) as usize);
 			} else if k.starts_with("col") {
 				let col = ColumnOptions::from_string(v)
 					.ok_or_else(|| Error::Corruption("Bad column metadata".into()))?;
@@ -300,7 +327,7 @@ impl Options {
 			)))
 		}
 		let salt = salt.ok_or_else(|| Error::InvalidConfiguration("Missing salt value".into()))?;
-		Ok(Some(Metadata { version, columns, salt, max_file_size }))
+		Ok(Some(Metadata { version, columns, salt, max_file_size }))
 	}
 
 	pub fn is_valid(&self) -> bool {
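On disk, the option round-trips through a `max_file_size=` line in the metadata file (the value is a hex-encoded little-endian `u32`), and validation refuses to change it for an existing database. A minimal sketch of opting in from user code (the path and the 64 MB figure are illustrative):

```rust
// Open (or create) a database whose index, ref-count and value tables are
// split into files of at most 64 MB each.
fn open_with_file_limit() -> Result<parity_db::Db, parity_db::Error> {
	let mut options = parity_db::Options::with_columns(std::path::Path::new("./db"), 1);
	options.max_file_size = Some(64); // in megabytes, as documented above
	parity_db::Db::open_or_create(&options)
}
```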
diff --git a/src/ref_count.rs b/src/ref_count.rs
index b546d95e..4eb9dc46 100644
--- a/src/ref_count.rs
+++ b/src/ref_count.rs
@@ -21,6 +21,21 @@ pub const ENTRY_BYTES: usize = ENTRY_BITS as usize / 8;
 const EMPTY_CHUNK: Chunk = Chunk([0u8; CHUNK_LEN]);
 const EMPTY_ENTRIES: [Entry; CHUNK_ENTRIES] = [Entry::empty(); CHUNK_ENTRIES];
 
+#[allow(clippy::assertions_on_constants)]
+const _: () = assert!(META_SIZE % CHUNK_LEN == 0);
+
+#[inline]
+fn chunk_to_file_index(mut chunk_index: u64, max_chunks: Option<u64>) -> (u64, usize) {
+	// Account for the meta chunks stored at the start of the first file.
+	chunk_index += (META_SIZE / CHUNK_LEN) as u64;
+
+	if let Some(i) = max_chunks {
+		(chunk_index % i, (chunk_index / i) as usize)
+	} else {
+		(chunk_index, 0)
+	}
+}
+
 #[repr(C, align(8))]
 #[derive(PartialEq, Eq, Clone, Debug)]
 pub struct Chunk(pub [u8; CHUNK_LEN]);
@@ -65,8 +80,9 @@ impl Entry {
 #[derive(Debug)]
 pub struct RefCountTable {
 	pub id: RefCountTableId,
-	map: RwLock<Option<memmap2::MmapMut>>,
-	path: std::path::PathBuf,
+	map: RwLock<Vec<memmap2::MmapMut>>,
+	path_base: std::path::PathBuf,
+	max_chunks: Option<u64>,
 }
 
 fn total_entries(index_bits: u8) -> u64 {
@@ -77,8 +93,10 @@ fn total_chunks(index_bits: u8) -> u64 {
 	1u64 << index_bits
 }
 
-fn file_size(index_bits: u8) -> u64 {
-	total_entries(index_bits) * ENTRY_BYTES as u64 + META_SIZE as u64
+fn file_size(index_bits: u8, max_chunks: Option<u64>) -> u64 {
+	let max_size = max_chunks.map(|c| c * CHUNK_LEN as u64);
+	let total = total_entries(index_bits) * ENTRY_BYTES as u64 + META_SIZE as u64;
+	max_size.map(|m| std::cmp::min(m, total)).unwrap_or(total)
 }
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
@@ -101,8 +119,12 @@ impl RefCountTableId {
 		(self.0 & 0xff) as u8
 	}
 
-	pub fn file_name(&self) -> String {
-		format!("refcount_{:02}_{}", self.col(), self.index_bits())
+	pub fn file_name(&self, file_index: Option<u32>) -> String {
+		if let Some(i) = file_index {
+			format!("refcount_{:02}_{}_{:08x}", self.col(), self.index_bits(), i)
+		} else {
+			format!("refcount_{:02}_{}", self.col(), self.index_bits())
+		}
 	}
 
 	pub fn is_file_name(col: ColId, name: &str) -> bool {
@@ -142,31 +164,54 @@ impl RefCountTable {
 	pub fn open_existing(
 		path: &std::path::Path,
 		id: RefCountTableId,
+		max_size: Option<usize>,
 	) -> Result<Option<RefCountTable>> {
-		let mut path: std::path::PathBuf = path.into();
-		path.push(id.file_name());
-
-		let file = match std::fs::OpenOptions::new().read(true).write(true).open(path.as_path()) {
-			Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
-			Err(e) => return Err(Error::Io(e)),
-			Ok(file) => file,
-		};
-
-		try_io!(file.set_len(file_size(id.index_bits())));
-		let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
-		madvise_random(&mut map);
-		log::debug!(target: "parity-db", "Opened existing refcount table {}", id);
-		Ok(Some(RefCountTable { id, path, map: RwLock::new(Some(map)) }))
+		let path_base: std::path::PathBuf = path.into();
+		let mut maps = Vec::new();
+		let max_chunks = max_size.map(|s| (s * 1024 * 1024 / CHUNK_LEN) as u64);
+		for i in 0.. {
+			let mut path = path_base.clone();
+			path.push(id.file_name(max_size.is_some().then(|| i)));
+
+			let file = match std::fs::OpenOptions::new().read(true).write(true).open(path.as_path())
+			{
+				Err(e) if e.kind() == std::io::ErrorKind::NotFound =>
+					if i == 0 {
+						return Ok(None)
+					} else {
+						break
+					},
+				Err(e) => return Err(Error::Io(e)),
+				Ok(file) => file,
+			};
+
+			try_io!(file.set_len(file_size(id.index_bits(), max_chunks)));
+			let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
+			madvise_random(&mut map);
+			log::debug!(target: "parity-db", "Opened existing refcount table {}", id);
+			maps.push(map);
+			if max_chunks.is_none() {
+				break;
+			}
+		}
+		Ok(Some(RefCountTable { id, path_base, map: RwLock::new(maps), max_chunks }))
 	}
 
-	pub fn create_new(path: &std::path::Path, id: RefCountTableId) -> RefCountTable {
-		let mut path: std::path::PathBuf = path.into();
-		path.push(id.file_name());
-		RefCountTable { id, path, map: RwLock::new(None) }
+	pub fn create_new(
+		path: &std::path::Path,
+		id: RefCountTableId,
+		max_size: Option<usize>,
+	) -> RefCountTable {
+		RefCountTable {
+			id,
+			path_base: path.into(),
+			map: RwLock::new(Vec::new()),
+			max_chunks: max_size.map(|s| (s * 1024 * 1024 / CHUNK_LEN) as u64),
+		}
 	}
 
 	fn chunk_at(index: u64, map: &memmap2::MmapMut) -> Result<&Chunk> {
-		let offset = META_SIZE + index as usize * CHUNK_LEN;
+		let offset = index as usize * CHUNK_LEN;
 		let ptr = unsafe { &*(map[offset..offset + CHUNK_LEN].as_ptr() as *const Chunk) };
 		Ok(try_io!(Ok(ptr)))
 	}
@@ -196,7 +241,8 @@ impl RefCountTable {
 			return Ok(entry.map(|(e, sub_index)| (e.ref_count(), sub_index)))
 		}
 
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			log::trace!(target: "parity-db", "{}: Querying ref count chunk at {}", self.id, chunk_index);
 			let chunk = Self::chunk_at(chunk_index, map)?;
 			return Ok(self
@@ -212,7 +258,8 @@ impl RefCountTable {
 		{
 			return Ok(entry)
 		}
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			let chunk = Self::chunk_at(chunk_index, map)?;
 			return Ok(*Self::transmute_chunk(chunk))
 		}
@@ -220,7 +267,8 @@ impl RefCountTable {
 	}
 
 	pub fn table_entries(&self, chunk_index: u64) -> Result<[Entry; CHUNK_ENTRIES]> {
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			let chunk = Self::chunk_at(chunk_index, map)?;
 			return Ok(*Self::transmute_chunk(chunk))
 		}
@@ -299,7 +347,8 @@ impl RefCountTable {
 			return self.plan_insert_chunk(address, ref_count, chunk, sub_index, log)
 		}
 
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			let chunk = Self::chunk_at(chunk_index, map)?.clone();
 			return self.plan_insert_chunk(address, ref_count, chunk, sub_index, log)
 		}
@@ -343,7 +392,8 @@ impl RefCountTable {
 			return self.plan_remove_chunk(address, chunk, sub_index, log)
 		}
 
-		if let Some(map) = &*self.map.read() {
+		let (chunk_index, file_index) = chunk_to_file_index(chunk_index, self.max_chunks);
+		if let Some(map) = self.map.read().get(file_index) {
 			let chunk = Self::chunk_at(chunk_index, map)?.clone();
 			return self.plan_remove_chunk(address, chunk, sub_index, log)
 		}
@@ -354,23 +404,29 @@ impl RefCountTable {
 
 	pub fn enact_plan(&self, index: u64, log: &mut LogReader) -> Result<()> {
 		let mut map = self.map.upgradable_read();
-		if map.is_none() {
+		let (chunk_index, file_index) = chunk_to_file_index(index, self.max_chunks);
+		let map_len = map.len();
+		if map_len <= file_index {
 			let mut wmap = RwLockUpgradableReadGuard::upgrade(map);
-			let file = try_io!(std::fs::OpenOptions::new()
-				.write(true)
-				.read(true)
-				.create_new(true)
-				.open(self.path.as_path()));
-			log::debug!(target: "parity-db", "Created new ref count {}", self.id);
-			try_io!(file.set_len(file_size(self.id.index_bits())));
-			let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
-			madvise_random(&mut mmap);
-			*wmap = Some(mmap);
+			for i in map_len..file_index + 1 {
+				let mut path = self.path_base.clone();
+				path.push(self.id.file_name(self.max_chunks.is_some().then(|| i as u32)));
+				let file = try_io!(std::fs::OpenOptions::new()
+					.write(true)
+					.read(true)
+					.create_new(true)
+					.open(path.as_path()));
+				log::debug!(target: "parity-db", "Created new ref count {}", self.id);
+				try_io!(file.set_len(file_size(self.id.index_bits(), self.max_chunks)));
+				let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
+				madvise_random(&mut mmap);
+				wmap.push(mmap);
+			}
 			map = RwLockWriteGuard::downgrade_to_upgradable(wmap);
 		}
-		let map = map.as_ref().unwrap();
-		let offset = META_SIZE + index as usize * CHUNK_LEN;
+		let map = map.get(file_index).unwrap();
+		let offset = chunk_index as usize * CHUNK_LEN;
 		// Nasty mutable pointer cast. We do ensure that all chunks that are being written are
 		// accessed through the overlay in other threads.
 		let ptr: *mut u8 = map.as_ptr() as *mut u8;
@@ -424,15 +480,33 @@ impl RefCountTable {
 
 	pub fn drop_file(self) -> Result<()> {
 		drop(self.map);
-		try_io!(std::fs::remove_file(self.path.as_path()));
+		for i in 0.. {
+			let mut path = self.path_base.clone();
+			path.push(self.id.file_name(self.max_chunks.is_some().then(|| i)));
+			match std::fs::remove_file(path.as_path()) {
+				Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
+				Err(e) => return Err(Error::Io(e)),
+				Ok(()) => (),
+			};
+			if self.max_chunks.is_none() {
+				break;
+			}
+		}
 		log::debug!(target: "parity-db", "{}: Dropped ref count table", self.id);
 		Ok(())
 	}
 
 	pub fn flush(&self) -> Result<()> {
-		if let Some(map) = &*self.map.read() {
-			// Flush everything except stats.
-			try_io!(map.flush_range(META_SIZE, map.len() - META_SIZE));
+		let maps = self.map.read();
+		let mut first = true;
+		for map in maps.iter() {
+			if first {
+				first = false;
+				// Flush everything except stats.
+				try_io!(map.flush_range(META_SIZE, map.len() - META_SIZE));
+			} else {
+				try_io!(map.flush());
+			}
 		}
 		Ok(())
 	}
@@ -471,8 +545,9 @@ mod test {
 	fn test_find_any_entry() {
 		let table = RefCountTable {
 			id: RefCountTableId(18),
-			map: RwLock::new(None),
-			path: Default::default(),
+			map: RwLock::new(Vec::new()),
+			path_base: Default::default(),
+			max_chunks: None,
 		};
 		let mut chunk = Chunk([0u8; CHUNK_LEN]);
 		let mut entries = [Entry::empty(); CHUNK_ENTRIES];
@@ -499,8 +574,9 @@ mod test {
 	fn test_find_entry_zero() {
 		let table = RefCountTable {
 			id: RefCountTableId(16),
-			map: RwLock::new(None),
-			path: Default::default(),
+			map: RwLock::new(Vec::new()),
+			path_base: Default::default(),
+			max_chunks: None,
 		};
 		let mut chunk = Chunk([0u8; CHUNK_LEN]);
 		let address = Address::new(1, 1);
diff --git a/src/table.rs b/src/table.rs
index b493c71c..d22461c5 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -105,8 +105,12 @@ impl TableId {
 		(self.0 & 0xff) as u8
 	}
 
-	pub fn file_name(&self) -> String {
-		format!("table_{:02}_{}", self.col(), hex(&[self.size_tier()]))
+	pub fn file_name(&self, file_index: Option<u32>) -> String {
+		if let Some(i) = file_index {
+			format!("table_{:02}_{}_{:08x}", self.col(), hex(&[self.size_tier()]), i)
+		} else {
+			format!("table_{:02}_{}", self.col(), hex(&[self.size_tier()]))
+		}
 	}
 
 	pub fn is_file_name(col: ColId, name: &str) -> bool {
@@ -410,6 +414,7 @@ impl ValueTable {
 		entry_size: Option<u16>,
 		options: &ColumnOptions,
 		db_version: u32,
+		max_file_size: Option<usize>,
 	) -> Result<ValueTable> {
 		let (multipart, entry_size) = match entry_size {
 			Some(s) => (false, s),
@@ -418,12 +423,11 @@ impl ValueTable {
 		assert!(entry_size >= MIN_ENTRY_SIZE as u16);
 		assert!(entry_size <= MAX_ENTRY_SIZE as u16);
 
-		let mut filepath: std::path::PathBuf = std::path::PathBuf::clone(&*path);
-		filepath.push(id.file_name());
-		let file = crate::file::TableFile::open(filepath, entry_size, id)?;
+		let filepath: std::path::PathBuf = std::path::PathBuf::clone(&*path);
+		let file = crate::file::TableFile::open(filepath, entry_size, id, max_file_size)?;
 		let mut filled = 1;
 		let mut last_removed = 0;
-		if file.map.read().is_some() {
+		if !file.maps.read().is_empty() {
 			let mut header = Header::default();
 			file.read_at(&mut header.0, 0)?;
 			last_removed = header.last_removed();
@@ -1106,7 +1110,7 @@ impl ValueTable {
 	}
 
 	pub fn refresh_metadata(&self) -> Result<()> {
-		if self.file.map.read().is_none() {
+		if self.file.maps.read().is_empty() {
 			return Ok(())
 		}
 		let _free_entries_guard = if let Some(free_entries) = &self.free_entries {
@@ -1191,12 +1195,12 @@ impl ValueTable {
 	}
 
 	pub fn is_init(&self) -> bool {
-		self.file.map.read().is_some()
+		!self.file.maps.read().is_empty()
 	}
 
 	pub fn init_with_entry(&self, entry: &[u8]) -> Result<()> {
 		if let Err(e) = self.do_init_with_entry(entry) {
-			log::error!(target: "parity-db", "Failure to initialize file {}", self.file.path.display());
+			log::error!(target: "parity-db", "Failure to initialize file {}", self.file.path_base.display());
 			let _ = self.file.remove(); // We ignore error here
 			return Err(e)
 		}
@@ -1358,8 +1362,15 @@ mod test {
 
 	fn new_table(dir: &TempDir, size: Option<u16>, options: &ColumnOptions) -> ValueTable {
 		let id = TableId::new(0, 0);
-		ValueTable::open(Arc::new(dir.path().to_path_buf()), id, size, options, CURRENT_VERSION)
-			.unwrap()
+		ValueTable::open(
+			Arc::new(dir.path().to_path_buf()),
+			id,
+			size,
+			options,
+			CURRENT_VERSION,
+			None,
+		)
+		.unwrap()
 	}
 
 	fn new_log(dir: &TempDir) -> Log {
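Across index, ref-count and value tables the per-file naming is uniform: once a size limit is set, the single-file name gains an eight-digit hex suffix. A standalone sketch reproducing the format strings from the patch (`hex` here is a stand-in for the crate's internal helper; the ids are illustrative):

```rust
// Minimal stand-in for the crate's internal `hex` helper (assumption).
fn hex(bytes: &[u8]) -> String {
	bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

fn main() {
	// Column 3, index bits 18 / size tier 0x0f, third file (index 2).
	let (col, bits, tier, i): (u8, u8, u8, u32) = (3, 18, 0x0f, 2);
	assert_eq!(format!("index_{:02}_{}_{:08x}", col, bits, i), "index_03_18_00000002");
	assert_eq!(format!("refcount_{:02}_{}_{:08x}", col, bits, i), "refcount_03_18_00000002");
	assert_eq!(format!("table_{:02}_{}_{:08x}", col, hex(&[tier]), i), "table_03_0f_00000002");
}
```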