diff --git a/Cargo.lock b/Cargo.lock index ba093d779f22f..9a026404d86d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1644,10 +1644,13 @@ name = "aptos-experimental-layered-map" version = "0.1.0" dependencies = [ "ahash 0.8.11", + "anyhow", "aptos-crypto", "aptos-drop-helper", "aptos-infallible", "aptos-metrics-core", + "aptos-schemadb", + "aptos-temppath", "bitvec 1.0.1", "criterion", "itertools 0.13.0", @@ -1655,6 +1658,7 @@ dependencies = [ "once_cell", "proptest", "rand 0.7.3", + "rocksdb", ] [[package]] diff --git a/experimental/storage/layered-map/Cargo.toml b/experimental/storage/layered-map/Cargo.toml index 0bed1c63fb8ae..9d818453317c2 100644 --- a/experimental/storage/layered-map/Cargo.toml +++ b/experimental/storage/layered-map/Cargo.toml @@ -23,11 +23,18 @@ itertools = { workspace = true } once_cell = { workspace = true } [dev-dependencies] +anyhow = { workspace = true } +aptos-schemadb = { workspace = true } +aptos-temppath = { workspace = true } criterion = { workspace = true } itertools = { workspace = true } jemallocator = { workspace = true } proptest = { workspace = true } rand = { workspace = true } +rocksdb = { workspace = true } + +[lib] +bench = false [[bench]] name = "sorting" diff --git a/experimental/storage/layered-map/benches/maps.rs b/experimental/storage/layered-map/benches/maps.rs index bb61742e1db6e..d2500a64f79e8 100644 --- a/experimental/storage/layered-map/benches/maps.rs +++ b/experimental/storage/layered-map/benches/maps.rs @@ -1,12 +1,21 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use anyhow::Result; use aptos_crypto::HashValue; -use aptos_experimental_layered_map::MapLayer; +use aptos_experimental_layered_map::{LayeredMap, MapLayer}; +use aptos_schemadb::{ + batch::WriteBatch, + define_schema, + schema::{KeyCodec, ValueCodec}, + DB, DEFAULT_COLUMN_FAMILY_NAME, +}; +use aptos_temppath::TempPath; use criterion::{ criterion_group, criterion_main, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion, }; use itertools::Itertools; +use once_cell::sync::OnceCell; use rand::random; use std::{ collections::{BTreeMap, HashMap}, @@ -20,27 +29,75 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; type Key = u128; type Value = HashValue; +define_schema!(DbMap, Key, Value, DEFAULT_COLUMN_FAMILY_NAME); + +impl KeyCodec for Key { + fn encode_key(&self) -> Result> { + Ok(self.to_be_bytes().to_vec()) + } + + fn decode_key(data: &[u8]) -> Result { + assert_eq!(data.len(), 16); + + let mut buffer = [0u8; 16]; + buffer.copy_from_slice(data); + Ok(Self::from_be_bytes(buffer)) + } +} + +impl ValueCodec for Value { + fn encode_value(&self) -> Result> { + Ok(self.to_vec()) + } + + fn decode_value(data: &[u8]) -> Result { + assert_eq!(data.len(), 32); + Ok(Self::from_slice(data)?) + } +} + const K: usize = 1024; -fn gen_update_batches(batch_size_k: usize, n_batches: usize) -> Vec> { - repeat_with(|| { - repeat_with(|| (random(), random())) - .take(batch_size_k * K) - .collect_vec() - }) - .take(n_batches) - .collect_vec() +type BatchCache = HashMap>>>; + +fn gen_update_batches( + cache: &mut BatchCache, + batch_size_k: usize, + n_batches: usize, +) -> &Vec> { + cache + .entry(batch_size_k) + .or_default() + .entry(n_batches) + .or_insert_with(|| { + println!(); + println!(" Generating batch. {batch_size_k}k per batch, {n_batches} batches."); + let timer = std::time::Instant::now(); + let ret = repeat_with(|| { + repeat_with(|| (random(), random())) + .take(batch_size_k * K) + .collect_vec() + }) + .take(n_batches) + .collect_vec(); + println!(" Done in {} secs.", timer.elapsed().as_secs()); + ret + }) } -fn insert_in_batches(group: &mut BenchmarkGroup, batch_size_k: usize, n_batches: usize) { - let batches = gen_update_batches(batch_size_k, n_batches); +fn insert_in_batches( + group: &mut BenchmarkGroup, + cache: &mut BatchCache, + batch_size_k: usize, + n_batches: usize, +) { let total_updates = (batch_size_k * K * n_batches) as u64; group.throughput(criterion::Throughput::Elements(total_updates)); let name = format!("hash_map_{n_batches}_batches_of_{batch_size_k}k_updates"); group.bench_function(&name, |b| { b.iter_batched( - || batches.clone(), + || gen_update_batches(cache, batch_size_k, n_batches).clone(), |batches| { let mut map = HashMap::new(); for batch in batches { @@ -55,7 +112,7 @@ fn insert_in_batches(group: &mut BenchmarkGroup, batch_size_k: usize, let name = format!("btree_map_{n_batches}_batches_of_{batch_size_k}k_updates"); group.bench_function(&name, |b| { b.iter_batched( - || batches.clone(), + || gen_update_batches(cache, batch_size_k, n_batches).clone(), |batches| { let mut map = BTreeMap::new(); for batch in batches { @@ -70,7 +127,7 @@ fn insert_in_batches(group: &mut BenchmarkGroup, batch_size_k: usize, let name = format!("layered_map_{n_batches}_batches_of_{batch_size_k}k_updates"); group.bench_function(&name, |b| { b.iter_batched( - || batches.clone(), + || gen_update_batches(cache, batch_size_k, n_batches).clone(), |batches| { let root_layer = MapLayer::new_family("bench"); let mut latest_layer = root_layer.clone(); @@ -88,88 +145,226 @@ fn insert_in_batches(group: &mut BenchmarkGroup, batch_size_k: usize, fn get( group: &mut BenchmarkGroup, + batch_cache: &mut BatchCache, + keys_cache: &mut KeysCache, map_size_k: usize, - items: &[(Key, Value)], - keys_to_get: &[Key], ) { - assert_eq!(map_size_k * K, items.len()); - group.throughput(criterion::Throughput::Elements(keys_to_get.len() as u64)); + let n_keys_to_get = map_size_k.min(10) * K; - let name = format!("hash_map_{map_size_k}k_items"); - let map: HashMap = items.iter().cloned().collect(); - group.bench_function(&name, |b| { - b.iter_batched( - || (), - |_| keys_to_get.iter().map(|key| map.get(key)).collect_vec(), - BatchSize::SmallInput, - ) - }); + group.throughput(criterion::Throughput::Elements(n_keys_to_get as u64)); - let name = format!("btree_map_{map_size_k}k_items"); - let map: BTreeMap = items.iter().cloned().collect(); - group.bench_function(&name, |b| { - b.iter_batched( - || (), - |_| keys_to_get.iter().map(|key| map.get(key)).collect_vec(), - BatchSize::SmallInput, - ) - }); + get_hash_map(group, batch_cache, keys_cache, map_size_k, n_keys_to_get); - let name = format!("layered_map_{map_size_k}k_items"); - let root_layer = MapLayer::new_family("bench"); - let top_layer = root_layer.view_layers_after(&root_layer).new_layer(items); - let map = top_layer.into_layers_view_after(root_layer); - group.bench_function(&name, |b| { - b.iter_batched( - || (), - |_| keys_to_get.iter().map(|key| map.get(key)).collect_vec(), - BatchSize::SmallInput, - ) - }); + get_btree_map(group, batch_cache, keys_cache, map_size_k, n_keys_to_get); + + get_layered_map(group, batch_cache, keys_cache, map_size_k, n_keys_to_get); + + get_rocksdb(group, batch_cache, keys_cache, map_size_k, n_keys_to_get); } -fn get_existing(group: &mut BenchmarkGroup, map_size_k: usize) { - let items = gen_update_batches(map_size_k, 1).pop().unwrap(); - let num_keys_to_get = map_size_k.min(10) * K; - let keys_to_get = items - .iter() - .map(|(key, _v)| *key) - .take(num_keys_to_get) - .collect_vec(); - group.throughput(criterion::Throughput::Elements(num_keys_to_get as u64)); - - get(group, map_size_k, &items, &keys_to_get); +fn get_hash_map( + group: &mut BenchmarkGroup, + batch_cache: &mut BatchCache, + keys_cache: &mut KeysCache, + map_size_k: usize, + n_keys_to_get: usize, +) { + let map: OnceCell> = OnceCell::new(); + + for (prefix, existing) in [("existing", true), ("non_existing", false)] { + let name = format!("{prefix}/hash_map_{map_size_k}k_items"); + let keys: OnceCell> = OnceCell::new(); + + group.bench_function(&name, |b| { + b.iter_batched( + || { + let (items, keys_) = + gen_get(batch_cache, keys_cache, map_size_k, n_keys_to_get, existing); + let map = map.get_or_init(|| items.iter().cloned().collect()); + let keys = keys.get_or_init(|| keys_.clone()); + (map, keys) + }, + |(map, keys)| keys.iter().map(|key| map.get(key)).collect_vec(), + BatchSize::SmallInput, + ) + }); + } } -fn get_non_existing(group: &mut BenchmarkGroup, map_size_k: usize) { - let items = gen_update_batches(map_size_k, 1).pop().unwrap(); - let num_keys_to_get = map_size_k.min(10) * K; - let keys_to_get = (0..num_keys_to_get).map(|_| random()).collect_vec(); +fn get_btree_map( + group: &mut BenchmarkGroup, + batch_cache: &mut BatchCache, + keys_cache: &mut KeysCache, + map_size_k: usize, + n_keys_to_get: usize, +) { + let map: OnceCell> = OnceCell::new(); + + for (prefix, existing) in [("existing", true), ("non_existing", false)] { + let name = format!("{prefix}/btree_map_{map_size_k}k_items"); + let keys: OnceCell> = OnceCell::new(); + + group.bench_function(&name, |b| { + b.iter_batched( + || { + let (items, keys_) = + gen_get(batch_cache, keys_cache, map_size_k, n_keys_to_get, existing); + let map = map.get_or_init(|| items.iter().cloned().collect()); + let keys = keys.get_or_init(|| keys_.clone()); + (map, keys) + }, + |(map, keys)| keys.iter().map(|key| map.get(key)).collect_vec(), + BatchSize::SmallInput, + ) + }); + } +} + +fn get_layered_map( + group: &mut BenchmarkGroup, + batch_cache: &mut BatchCache, + keys_cache: &mut KeysCache, + map_size_k: usize, + n_keys_to_get: usize, +) { + let map: OnceCell> = OnceCell::new(); + for (prefix, existing) in [("existing", true), ("non_existing", false)] { + let name = format!("{prefix}/layered_map_{map_size_k}k_items"); + let keys: OnceCell> = OnceCell::new(); + + group.bench_function(&name, |b| { + b.iter_batched( + || { + let (items, keys_) = + gen_get(batch_cache, keys_cache, map_size_k, n_keys_to_get, existing); + let map = map.get_or_init(|| { + let root_layer = MapLayer::new_family("bench"); + let top_layer = root_layer.view_layers_after(&root_layer).new_layer(items); + top_layer.into_layers_view_after(root_layer) + }); + let keys = keys.get_or_init(|| keys_.clone()); + (map, keys) + }, + |(map, keys)| keys.iter().map(|key| map.get(key)).collect_vec(), + BatchSize::SmallInput, + ) + }); + } +} + +fn get_rocksdb( + group: &mut BenchmarkGroup, + batch_cache: &mut BatchCache, + keys_cache: &mut KeysCache, + map_size_k: usize, + n_keys_to_get: usize, +) { + let map: OnceCell = OnceCell::new(); + + for (prefix, existing) in [("existing", true), ("non_existing", false)] { + let name = format!("{prefix}/rocksdb_{map_size_k}k_items"); + let keys: OnceCell> = OnceCell::new(); + group.bench_function(&name, |b| { + b.iter_batched( + || { + let (items, keys_) = + gen_get(batch_cache, keys_cache, map_size_k, n_keys_to_get, existing); + let keys = keys.get_or_init(|| keys_.clone()); + let map = map.get_or_init(|| { + println!(); + println!(" Opening rocksdb."); + let mut db_path = TempPath::new(); + db_path.persist(); // leak the temp dir - get(group, map_size_k, &items, &keys_to_get); + let mut options = rocksdb::Options::default(); + options.create_if_missing(true); + + let db = DB::open( + &db_path, + "bench", + vec![DEFAULT_COLUMN_FAMILY_NAME], + &options, + ) + .unwrap(); + + let mut total = 0; + items.iter().chunks(1_000_000).into_iter().for_each(|kvs| { + let mut batch = db.new_native_batch(); + total += kvs + .map(|(key, value)| batch.put::(key, value).unwrap()) + .count(); + db.write_schemas(batch).unwrap(); + + println!(" Inserted {total}."); + }); + + println!(" Flushing mem-tables."); + db.flush_cf(DEFAULT_COLUMN_FAMILY_NAME).unwrap(); + + println!(" Preheat all keys to be queried."); + keys.iter().for_each(|key| { + db.get::(key).unwrap(); + }); + + println!(" RocksDB ready."); + db + }); + + (map, keys) + }, + |(map, keys)| { + keys.iter() + .map(|key| map.get::(key).unwrap()) + .collect_vec() + }, + BatchSize::SmallInput, + ) + }); + } +} + +type KeysCache = HashMap>>; + +fn gen_get<'a>( + batch_cache: &'a mut BatchCache, + keys_cache: &'a mut KeysCache, + map_size_k: usize, + n_keys_to_get: usize, + existing: bool, +) -> (&'a Vec<(Key, Value)>, &'a Vec) { + let items = &gen_update_batches(batch_cache, map_size_k, 1)[0]; + let keys = keys_cache + .entry(map_size_k) + .or_default() + .entry(existing) + .or_insert_with(|| { + if existing { + items.iter().map(|(k, _v)| *k).take(n_keys_to_get).collect() + } else { + repeat_with(random).take(n_keys_to_get).collect() + } + }); + + (items, keys) } fn compare_maps(c: &mut Criterion) { + let mut batch_cache = BatchCache::default(); + let mut keys_cache = KeysCache::default(); + { let mut group = c.benchmark_group("insert_in_batches"); for batch_size_k in [1, 10, 100] { for n_batches in [1, 8] { - insert_in_batches(&mut group, batch_size_k, n_batches); + insert_in_batches(&mut group, &mut batch_cache, batch_size_k, n_batches); } } } { - let mut group = c.benchmark_group("get_existing"); - for map_size_k in [100, 1000, 128_000] { - get_existing(&mut group, map_size_k); - } - } - - { - let mut group = c.benchmark_group("get_non_existing"); - for map_size_k in [100, 1000, 128_000] { - get_non_existing(&mut group, map_size_k); + let mut group = c.benchmark_group("get"); + for map_size_k in [100, 1000, 4_000, 16_000, 128_000] { + get(&mut group, &mut batch_cache, &mut keys_cache, map_size_k); } } }