From b5443b27c4a8305b7c790c89d00bd292f9619dd7 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 2 Dec 2024 11:40:43 +0100 Subject: [PATCH] [Turbopack] Custom persistence layer (#73029) This adds a new custom database implementation for Persistent Caching. Why is this better than an existing database? It can be implemented especially for our use case and can come with assumptions and restrictions that other database can't have. What is special about our use case? * We only do one write at a time, but in a single very large transaction that can potentially push GBs into the database. * We want to fill that large transaction from multiple threads. * The initial cold build is very important from performance perspective. How do we tackle that? * We only allow a single WriteBatch at a time, but we start writing to disk while filling that write batch. * When we commit the WriteBatch we write a sequence number to make these writes visible * Once written and committed, files are immutable (but they can be deleted) * Every WriteBatch writes additional files that logically override the values from earlier files. (Deletions are stored as tombstones) * When the average files to read reaches a threshold we do a compaction. * A compaction runs a merge on multiple files to create new sorted files. This reduces that metric. * We limit the number of merged files to avoid long compactions. * In every file we store a key range, an AMQF to quickly find out if a key can be in that file. The false positive rate per file is 0.1%. * When we need to lookup a key in a file we do a binary search as keys are stored in sorted order (sorted by their hash). * Files are split into blocks that are stored compressed with lz4 with a two shared compression dictionary per file (one for keys and one for values) * We have an additional index block to find the right key block without a search. * We support multiple key families to split the database for different kinds of data. * Depending on the size of the value it will be stored: 1. in a block with other small values, 2. in it's own block, 3. in a separate file. * We have a block cache to cache decompressed blocks. * We have a AMQF to cache deserialized filters * Files are memory mapped for reading to leverage OS cache and memory See more details in the added README.md file. --- Cargo.lock | 133 ++- Cargo.toml | 1 + turbopack/crates/turbo-persistence/Cargo.toml | 33 + turbopack/crates/turbo-persistence/README.md | 251 +++++ .../crates/turbo-persistence/src/arc_slice.rs | 93 ++ .../crates/turbo-persistence/src/collector.rs | 113 +++ .../turbo-persistence/src/collector_entry.rs | 85 ++ .../turbo-persistence/src/compaction/mod.rs | 1 + .../src/compaction/selector.rs | 371 +++++++ .../crates/turbo-persistence/src/constants.rs | 34 + turbopack/crates/turbo-persistence/src/db.rs | 910 ++++++++++++++++++ turbopack/crates/turbo-persistence/src/key.rs | 205 ++++ turbopack/crates/turbo-persistence/src/lib.rs | 24 + .../turbo-persistence/src/lookup_entry.rs | 66 ++ .../turbo-persistence/src/merge_iter.rs | 79 ++ .../src/static_sorted_file.rs | 717 ++++++++++++++ .../src/static_sorted_file_builder.rs | 532 ++++++++++ .../crates/turbo-persistence/src/tests.rs | 347 +++++++ .../turbo-persistence/src/write_batch.rs | 296 ++++++ .../crates/turbo-tasks-backend/Cargo.toml | 4 +- .../turbo-tasks-backend/src/backend/mod.rs | 10 + .../src/backing_storage.rs | 4 + .../src/database/key_value_database.rs | 4 + .../turbo-tasks-backend/src/database/mod.rs | 15 +- .../turbo-tasks-backend/src/database/turbo.rs | 147 +++ .../src/kv_backing_storage.rs | 4 + .../crates/turbo-tasks-backend/src/lib.rs | 35 +- 27 files changed, 4471 insertions(+), 43 deletions(-) create mode 100644 turbopack/crates/turbo-persistence/Cargo.toml create mode 100644 turbopack/crates/turbo-persistence/README.md create mode 100644 turbopack/crates/turbo-persistence/src/arc_slice.rs create mode 100644 turbopack/crates/turbo-persistence/src/collector.rs create mode 100644 turbopack/crates/turbo-persistence/src/collector_entry.rs create mode 100644 turbopack/crates/turbo-persistence/src/compaction/mod.rs create mode 100644 turbopack/crates/turbo-persistence/src/compaction/selector.rs create mode 100644 turbopack/crates/turbo-persistence/src/constants.rs create mode 100644 turbopack/crates/turbo-persistence/src/db.rs create mode 100644 turbopack/crates/turbo-persistence/src/key.rs create mode 100644 turbopack/crates/turbo-persistence/src/lib.rs create mode 100644 turbopack/crates/turbo-persistence/src/lookup_entry.rs create mode 100644 turbopack/crates/turbo-persistence/src/merge_iter.rs create mode 100644 turbopack/crates/turbo-persistence/src/static_sorted_file.rs create mode 100644 turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs create mode 100644 turbopack/crates/turbo-persistence/src/tests.rs create mode 100644 turbopack/crates/turbo-persistence/src/write_batch.rs create mode 100644 turbopack/crates/turbo-tasks-backend/src/database/turbo.rs diff --git a/Cargo.lock b/Cargo.lock index 222d9cbed901d..29cdeb27ed348 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,7 +340,7 @@ checksum = "d7ebdfa2ebdab6b1760375fa7d6f382b9f486eac35fc994625a00e89280bdbb7" dependencies = [ "async-task", "concurrent-queue", - "fastrand 2.0.0", + "fastrand 2.2.0", "futures-lite 2.3.0", "slab", ] @@ -393,7 +393,7 @@ dependencies = [ "futures-lite 2.3.0", "parking", "polling 3.7.2", - "rustix 0.38.31", + "rustix 0.38.41", "slab", "tracing", "windows-sys 0.52.0", @@ -441,7 +441,7 @@ dependencies = [ "cfg-if", "event-listener 3.1.0", "futures-lite 1.13.0", - "rustix 0.38.31", + "rustix 0.38.41", "windows-sys 0.48.0", ] @@ -457,7 +457,7 @@ dependencies = [ "cfg-if", "futures-core", "futures-io", - "rustix 0.38.31", + "rustix 0.38.41", "signal-hook-registry", "slab", "windows-sys 0.52.0", @@ -2148,9 +2148,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "fdeflate" @@ -2320,7 +2320,7 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" dependencies = [ - "fastrand 2.0.0", + "fastrand 2.2.0", "futures-core", "futures-io", "parking", @@ -3343,9 +3343,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.155" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "libfuzzer-sys" @@ -3479,9 +3479,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" [[package]] name = "litrs" @@ -3594,7 +3594,16 @@ version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" dependencies = [ - "twox-hash", + "twox-hash 1.6.3", +] + +[[package]] +name = "lzzzz" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac94cca0c9c2ac03c63092f1377df5b83e4c35441f9d83a53ca214c58685f7bd" +dependencies = [ + "cc", ] [[package]] @@ -3695,6 +3704,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.7.1" @@ -4711,7 +4729,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1d5c74c9876f070d3e8fd503d748c7d974c3e48da8f41350fa5222ef9b4391" dependencies = [ "atomic-waker", - "fastrand 2.0.0", + "fastrand 2.2.0", "futures-io", ] @@ -4799,7 +4817,7 @@ dependencies = [ "concurrent-queue", "hermit-abi 0.4.0", "pin-project-lite", - "rustix 0.38.31", + "rustix 0.38.41", "tracing", "windows-sys 0.52.0", ] @@ -5015,6 +5033,17 @@ dependencies = [ "unicase", ] +[[package]] +name = "qfilter" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b36883275f761fe4c69f0ba982d18b36208b72d647ad9d468afcad70fb08a4e" +dependencies = [ + "serde", + "serde_bytes", + "xxhash-rust", +] + [[package]] name = "qstring" version = "0.7.2" @@ -5030,6 +5059,18 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick_cache" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d7c94f8935a9df96bb6380e8592c70edf497a643f94bd23b2f76b399385dbf4" +dependencies = [ + "ahash 0.8.11", + "equivalent", + "hashbrown 0.14.5", + "parking_lot", +] + [[package]] name = "quote" version = "1.0.36" @@ -5488,14 +5529,14 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.31" +version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ "bitflags 2.5.0", "errno", "libc", - "linux-raw-sys 0.4.13", + "linux-raw-sys 0.4.14", "windows-sys 0.52.0", ] @@ -7700,15 +7741,15 @@ checksum = "8ae9980cab1db3fceee2f6c6f643d5d8de2997c58ee8d25fb0cc8a9e9e7348e5" [[package]] name = "tempfile" -version = "3.8.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", - "fastrand 2.0.0", - "redox_syscall", - "rustix 0.38.31", - "windows-sys 0.48.0", + "fastrand 2.2.0", + "once_cell", + "rustix 0.38.41", + "windows-sys 0.59.0", ] [[package]] @@ -7726,7 +7767,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21bebf2b7c9e0a515f6e0f8c51dc0f8e4696391e6f1ff30379559f8365fb0df7" dependencies = [ - "rustix 0.38.31", + "rustix 0.38.41", "windows-sys 0.48.0", ] @@ -8313,6 +8354,28 @@ dependencies = [ "utf-8", ] +[[package]] +name = "turbo-persistence" +version = "0.1.0" +dependencies = [ + "anyhow", + "byteorder", + "lzzzz", + "memmap2 0.9.5", + "parking_lot", + "pot", + "qfilter", + "quick_cache", + "rand", + "rayon", + "rustc-hash 1.1.0", + "serde", + "tempfile", + "thread_local", + "twox-hash 2.0.1", + "zstd", +] + [[package]] name = "turbo-prehash" version = "0.1.0" @@ -8411,6 +8474,7 @@ dependencies = [ "tokio", "tokio-scoped", "tracing", + "turbo-persistence", "turbo-prehash", "turbo-rcstr", "turbo-tasks", @@ -8524,7 +8588,7 @@ name = "turbo-tasks-hash" version = "0.1.0" dependencies = [ "turbo-tasks-macros", - "twox-hash", + "twox-hash 1.6.3", ] [[package]] @@ -9234,6 +9298,15 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "twox-hash" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6db6856664807f43c17fbaf2718e2381ac1476a449aa104f5f64622defa1245" +dependencies = [ + "rand", +] + [[package]] name = "typed-arena" version = "2.0.2" @@ -10238,7 +10311,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix 0.38.31", + "rustix 0.38.41", ] [[package]] @@ -10676,8 +10749,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" dependencies = [ "libc", - "linux-raw-sys 0.4.13", - "rustix 0.38.31", + "linux-raw-sys 0.4.14", + "rustix 0.38.41", ] [[package]] @@ -10734,9 +10807,9 @@ dependencies = [ [[package]] name = "zstd" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ "zstd-safe", ] diff --git a/Cargo.toml b/Cargo.toml index aed0d1881c6b4..b90470bdcc40e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ auto-hash-map = { path = "turbopack/crates/turbo-tasks-auto-hash-map" } swc-ast-explorer = { path = "turbopack/crates/turbopack-swc-ast-explorer" } turbo-prehash = { path = "turbopack/crates/turbo-prehash" } turbo-rcstr = { path = "turbopack/crates/turbo-rcstr" } +turbo-persistence = { path = "turbopack/crates/turbo-persistence" } turbo-tasks-malloc = { path = "turbopack/crates/turbo-tasks-malloc", default-features = false } turbo-tasks = { path = "turbopack/crates/turbo-tasks" } turbo-tasks-backend = { path = "turbopack/crates/turbo-tasks-backend" } diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml new file mode 100644 index 0000000000000..8783c62d456fb --- /dev/null +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "turbo-persistence" +version = "0.1.0" +edition = "2021" +license = "MIT" + +[features] +verify_sst_content = [] +strict_checks = [] +stats = ["quick_cache/stats"] + +[dependencies] +anyhow = { workspace = true } +pot = "3.0.0" +byteorder = "1.5.0" +lzzzz = "1.1.0" +memmap2 = "0.9.5" +parking_lot = { workspace = true } +qfilter = { version = "0.2.1", features = ["serde"] } +quick_cache = { version = "0.6.9" } +rayon = { workspace = true } +rustc-hash = { workspace = true } +serde = { workspace = true } +thread_local = { workspace = true } +twox-hash = { version = "2.0.1", features = ["xxhash64"] } +zstd = { version = "0.13.2", features = ["zdict_builder"] } + +[dev-dependencies] +rand = { workspace = true, features = ["small_rng"] } +tempfile = "3.14.0" + +[lints] +workspace = true diff --git a/turbopack/crates/turbo-persistence/README.md b/turbopack/crates/turbo-persistence/README.md new file mode 100644 index 0000000000000..dedd7ddd96a1c --- /dev/null +++ b/turbopack/crates/turbo-persistence/README.md @@ -0,0 +1,251 @@ +# turbo-persistence + +This crate provides a way to persist key value pairs into a folder and restore them later. + +The API only allows a single write transaction at a time, but multiple threads can fill the transaction with (non-conflicting) data concurrently. + +When pushing data into the WriteBatch it is already persisted to disk, but only becomes visible after the transaction is committed. On startup left-over uncommitted files on disk are automatically cleaned up. + +The architecture is optimized for pushing a lot data to disk in a single transaction, while still allowing for fast random reads. + +It supports having multiple key families, which are stored in separate files, but a write batch can contain keys from multiple families. Each key family defines a separate key space. Entries in different key families doesn't influence each other (also not performance-wise). + +## On disk format + +There is a single `CURRENT` file which stores the latest committed sequence number. + +All other files have a sequence number as file name, e. g. `0000123.sst`. All files are immutable once there sequence number is <= the committed sequence number. But they might be deleted when they are superseeded by other committed files. + +There are two different file types: + +* Static Sorted Table (SST, `*.sst`): These files contain key value pairs. +* Blob files (`*.blob`): These files contain large values. + +Therefore there are there value types: + +* INLINE: Small values that are stored directly in the `*.sst` files. +* BLOB: Large values that are stored in `*.blob` files. +* DELETED: Values that are deleted. (Tombstone) +* Future: + * MERGE: An application specific update operation that is applied on the old value. + +### SST file + +* Headers + * 4 bytes magic number and version + * 4 bytes key family + * 8 bytes min hash + * 8 bytes max hash + * 3 bytes AQMF length + * 2 bytes key Compression Dictionary length + * 2 bytes value Compression Dictionary length + * 2 bytes block count +* serialized AQMF +* serialized key Compression Dictionary +* serialized value Compression Dictionary +* foreach block + * 4 bytes end of block offset relative to start of all blocks +* foreach block + * 4 bytes uncompressed block length + * compressed data + +#### Index Block + +* 1 byte block type (0: index block) +* 2 bytes block index +* `n` times + * 8 bytes hash + * 2 bytes block index + +An Index block contains `n` 8 bytes hashes, which specify `n - 1` hash ranges (eq hash goes into the prev range, except for the first key). Between these `n` hashes there are `n - 1` 2 byte block indicies that point to the block that contains the hash range. + +The hashes are sorted. + +`n` is `(block size + 1) / 10` + +#### Key Block + +* 1 byte block type (1: key block) +* 3 bytes entry count +* foreach entry + * 1 byte type + * 3 bytes position in block after header +* Max block size: 16 MB + +A Key block contains n keys, which specify n key value pairs. + +Depending on the `type` field entry has a different format: +* 0: normal key (small value) + * 8 bytes key hash + * key data + * 2 byte block index + * 2 bytes size + * 4 bytes position in block +* 1: blob reference + * 8 bytes key hash + * key data + * 4 bytes sequence number +* 2: deleted key / tombstone (no data) + * 8 bytes key hash + * key data +* 3: normal key (medium sized value) + * 8 bytes key hash + * key data + * 2 byte block index +* 7: merge key (future) + * key data + * 2 byte block index + * 3 bytes size + * 4 bytes position in block +* 8..255: inlined key (future) + * 8 bytes key hash + * key data + * type - 8 bytes value data + +The entries are sorted by key hash and key. + +TODO: 8 bytes key hash is a bit inefficient for small keys. + +#### Value Block + +* no header, all bytes are data referenced by other blocks +* max block size: 4 GB + +### Blob file + +The plain value compressed with dynamic compression. + +## Reading + +Reading start from the current sequence number and goes downwards. + +* We have all SST files memory mapped +* for i = CURRENT sequence number .. 0 + * Check AQMF from SST file for key existance -> if not continue + * let block = 0 + * loop + * Index Block: find key range that contains the key by binary search + * found -> set block, continue + * not found -> break + * Key Block: find key by binary search + * found -> lookup value from value block, return + * not found -> break + +## Writing + +Writing starts by creating a new WriteBatch. It maintains an atomic counter of the next free sequence number. + +The WriteBatch has a thread local buffer that accumulates operations until a certain threshold is reached. Then the buffer is sorted and written to a new SST file (and maybe some blob files). + +When the WriteBatch is committed all thread local buffers are merged into a single global buffer and written into new SST files (potentially multiple when threshold is reached). + +fsync! The new sequence number is written to the `CURRENT` file. + +After that optimization might take place. + +## Compaction + +For compaction we compute the "coverage" of the SST files. The coverage is the average number of SST files that need to be touched to figure out that a key is missing. The coverage can be computed by looking at the min_hash and max_hash of the SST files only. + +For a single SST file we can compute `(max_hash - min_hash) / u64::MAX` as the coverage of the SST file. We sum up all these coverages to get the total coverage. + +Compaction chooses a few SST files and runs the merge step of merge sort on tham to create a few new SST files with sorted ranges. + +Example: + +``` +key hash range: | 0 ... u64::MAX | +SST 1: |----------------| +SST 2: |----------------| +SST 3: |-----| +``` + +can be compacted into: + +``` +key hash range: | 0 ... u64::MAX | +SST 1': |-------| +SST 2': |------| +SST 3': |-----| +``` + +The merge operation decreases the total coverage since the new SST files will have a coverage of < 1. + +But we need to be careful to insert the SST files in the correct location again, since items in these SST files might be overriden in later SST file and we don't want to change that. + +Since SST files that are smaller than the current sequence number are immutable we can't change the files and we can't insert new files at this sequence numbers. +Instead we need to insert the new SST after the current sequence number and copy all SST files after the original SST files after them. (Actually we only need to copy SST files with overlapping key hash ranges. And we can hardlink them instead). Later we will write the current sequence number and delete them original and all copied SST files. + +We can run multiple merge operations concurrently when the key hash ranges are not overlapping or they are from different key families. The copy operation need to be strictly after all merge operations. + +There must not be another SST file with overlapping key hash range between files of a merge operation. + +During the merge operation we eliminate duplicate keys. When blob references are eliminated we delete the blob file after the current sequence number was updated. + +Since the process might exit unexpectedly, to avoid "forgetting" to delete the SST files we keep track of that in a `*.del` file. This file contains the sequence number of SST and blob files that should be deleted. We write that file before the current sequence number is updated. On restart we execute the deletes again. + +We limit the number of SST files that are merged at once to avoid long compactions. + +Full example: + +Example: + +``` +key hash range: | 0 ... u64::MAX | Family +SST 1: |-| 1 +SST 2: |----------------| 1 +SST 3: |----------------| 1 +SST 4: |-----| 2 +SST 5: |-----| 2 +SST 6: |-------| 1 +SST 7: |-------| 1 +SST 8: |--------| 2 +SST 9: |--------| 2 +CURRENT: 9 +``` + +Compactions could selects SST 2, 3, 6 and SST 4, 5, 8 for merging (we limited to 3 SST files per merge operation). This also selects SST 7, 9 for copying. The current sequence number is 9. + +We merge SST 2, 3, 6 into new SST files 10, 12, 14 and SST 4, 5, 8 into new SST files 11, 13. Both operations are done concurrently so they might choose free sequence numbers in random order. The operation might result in less SST files due to duplicate keys. + +After that we copy SST files 7, 9 to new SST files 15, 16. + +We write a "del" file at sequence number 17. + +After that we write the new current sequence number 17. + +Then we delete SST files 2, 3, 6 and 4, 5, 8 and 7, 9. The + +SST files 1 stays unchanged. + +``` +key hash range: | 0 ... u64::MAX | Family +SST 1: |-| 1 +SST 10: |-----| 1 +SST 12: |-----| 1 +SST 11: |------| 2 +SST 14: |-------| 1 +SST 13: |-----| 2 +SST 15: |-------| 1 +SST 16: |--------| 2 +DEL 17: (2, 3, 4, 5, 6, 7, 8, 9) +CURRENT: 17 +``` + +Configuration options for compations are: +* max number of SST files that are merged at once +* coverage when compaction is triggered (otherwise calling compact is a noop) + +## Opening + +* Read the `CURRENT` file +* Delete all files with a higher sequence number than the one in the `CURRENT` file. +* Read all `*.del` files and delete the files that are listed in there. +* Read all `*.sst` files and memory map them. + +## Closing + +* fsync! +* (this also deleted enqueued files) + + diff --git a/turbopack/crates/turbo-persistence/src/arc_slice.rs b/turbopack/crates/turbo-persistence/src/arc_slice.rs new file mode 100644 index 0000000000000..785331a9262fc --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/arc_slice.rs @@ -0,0 +1,93 @@ +use std::{ + borrow::Borrow, + fmt::{self, Debug, Formatter}, + hash::{Hash, Hasher}, + ops::{Deref, Range}, + sync::Arc, +}; + +/// A owned slice that is backed by an `Arc`. +#[derive(Clone)] +pub struct ArcSlice { + data: *const [T], + arc: Arc<[T]>, +} + +unsafe impl Send for ArcSlice {} +unsafe impl Sync for ArcSlice {} + +impl From> for ArcSlice { + fn from(arc: Arc<[T]>) -> Self { + Self { + data: &*arc as *const [T], + arc, + } + } +} + +impl From> for ArcSlice { + fn from(b: Box<[T]>) -> Self { + Self::from(Arc::from(b)) + } +} + +impl Deref for ArcSlice { + type Target = [T]; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.data } + } +} + +impl Borrow<[T]> for ArcSlice { + fn borrow(&self) -> &[T] { + self + } +} + +impl Hash for ArcSlice { + fn hash(&self, state: &mut H) { + self.deref().hash(state) + } +} + +impl PartialEq for ArcSlice { + fn eq(&self, other: &Self) -> bool { + self.deref().eq(other.deref()) + } +} + +impl Debug for ArcSlice { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + Debug::fmt(&**self, f) + } +} + +impl Eq for ArcSlice {} + +impl ArcSlice { + /// Creates a new `ArcSlice` from a pointer to a slice and an `Arc`. + /// + /// # Safety + /// + /// The caller must ensure that the pointer is pointing to a valid slice that is kept alive by + /// the `Arc`. + pub unsafe fn new_unchecked(data: *const [T], arc: Arc<[T]>) -> Self { + Self { data, arc } + } + + /// Get the backing arc + pub fn full_arc(this: &ArcSlice) -> Arc<[T]> { + this.arc.clone() + } + + /// Returns a new `ArcSlice` that points to a slice of the current slice. + pub fn slice(self, range: Range) -> ArcSlice { + let data = &*self; + let data = &data[range] as *const [T]; + Self { + data, + arc: self.arc, + } + } +} diff --git a/turbopack/crates/turbo-persistence/src/collector.rs b/turbopack/crates/turbo-persistence/src/collector.rs new file mode 100644 index 0000000000000..bfd507be294e0 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/collector.rs @@ -0,0 +1,113 @@ +use crate::{ + collector_entry::{CollectorEntry, CollectorEntryValue, EntryKey}, + constants::{ + DATA_THRESHOLD_PER_INITIAL_FILE, MAX_ENTRIES_PER_INITIAL_FILE, MAX_SMALL_VALUE_SIZE, + }, + key::{hash_key, StoreKey}, +}; + +/// A collector accumulates entries that should be eventually written to a file. It keeps track of +/// count and size of the entries to decide when it's "full". Accessing the entries sorts them. +pub struct Collector { + total_key_size: usize, + total_value_size: usize, + entries: Vec>, +} + +impl Collector { + /// Creates a new collector. Note that this allocates the full capacity for the entries. + pub fn new() -> Self { + Self { + total_key_size: 0, + total_value_size: 0, + entries: Vec::with_capacity(MAX_ENTRIES_PER_INITIAL_FILE), + } + } + + /// Returns true if the collector has no entries. + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Returns true if the collector is full. + pub fn is_full(&self) -> bool { + self.entries.len() >= MAX_ENTRIES_PER_INITIAL_FILE + || self.total_key_size + self.total_value_size > DATA_THRESHOLD_PER_INITIAL_FILE + } + + /// Adds a normal key-value pair to the collector. + pub fn put(&mut self, key: K, value: Vec) { + let key = EntryKey { + hash: hash_key(&key), + data: key, + }; + let value = if value.len() > MAX_SMALL_VALUE_SIZE { + CollectorEntryValue::Medium { value } + } else { + CollectorEntryValue::Small { value } + }; + self.total_key_size += key.len(); + self.total_value_size += value.len(); + self.entries.push(CollectorEntry { key, value }); + } + + /// Adds a blob key-value pair to the collector. + pub fn put_blob(&mut self, key: K, blob: u32) { + let key = EntryKey { + hash: hash_key(&key), + data: key, + }; + self.total_key_size += key.len(); + self.entries.push(CollectorEntry { + key, + value: CollectorEntryValue::Large { blob }, + }); + } + + /// Adds a tombstone pair to the collector. + pub fn delete(&mut self, key: K) { + let key = EntryKey { + hash: hash_key(&key), + data: key, + }; + self.total_key_size += key.len(); + self.entries.push(CollectorEntry { + key, + value: CollectorEntryValue::Deleted, + }); + } + + /// Adds an entry from another collector to this collector. + pub fn add_entry(&mut self, entry: CollectorEntry) { + self.total_key_size += entry.key.len(); + self.total_value_size += entry.value.len(); + self.entries.push(entry); + } + + /// Sorts the entries and returns them along with the total key and value sizes. This doesn't + /// clear the entries. + pub fn sorted(&mut self) -> (&[CollectorEntry], usize, usize) { + self.entries.sort_by(|a, b| a.key.cmp(&b.key)); + (&self.entries, self.total_key_size, self.total_value_size) + } + + /// Clears the collector. + pub fn clear(&mut self) { + self.entries.clear(); + self.total_key_size = 0; + self.total_value_size = 0; + } + + /// Drains all entries from the collector in un-sorted order. This can be used to move the + /// entries into another collector. + pub fn drain(&mut self) -> impl Iterator> + '_ { + self.total_key_size = 0; + self.total_value_size = 0; + self.entries.drain(..) + } + + /// Returns the number of entries in the collector. + pub fn len(&self) -> usize { + self.entries.len() + } +} diff --git a/turbopack/crates/turbo-persistence/src/collector_entry.rs b/turbopack/crates/turbo-persistence/src/collector_entry.rs new file mode 100644 index 0000000000000..a27db6d1119dc --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/collector_entry.rs @@ -0,0 +1,85 @@ +use std::cmp::Ordering; + +use crate::{ + key::StoreKey, + static_sorted_file_builder::{Entry, EntryValue}, +}; + +pub struct CollectorEntry { + pub key: EntryKey, + pub value: CollectorEntryValue, +} + +pub enum CollectorEntryValue { + Small { value: Vec }, + Medium { value: Vec }, + Large { blob: u32 }, + Deleted, +} + +impl CollectorEntryValue { + pub fn len(&self) -> usize { + match self { + CollectorEntryValue::Small { value } => value.len(), + CollectorEntryValue::Medium { value } => value.len(), + CollectorEntryValue::Large { blob: _ } => 0, + CollectorEntryValue::Deleted => 0, + } + } +} + +pub struct EntryKey { + pub hash: u64, + pub data: K, +} + +impl EntryKey { + pub fn len(&self) -> usize { + std::mem::size_of::() + self.data.len() + } +} + +impl PartialEq for EntryKey { + fn eq(&self, other: &Self) -> bool { + self.hash == other.hash && self.data == other.data + } +} + +impl Eq for EntryKey {} + +impl PartialOrd for EntryKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for EntryKey { + fn cmp(&self, other: &Self) -> Ordering { + self.hash + .cmp(&other.hash) + .then_with(|| self.data.cmp(&other.data)) + } +} + +impl Entry for CollectorEntry { + fn key_hash(&self) -> u64 { + self.key.hash + } + + fn key_len(&self) -> usize { + self.key.data.len() + } + + fn write_key_to(&self, buf: &mut Vec) { + self.key.data.write_to(buf); + } + + fn value(&self) -> EntryValue<'_> { + match &self.value { + CollectorEntryValue::Small { value } => EntryValue::Small { value }, + CollectorEntryValue::Medium { value } => EntryValue::Medium { value }, + CollectorEntryValue::Large { blob } => EntryValue::Large { blob: *blob }, + CollectorEntryValue::Deleted => EntryValue::Deleted, + } + } +} diff --git a/turbopack/crates/turbo-persistence/src/compaction/mod.rs b/turbopack/crates/turbo-persistence/src/compaction/mod.rs new file mode 100644 index 0000000000000..199a414becc3f --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/compaction/mod.rs @@ -0,0 +1 @@ +pub mod selector; diff --git a/turbopack/crates/turbo-persistence/src/compaction/selector.rs b/turbopack/crates/turbo-persistence/src/compaction/selector.rs new file mode 100644 index 0000000000000..2a67cab2acd18 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/compaction/selector.rs @@ -0,0 +1,371 @@ +/// The merge and move jobs that the compaction algorithm has computed. It's expected that all move +/// jobs are executed in parallel and when that has finished the move jobs are executed in parallel. +#[derive(Debug)] +pub struct CompactionJobs { + pub merge_jobs: Vec>, + pub move_jobs: Vec, +} + +impl CompactionJobs { + #[cfg(test)] + pub(self) fn is_empty(&self) -> bool { + self.merge_jobs.is_empty() && self.move_jobs.is_empty() + } +} + +type Range = (u64, u64); + +/// The trait for the input of the compaction algorithm. +pub trait Compactable { + /// Returns the range of the compactable. + fn range(&self) -> Range; +} + +fn is_overlapping(a: &Range, b: &Range) -> bool { + a.0 <= b.1 && b.0 <= a.1 +} + +fn spread(range: &Range) -> u64 { + range.1 - range.0 +} + +/// Extends the range `a` to include the range `b`, returns `true` if the range was extended. +fn extend_range(a: &mut Range, b: &Range) -> bool { + let mut extended = false; + if b.0 < a.0 { + a.0 = b.0; + extended = true; + } + if b.1 > a.1 { + a.1 = b.1; + extended = true; + } + extended +} + +/// Computes the total coverage of the compactables. +pub fn total_coverage(compactables: &[T], full_range: Range) -> f32 { + let mut coverage = 0.0f32; + for c in compactables { + let range = c.range(); + coverage += spread(&range) as f32; + } + coverage / spread(&full_range) as f32 +} + +/// Configuration for the compaction algorithm. +pub struct CompactConfig { + /// The maximum number of files to merge at once. + pub max_merge: usize, + + /// The minimum number of files to merge at once. + pub min_merge: usize, +} + +/// For a list of compactables, computes merge and move jobs that are expected to perform best. +pub fn get_compaction_jobs( + compactables: &[T], + config: &CompactConfig, +) -> CompactionJobs { + let (jobs, _) = get_compaction_jobs_internal(compactables, config, 0); + jobs +} + +fn get_compaction_jobs_internal( + compactables: &[T], + config: &CompactConfig, + start_index: usize, +) -> (CompactionJobs, f32) { + let len = compactables.len(); + let mut used_compactables = vec![false; len]; + let mut need_move = vec![false; len]; + let mut merge_jobs = Vec::new(); + let mut merge_jobs_reducation = 0.0f32; + let mut move_jobs = Vec::new(); + + let age = |i| (len - 1 - i) as f32; + + loop { + // Find the first unused compactable. + let Some(start) = used_compactables + .iter() + .skip(start_index) + .position(|&used| !used) + .map(|i| i + start_index) + else { + break; + }; + if start >= len - 1 { + break; + } + used_compactables[start] = true; + let start_range = compactables[start].range(); + let mut range = start_range; + + let mut merge_job = Vec::new(); + merge_job.push(start); + let mut merge_job_input_spread = spread(&start_range) as f32; + + 'outer: loop { + // Find the next overlapping unused compactable and extend the range to cover it. + // If it already covers it, add this to the current set. + let mut i = start + 1; + loop { + if !used_compactables[i] { + let range_for_i = compactables[i].range(); + if is_overlapping(&range, &range_for_i) { + let mut extended_range = range; + if !extend_range(&mut extended_range, &range_for_i) { + used_compactables[i] = true; + merge_job.push(i); + merge_job_input_spread += spread(&range_for_i) as f32; + } else { + let s = spread(&range); + // Disallow doubling the range spread + if merge_job.len() >= config.min_merge + && spread(&extended_range) - s > s + { + break 'outer; + } + range = extended_range; + // Need to restart the search from the beginning as the extended range + // may overlap with compactables that were + // already processed. + break; + } + } + } + i += 1; + if i >= compactables.len() { + break 'outer; + } + if merge_job.len() >= config.max_merge { + break 'outer; + } + } + } + + if merge_job.len() < config.min_merge { + continue; + } + let mut merge_range = compactables[start].range(); + if !merge_job + .iter() + .skip(1) + .any(|&i| is_overlapping(&merge_range, &compactables[i].range())) + { + // No overlapping ranges, skip that merge job. + continue; + } + + for &i in merge_job.iter().skip(1) { + extend_range(&mut merge_range, &compactables[i].range()); + } + merge_jobs_reducation = (merge_job_input_spread - spread(&merge_range) as f32) * age(start); + + for (i, compactable) in compactables + .iter() + .enumerate() + .skip(merge_job.last().unwrap() + 1) + { + if used_compactables[i] { + continue; + } + let range = compactable.range(); + if is_overlapping(&merge_range, &range) && !need_move[i] { + need_move[i] = true; + used_compactables[i] = true; + move_jobs.push(i); + } + } + + merge_jobs.push(merge_job); + } + + // Check if there is an alternative with better reduction. + if !move_jobs.is_empty() { + let offset = move_jobs[0]; + let (result, estimated_reduction) = + get_compaction_jobs_internal(compactables, config, offset); + if estimated_reduction > merge_jobs_reducation { + return (result, estimated_reduction); + } + } + + move_jobs.sort_unstable(); + + ( + CompactionJobs { + merge_jobs, + move_jobs, + }, + merge_jobs_reducation, + ) +} + +#[cfg(test)] +mod tests { + use std::{ + fmt::Debug, + mem::{swap, take}, + }; + + use rand::{Rng, SeedableRng}; + + use super::*; + + struct TestCompactable { + range: Range, + } + + impl Compactable for TestCompactable { + fn range(&self) -> Range { + self.range + } + } + + fn compact(ranges: [(u64, u64); N], max_merge: usize) -> CompactionJobs { + let compactables = ranges + .iter() + .map(|&range| TestCompactable { range }) + .collect::>(); + let config = CompactConfig { + max_merge, + min_merge: 2, + }; + get_compaction_jobs(&compactables, &config) + } + + #[test] + fn test_compaction_jobs() { + let CompactionJobs { + merge_jobs, + move_jobs, + .. + } = compact( + [ + (0, 10), + (10, 30), + (9, 13), + (0, 30), + (40, 44), + (41, 42), + (41, 47), + (90, 100), + (30, 40), + ], + 3, + ); + assert_eq!(merge_jobs, vec![vec![0, 1, 2], vec![4, 5, 6]]); + assert_eq!(move_jobs, vec![3, 8]); + } + + #[test] + fn simulate_compactions() { + let mut rnd = rand::rngs::SmallRng::from_seed([0; 32]); + let mut keys = (0..1000) + .map(|_| rnd.gen_range(0..10000)) + .collect::>(); + + let mut containers = keys + .chunks(100) + .map(|keys| Container::new(keys.to_vec())) + .collect::>(); + + let mut warm_keys = (0..100) + .map(|_| { + let i = rnd.gen_range(0..keys.len()); + keys.swap_remove(i) + }) + .collect::>(); + + let mut number_of_compactions = 0; + + for _ in 0..100 { + let coverage = total_coverage(&containers, (0, 10000)); + println!( + "{containers:#?} coverage: {}, items: {}", + coverage, + containers.len() + ); + + if coverage > 10.0 { + let config = CompactConfig { + max_merge: 4, + min_merge: 2, + }; + let jobs = get_compaction_jobs(&containers, &config); + if !jobs.is_empty() { + println!("{jobs:?}"); + + do_compact(&mut containers, jobs); + number_of_compactions += 1; + } + } else { + println!("No compaction needed"); + } + + // Modify warm keys + containers.push(Container::new(warm_keys.clone())); + + // Change some warm keys + for _ in 0..10 { + let i = rnd.gen_range(0..warm_keys.len()); + let j = rnd.gen_range(0..keys.len()); + swap(&mut warm_keys[i], &mut keys[j]); + } + } + println!("Number of compactions: {}", number_of_compactions); + + assert!(containers.len() < 40); + let coverage = total_coverage(&containers, (0, 10000)); + assert!(coverage < 12.0); + } + + struct Container { + keys: Vec, + } + + impl Container { + fn new(mut keys: Vec) -> Self { + keys.sort_unstable(); + Self { keys } + } + } + + impl Compactable for Container { + fn range(&self) -> Range { + (self.keys[0], *self.keys.last().unwrap()) + } + } + + impl Debug for Container { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (l, r) = self.range(); + write!(f, "{} {l} - {r} ({})", self.keys.len(), r - l) + } + } + + fn do_compact(containers: &mut Vec, jobs: CompactionJobs) { + for merge_job in jobs.merge_jobs { + let mut keys = Vec::new(); + for i in merge_job { + keys.append(&mut containers[i].keys); + } + keys.sort_unstable(); + keys.dedup(); + containers.extend(keys.chunks(100).map(|keys| Container { + keys: keys.to_vec(), + })); + } + + for i in jobs.move_jobs { + let moved_container = Container { + keys: take(&mut containers[i].keys), + }; + containers.push(moved_container); + } + + containers.retain(|c| !c.keys.is_empty()); + } +} diff --git a/turbopack/crates/turbo-persistence/src/constants.rs b/turbopack/crates/turbo-persistence/src/constants.rs new file mode 100644 index 0000000000000..af103a4bc95c1 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/constants.rs @@ -0,0 +1,34 @@ +/// Values larger than this become blob files +pub const MAX_MEDIUM_VALUE_SIZE: usize = 64 * 1024 * 1024; + +/// Values larger than this become separate value blocks +// Note this must fit into 2 bytes length +pub const MAX_SMALL_VALUE_SIZE: usize = 64 * 1024 - 1; + +/// Maximum number of entries per SST file +pub const MAX_ENTRIES_PER_INITIAL_FILE: usize = 1024 * 1024; + +/// Maximum number of entries per SST file +pub const MAX_ENTRIES_PER_COMPACTED_FILE: usize = 1024 * 1024; + +/// Finish file when total amount of data exceeds this +pub const DATA_THRESHOLD_PER_INITIAL_FILE: usize = 256 * 1024 * 1024; + +/// Finish file when total amount of data exceeds this +pub const DATA_THRESHOLD_PER_COMPACTED_FILE: usize = 256 * 1024 * 1024; + +/// Maximum RAM bytes for AQMF cache +pub const AQMF_CACHE_SIZE: u64 = 300 * 1024 * 1024; +pub const AQMF_AVG_SIZE: usize = 37399; + +/// Maximum RAM bytes for index block cache +pub const INDEX_BLOCK_CACHE_SIZE: u64 = 100 * 1024 * 1024; +pub const INDEX_BLOCK_AVG_SIZE: usize = 152000; + +/// Maximum RAM bytes for key block cache +pub const KEY_BLOCK_CACHE_SIZE: u64 = 300 * 1024 * 1024; +pub const KEY_BLOCK_AVG_SIZE: usize = 16 * 1024; + +/// Maximum RAM bytes for value block cache +pub const VALUE_BLOCK_CACHE_SIZE: u64 = 300 * 1024 * 1024; +pub const VALUE_BLOCK_AVG_SIZE: usize = 132000; diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs new file mode 100644 index 0000000000000..2366e5c7f2b65 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -0,0 +1,910 @@ +use std::{ + any::{Any, TypeId}, + collections::HashSet, + fs::{self, File, OpenOptions, ReadDir}, + io::Write, + mem::{swap, transmute, MaybeUninit}, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, + }, +}; + +use anyhow::{bail, Context, Result}; +use byteorder::{ReadBytesExt, WriteBytesExt, BE}; +use lzzzz::lz4::decompress; +use parking_lot::{Mutex, RwLock}; +use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; + +use crate::{ + arc_slice::ArcSlice, + compaction::selector::{ + get_compaction_jobs, total_coverage, CompactConfig, Compactable, CompactionJobs, + }, + constants::{ + AQMF_AVG_SIZE, AQMF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, INDEX_BLOCK_AVG_SIZE, + INDEX_BLOCK_CACHE_SIZE, KEY_BLOCK_AVG_SIZE, KEY_BLOCK_CACHE_SIZE, + MAX_ENTRIES_PER_COMPACTED_FILE, VALUE_BLOCK_AVG_SIZE, VALUE_BLOCK_CACHE_SIZE, + }, + key::{hash_key, StoreKey}, + lookup_entry::LookupEntry, + merge_iter::MergeIter, + static_sorted_file::{ + AqmfCache, BlockCache, LookupResult, StaticSortedFile, StaticSortedFileRange, + }, + static_sorted_file_builder::StaticSortedFileBuilder, + write_batch::WriteBatch, + QueryKey, +}; + +#[cfg(feature = "stats")] +#[derive(Debug)] +pub struct CacheStatistics { + pub hit_rate: f32, + pub fill: f32, + pub items: usize, + pub size: u64, + pub hits: u64, + pub misses: u64, +} + +#[cfg(feature = "stats")] +impl CacheStatistics { + fn new(cache: &quick_cache::sync::Cache) -> Self + where + Key: Eq + std::hash::Hash, + Val: Clone, + We: quick_cache::Weighter + Clone, + B: std::hash::BuildHasher + Clone, + L: quick_cache::Lifecycle + Clone, + { + let size = cache.weight(); + let hits = cache.hits(); + let misses = cache.misses(); + Self { + hit_rate: hits as f32 / (hits + misses) as f32, + fill: size as f32 / cache.capacity() as f32, + items: cache.len(), + size, + hits, + misses, + } + } +} + +#[cfg(feature = "stats")] +#[derive(Debug)] +pub struct Statistics { + pub sst_files: usize, + pub index_block_cache: CacheStatistics, + pub key_block_cache: CacheStatistics, + pub value_block_cache: CacheStatistics, + pub aqmf_cache: CacheStatistics, + pub hits: u64, + pub misses: u64, + pub miss_range: u64, + pub miss_aqmf: u64, + pub miss_key: u64, +} + +#[cfg(feature = "stats")] +#[derive(Default)] +struct TrackedStats { + hits_deleted: std::sync::atomic::AtomicU64, + hits_small: std::sync::atomic::AtomicU64, + hits_blob: std::sync::atomic::AtomicU64, + miss_range: std::sync::atomic::AtomicU64, + miss_aqmf: std::sync::atomic::AtomicU64, + miss_key: std::sync::atomic::AtomicU64, + miss_global: std::sync::atomic::AtomicU64, +} + +/// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time +/// using a single write batch. It allows for concurrent reads. +pub struct TurboPersistence { + /// The path to the directory where the database is stored + path: PathBuf, + /// The inner state of the database. Writing will update that. + inner: RwLock, + /// A cache for the last WriteBatch. It is used to avoid reallocation of buffers for the + /// WriteBatch. + idle_write_batch: Mutex)>>, + /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent + /// write operations. + active_write_operation: AtomicBool, + /// A cache for deserialized AQMF filters. + aqmf_cache: AqmfCache, + /// A cache for decompressed index blocks. + index_block_cache: BlockCache, + /// A cache for decompressed key blocks. + key_block_cache: BlockCache, + /// A cache for decompressed value blocks. + value_block_cache: BlockCache, + /// Statistics for the database. + #[cfg(feature = "stats")] + stats: TrackedStats, +} + +/// The inner state of the database. +struct Inner { + /// The list of SST files in the database in order. + static_sorted_files: Vec, + /// The current sequence number for the database. + current_sequence_number: u32, +} + +impl TurboPersistence { + /// Open a TurboPersistence database at the given path. + /// This will read the directory and might performance cleanup when the database was not closed + /// properly. Cleanup only requires to read a few bytes from a few files and to delete + /// files, so it's fast. + pub fn open(path: PathBuf) -> Result { + let mut db = Self { + path, + inner: RwLock::new(Inner { + static_sorted_files: Vec::new(), + current_sequence_number: 0, + }), + idle_write_batch: Mutex::new(None), + active_write_operation: AtomicBool::new(false), + aqmf_cache: AqmfCache::with( + AQMF_CACHE_SIZE as usize / AQMF_AVG_SIZE, + AQMF_CACHE_SIZE, + Default::default(), + Default::default(), + Default::default(), + ), + index_block_cache: BlockCache::with( + INDEX_BLOCK_CACHE_SIZE as usize / INDEX_BLOCK_AVG_SIZE, + INDEX_BLOCK_CACHE_SIZE, + Default::default(), + Default::default(), + Default::default(), + ), + key_block_cache: BlockCache::with( + KEY_BLOCK_CACHE_SIZE as usize / KEY_BLOCK_AVG_SIZE, + KEY_BLOCK_CACHE_SIZE, + Default::default(), + Default::default(), + Default::default(), + ), + value_block_cache: BlockCache::with( + VALUE_BLOCK_CACHE_SIZE as usize / VALUE_BLOCK_AVG_SIZE, + VALUE_BLOCK_CACHE_SIZE, + Default::default(), + Default::default(), + Default::default(), + ), + #[cfg(feature = "stats")] + stats: TrackedStats::default(), + }; + db.open_directory()?; + Ok(db) + } + + /// Performas the initial check on the database directory. + fn open_directory(&mut self) -> Result<()> { + match fs::read_dir(&self.path) { + Ok(entries) => { + if !self + .load_directory(entries) + .context("Loading persistence directory failed")? + { + self.init_directory() + .context("Initializing persistence directory failed")?; + } + Ok(()) + } + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + self.create_and_init_directory() + .context("Creating and initializing persistence directory failed")?; + Ok(()) + } else { + Err(e).context("Failed to open database") + } + } + } + } + + /// Creates the directory and initializes it. + fn create_and_init_directory(&mut self) -> Result<()> { + fs::create_dir_all(&self.path)?; + self.init_directory() + } + + /// Initializes the directory by creating the CURRENT file. + fn init_directory(&mut self) -> Result<()> { + let mut current = File::create(self.path.join("CURRENT"))?; + current.write_u32::(0)?; + current.flush()?; + Ok(()) + } + + /// Loads an existing database directory and performs cleanup if necessary. + fn load_directory(&mut self, entries: ReadDir) -> Result { + let mut sst_files = Vec::new(); + let mut current_file = match File::open(self.path.join("CURRENT")) { + Ok(file) => file, + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(false); + } else { + return Err(e).context("Failed to open CURRENT file"); + } + } + }; + let current = current_file.read_u32::()?; + drop(current_file); + + let mut deleted_files = HashSet::new(); + for entry in entries { + let entry = entry?; + let path = entry.path(); + if let Some(ext) = path.extension().and_then(|s| s.to_str()) { + let seq: u32 = path + .file_stem() + .context("File has no file stem")? + .to_str() + .context("File stem is not valid utf-8")? + .parse()?; + if deleted_files.contains(&seq) { + continue; + } + if seq > current { + fs::remove_file(&path)?; + } else { + match ext { + "sst" => { + sst_files.push(seq); + } + "del" => { + let mut content = &*fs::read(&path)?; + let mut no_existing_files = true; + while !content.is_empty() { + let seq = content.read_u32::()?; + deleted_files.insert(seq); + let sst_file = self.path.join(format!("{:08}.sst", seq)); + let blob_file = self.path.join(format!("{:08}.blob", seq)); + for path in [sst_file, blob_file] { + if fs::exists(&path)? { + fs::remove_file(path)?; + no_existing_files = false; + } + } + } + if no_existing_files { + fs::remove_file(&path)?; + } + } + "blob" => { + // ignore blobs, they are read when needed + } + _ => { + bail!("Unexpected file in persistence directory: {:?}", path); + } + } + } + } else { + match path.file_stem().and_then(|s| s.to_str()) { + Some("CURRENT") => { + // Already read + } + _ => { + bail!("Unexpected file in persistence directory: {:?}", path); + } + } + } + } + + sst_files.retain(|seq| !deleted_files.contains(seq)); + sst_files.sort(); + let sst_files = sst_files + .into_iter() + .map(|seq| self.open_sst(seq)) + .collect::>>()?; + #[cfg(feature = "stats")] + { + for sst in sst_files.iter() { + let crate::static_sorted_file::StaticSortedFileRange { + family, + min_hash, + max_hash, + } = sst.range()?; + println!( + "SST {} {} {:016x} - {:016x} {:016x}", + sst.sequence_number(), + family, + min_hash, + max_hash, + max_hash - min_hash + ); + } + } + let inner = self.inner.get_mut(); + inner.static_sorted_files = sst_files; + inner.current_sequence_number = current; + Ok(true) + } + + /// Opens a single SST file. This memory maps the file, but doesn't read it yet. + fn open_sst(&self, seq: u32) -> Result { + let path = self.path.join(format!("{:08}.sst", seq)); + StaticSortedFile::open(seq, path) + .with_context(|| format!("Unable to open sst file {:08}.sst", seq)) + } + + /// Reads and decompresses a blob file. This is not backed by any cache. + fn read_blob(&self, seq: u32) -> Result> { + let path = self.path.join(format!("{:08}.blob", seq)); + let compressed = + fs::read(path).with_context(|| format!("Unable to read blob file {:08}.blob", seq))?; + let mut compressed = &compressed[..]; + let uncompressed_length = compressed.read_u32::()? as usize; + + let buffer = Arc::new_zeroed_slice(uncompressed_length); + // Safety: MaybeUninit can be safely transmuted to u8. + let mut buffer = unsafe { transmute::]>, Arc<[u8]>>(buffer) }; + // Safety: We know that the buffer is not shared yet. + let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) }; + decompress(compressed, decompressed)?; + Ok(ArcSlice::from(buffer)) + } + + /// Returns true if the database is empty. + pub fn is_empty(&self) -> bool { + self.inner.read().static_sorted_files.is_empty() + } + + /// Starts a new WriteBatch for the database. Only a single write operation is allowed at a + /// time. The WriteBatch need to be committed with [`TurboPersistence::commit_write_batch`]. + /// Note that the WriteBatch might start writing data to disk while it's filled up with data. + /// This data will only become visible after the WriteBatch is committed. + pub fn write_batch( + &self, + ) -> Result> { + if self + .active_write_operation + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + bail!( + "Another write batch or compaction is already active (Only a single write \ + operations is allowed at a time)" + ); + } + let current = self.inner.read().current_sequence_number; + if let Some((ty, any)) = self.idle_write_batch.lock().take() { + if ty == TypeId::of::>() { + let mut write_batch = *any.downcast::>().unwrap(); + write_batch.reset(current); + return Ok(write_batch); + } + } + Ok(WriteBatch::new(self.path.clone(), current)) + } + + /// Commits a WriteBatch to the database. This will finish writing the data to disk and make it + /// visible to readers. + pub fn commit_write_batch( + &self, + mut write_batch: WriteBatch, + ) -> Result<()> { + let (seq, new_sst_files) = write_batch.finish()?; + self.commit(new_sst_files, vec![], seq)?; + self.active_write_operation.store(false, Ordering::Release); + self.idle_write_batch.lock().replace(( + TypeId::of::>(), + Box::new(write_batch), + )); + Ok(()) + } + + /// fsyncs the new files and updates the CURRENT file. Updates the database state to include the + /// new files. + fn commit( + &self, + new_sst_files: Vec<(u32, File)>, + mut indicies_to_delete: Vec, + mut seq: u32, + ) -> Result<(), anyhow::Error> { + let mut new_sst_files = new_sst_files + .into_iter() + .map(|(seq, file)| { + file.sync_all()?; + self.open_sst(seq) + }) + .collect::>>()?; + + if !indicies_to_delete.is_empty() { + seq += 1; + } + + let removed_ssts; + + { + let mut inner = self.inner.write(); + inner.current_sequence_number = seq; + indicies_to_delete.sort(); + removed_ssts = remove_indicies(&mut inner.static_sorted_files, &indicies_to_delete); + inner.static_sorted_files.append(&mut new_sst_files); + } + + let mut removed_ssts = removed_ssts + .into_iter() + .map(|sst| sst.sequence_number()) + .collect::>(); + removed_ssts.sort(); + + if !indicies_to_delete.is_empty() { + // Write *.del file, marking the selected files as to delete + let mut buf = Vec::with_capacity(removed_ssts.len() * 4); + for seq in removed_ssts.iter() { + buf.write_u32::(*seq)?; + } + let mut file = File::create(self.path.join(format!("{:08}.del", seq)))?; + file.write_all(&buf)?; + file.sync_all()?; + } + + let mut current_file = OpenOptions::new() + .write(true) + .truncate(false) + .read(false) + .open(self.path.join("CURRENT"))?; + current_file.write_u32::(seq)?; + current_file.sync_all()?; + + for seq in removed_ssts { + fs::remove_file(self.path.join(format!("{seq:08}.sst")))?; + } + + Ok(()) + } + + /// Runs a full compaction on the database. This will rewrite all SST files, removing all + /// duplicate keys and separating all key ranges into unique files. + pub fn full_compact(&self) -> Result<()> { + self.compact(0.0, usize::MAX)?; + Ok(()) + } + + /// Runs a (partial) compaction. Compaction will only be performed if the coverage of the SST + /// files is above the given threshold. The coverage is the average number of SST files that + /// need to be read to find a key. It also limits the maximum number of SST files that are + /// merged at once, which is the main factor for the runtime of the compaction. + pub fn compact(&self, max_coverage: f32, max_merge_sequence: usize) -> Result<()> { + if self + .active_write_operation + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + bail!( + "Another write batch or compaction is already active (Only a single write \ + operations is allowed at a time)" + ); + } + + let mut sequence_number; + let mut new_sst_files = Vec::new(); + let mut indicies_to_delete = Vec::new(); + + { + let inner = self.inner.read(); + sequence_number = AtomicU32::new(inner.current_sequence_number); + self.compact_internal( + &inner.static_sorted_files, + &sequence_number, + &mut new_sst_files, + &mut indicies_to_delete, + max_coverage, + max_merge_sequence, + )?; + } + + self.commit( + new_sst_files, + indicies_to_delete, + *sequence_number.get_mut(), + )?; + + self.active_write_operation.store(false, Ordering::Release); + + Ok(()) + } + + /// Internal function to perform a compaction. + fn compact_internal( + &self, + static_sorted_files: &[StaticSortedFile], + sequence_number: &AtomicU32, + new_sst_files: &mut Vec<(u32, File)>, + indicies_to_delete: &mut Vec, + max_coverage: f32, + max_merge_sequence: usize, + ) -> Result { + if static_sorted_files.is_empty() { + return Ok(false); + } + + struct SstWithRange { + index: usize, + range: StaticSortedFileRange, + } + + impl Compactable for SstWithRange { + fn range(&self) -> (u64, u64) { + (self.range.min_hash, self.range.max_hash) + } + } + + let ssts_with_ranges = static_sorted_files + .iter() + .enumerate() + .flat_map(|(index, sst)| sst.range().ok().map(|range| SstWithRange { index, range })) + .collect::>(); + + let families = ssts_with_ranges + .iter() + .map(|s| s.range.family) + .max() + .unwrap() as usize + + 1; + + let mut sst_by_family = Vec::with_capacity(families); + sst_by_family.resize_with(families, Vec::new); + + for sst in ssts_with_ranges { + sst_by_family[sst.range.family as usize].push(sst); + } + + let key_block_cache = &self.key_block_cache; + let value_block_cache = &self.value_block_cache; + let path = &self.path; + + let result = sst_by_family + .into_par_iter() + .with_min_len(1) + .enumerate() + .map(|(family, ssts_with_ranges)| { + let coverage = total_coverage(&ssts_with_ranges, (0, u64::MAX)); + if coverage <= max_coverage { + return Ok((Vec::new(), Vec::new())); + } + + let CompactionJobs { + merge_jobs, + move_jobs, + } = get_compaction_jobs( + &ssts_with_ranges, + &CompactConfig { + max_merge: max_merge_sequence, + min_merge: 2, + }, + ); + + // Later we will remove the merged and moved files + let indicies_to_delete = merge_jobs + .iter() + .flat_map(|l| l.iter().copied()) + .chain(move_jobs.iter().copied()) + .map(|index| ssts_with_ranges[index].index) + .collect::>(); + + // Merge SST files + let merge_result = merge_jobs + .into_par_iter() + .with_min_len(1) + .map(|indicies| { + fn create_sst_file( + family: u32, + entries: &[LookupEntry], + total_key_size: usize, + total_value_size: usize, + path: &Path, + seq: u32, + ) -> Result<(u32, File)> { + let builder = StaticSortedFileBuilder::new( + family, + entries, + total_key_size, + total_value_size, + )?; + Ok((seq, builder.write(&path.join(format!("{:08}.sst", seq)))?)) + } + + let mut new_sst_files = Vec::new(); + + // Iterate all SST files + let iters = indicies + .iter() + .map(|&index| { + let index = ssts_with_ranges[index].index; + let sst = &static_sorted_files[index]; + sst.iter(key_block_cache, value_block_cache) + }) + .collect::>>()?; + + let iter = MergeIter::new(iters.into_iter())?; + + let mut total_key_size = 0; + let mut total_value_size = 0; + let mut current: Option = None; + let mut entries = Vec::new(); + let mut last_entries = Vec::new(); + let mut last_entries_total_sizes = (0, 0); + for entry in iter { + let entry = entry?; + + // Remove duplicates + if let Some(current) = current.take() { + if current.key != entry.key { + let key_size = current.key.len(); + let value_size = current.value.size_in_sst(); + total_key_size += key_size; + total_value_size += value_size; + + if total_key_size + total_value_size + > DATA_THRESHOLD_PER_COMPACTED_FILE + || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE + { + let (selected_total_key_size, selected_total_value_size) = + last_entries_total_sizes; + swap(&mut entries, &mut last_entries); + last_entries_total_sizes = ( + total_key_size - key_size, + total_value_size - value_size, + ); + total_key_size = key_size; + total_value_size = value_size; + + if !entries.is_empty() { + let seq = + sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + new_sst_files.push(create_sst_file( + family as u32, + &entries, + selected_total_key_size, + selected_total_value_size, + path, + seq, + )?); + + entries.clear(); + } + } + + entries.push(current); + } else { + // Override value + } + } + current = Some(entry); + } + if let Some(entry) = current { + total_key_size += entry.key.len(); + total_value_size += entry.value.size_in_sst(); + entries.push(entry); + } + + // If we have one set of entries left, write them to a new SST file + if last_entries.is_empty() && !entries.is_empty() { + let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + new_sst_files.push(create_sst_file( + family as u32, + &entries, + total_key_size, + total_value_size, + path, + seq, + )?); + } else + // If we have two sets of entries left, merge them and + // split it into two SST files, to avoid having a + // single SST file that is very small. + if !last_entries.is_empty() { + last_entries.append(&mut entries); + + last_entries_total_sizes.0 += total_key_size; + last_entries_total_sizes.1 += total_value_size; + + let (part1, part2) = last_entries.split_at(last_entries.len() / 2); + + let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + new_sst_files.push(create_sst_file( + family as u32, + part1, + // We don't know the exact sizes so we estimate them + last_entries_total_sizes.0 / 2, + last_entries_total_sizes.1 / 2, + path, + seq1, + )?); + + new_sst_files.push(create_sst_file( + family as u32, + part2, + last_entries_total_sizes.0 / 2, + last_entries_total_sizes.1 / 2, + path, + seq2, + )?); + } + Ok(new_sst_files) + }) + .collect::>>()?; + + // Move SST files + let mut new_sst_files = move_jobs + .into_par_iter() + .with_min_len(1) + .map(|index| { + let index = ssts_with_ranges[index].index; + let sst = &static_sorted_files[index]; + let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let src_path = self.path.join(format!("{:08}.sst", sst.sequence_number())); + let dst_path = self.path.join(format!("{:08}.sst", seq)); + if fs::hard_link(&src_path, &dst_path).is_err() { + fs::copy(src_path, &dst_path)?; + } + Ok((seq, File::open(dst_path)?)) + }) + .collect::>>()?; + + new_sst_files.extend(merge_result.into_iter().flatten()); + Ok((new_sst_files, indicies_to_delete)) + }) + .collect::>>()?; + + for (mut inner_new_sst_files, mut inner_indicies_to_delete) in result { + new_sst_files.append(&mut inner_new_sst_files); + indicies_to_delete.append(&mut inner_indicies_to_delete); + } + + Ok(true) + } + + /// Get a value from the database. Returns None if the key is not found. The returned value + /// might hold onto a block of the database and it should not be hold long-term. + pub fn get(&self, family: usize, key: &K) -> Result>> { + let hash = hash_key(key); + let inner = self.inner.read(); + for sst in inner.static_sorted_files.iter().rev() { + match sst.lookup( + family as u32, + hash, + key, + &self.aqmf_cache, + &self.index_block_cache, + &self.key_block_cache, + &self.value_block_cache, + )? { + LookupResult::Deleted => { + #[cfg(feature = "stats")] + self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed); + return Ok(None); + } + LookupResult::Slice { value } => { + #[cfg(feature = "stats")] + self.stats.hits_small.fetch_add(1, Ordering::Relaxed); + return Ok(Some(value)); + } + LookupResult::Blob { sequence_number } => { + #[cfg(feature = "stats")] + self.stats.hits_blob.fetch_add(1, Ordering::Relaxed); + let blob = self.read_blob(sequence_number)?; + return Ok(Some(blob)); + } + LookupResult::RangeMiss => { + #[cfg(feature = "stats")] + self.stats.miss_range.fetch_add(1, Ordering::Relaxed); + } + LookupResult::QuickFilterMiss => { + #[cfg(feature = "stats")] + self.stats.miss_aqmf.fetch_add(1, Ordering::Relaxed); + } + LookupResult::KeyMiss => { + #[cfg(feature = "stats")] + self.stats.miss_key.fetch_add(1, Ordering::Relaxed); + } + } + } + #[cfg(feature = "stats")] + self.stats.miss_global.fetch_add(1, Ordering::Relaxed); + Ok(None) + } + + /// Returns database statistics. + #[cfg(feature = "stats")] + pub fn statistics(&self) -> Statistics { + let inner = self.inner.read(); + Statistics { + sst_files: inner.static_sorted_files.len(), + index_block_cache: CacheStatistics::new(&self.index_block_cache), + key_block_cache: CacheStatistics::new(&self.key_block_cache), + value_block_cache: CacheStatistics::new(&self.value_block_cache), + aqmf_cache: CacheStatistics::new(&self.aqmf_cache), + hits: self.stats.hits_deleted.load(Ordering::Relaxed) + + self.stats.hits_small.load(Ordering::Relaxed) + + self.stats.hits_blob.load(Ordering::Relaxed), + misses: self.stats.miss_global.load(Ordering::Relaxed), + miss_range: self.stats.miss_range.load(Ordering::Relaxed), + miss_aqmf: self.stats.miss_aqmf.load(Ordering::Relaxed), + miss_key: self.stats.miss_key.load(Ordering::Relaxed), + } + } + + /// Shuts down the database. This will print statistics if the `stats` feature is enabled. + pub fn shutdown(&self) -> Result<()> { + #[cfg(feature = "stats")] + println!("{:#?}", self.statistics()); + Ok(()) + } +} + +/// Helper method to remove certain indicies from a list while keeping the order. +/// This is similar to the `remove` method on Vec, but it allows to remove multiple indicies at +/// once. It returns the removed elements in unspecified order. +/// +/// Note: The `sorted_indicies` list needs to be sorted. +fn remove_indicies(list: &mut Vec, sorted_indicies: &[usize]) -> Vec { + let mut r = 0; + let mut w = 0; + let mut i = 0; + while r < list.len() { + if i < sorted_indicies.len() { + let idx = sorted_indicies[i]; + if r != idx { + list.swap(w, r); + w += 1; + r += 1; + } else { + r += 1; + i += 1; + } + } else { + list.swap(w, r); + w += 1; + r += 1; + } + } + list.split_off(w) +} + +#[cfg(test)] +mod tests { + use crate::db::remove_indicies; + + #[test] + fn test_remove_indicies() { + let mut list = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + let sorted_indicies = vec![1, 3, 5, 7]; + let removed = remove_indicies(&mut list, &sorted_indicies); + assert_eq!(list, vec![1, 3, 5, 7, 9]); + assert!(removed.contains(&2)); + assert!(removed.contains(&4)); + assert!(removed.contains(&6)); + assert!(removed.contains(&8)); + assert_eq!(removed.len(), 4); + } + + #[test] + fn test_remove_indicies2() { + let mut list = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + let sorted_indicies = vec![0, 1, 2, 6, 7, 8]; + let removed = remove_indicies(&mut list, &sorted_indicies); + assert_eq!(list, vec![4, 5, 6]); + assert!(removed.contains(&1)); + assert!(removed.contains(&2)); + assert!(removed.contains(&3)); + assert!(removed.contains(&7)); + assert!(removed.contains(&8)); + assert!(removed.contains(&9)); + assert_eq!(removed.len(), 6); + } +} diff --git a/turbopack/crates/turbo-persistence/src/key.rs b/turbopack/crates/turbo-persistence/src/key.rs new file mode 100644 index 0000000000000..d88a93396abda --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/key.rs @@ -0,0 +1,205 @@ +use std::{cmp::min, hash::Hasher}; + +/// A trait for keys that can be used for hashing. +pub trait KeyBase { + /// Returns the length of the key in bytes. + fn len(&self) -> usize; + /// Hashes the key. It should not include the structure of the key, only the data. E.g. `([1, + /// 2], [3, 4])` should hash the same as `[1, 2, 3, 4]`. + fn hash(&self, state: &mut H); +} + +impl KeyBase for &'_ [u8] { + fn len(&self) -> usize { + <[u8]>::len(self) + } + + fn hash(&self, state: &mut H) { + for item in *self { + state.write_u8(*item); + } + } +} + +impl KeyBase for [u8; N] { + fn len(&self) -> usize { + self[..].len() + } + + fn hash(&self, state: &mut H) { + for item in self { + state.write_u8(*item); + } + } +} + +impl KeyBase for Vec { + fn len(&self) -> usize { + self.len() + } + + fn hash(&self, state: &mut H) { + for item in self { + state.write_u8(*item); + } + } +} + +impl KeyBase for u8 { + fn len(&self) -> usize { + 1 + } + + fn hash(&self, state: &mut H) { + state.write_u8(*self); + } +} + +impl KeyBase for (A, B) { + fn len(&self) -> usize { + let (a, b) = self; + a.len() + b.len() + } + + fn hash(&self, state: &mut H) { + let (a, b) = self; + KeyBase::hash(a, state); + KeyBase::hash(b, state); + } +} + +impl KeyBase for &'_ T { + fn len(&self) -> usize { + (*self).len() + } + + fn hash(&self, state: &mut H) { + (*self).hash(state) + } +} + +/// A trait for keys that can be used to query the database. They need to allow hashing and +/// comparison with a byte slice (total order). +pub trait QueryKey: KeyBase { + fn cmp(&self, key: &[u8]) -> std::cmp::Ordering; +} + +impl QueryKey for &'_ [u8] { + fn cmp(&self, key: &[u8]) -> std::cmp::Ordering { + Ord::cmp(self, &key) + } +} + +impl QueryKey for [u8; N] { + fn cmp(&self, key: &[u8]) -> std::cmp::Ordering { + Ord::cmp(&self[..], key) + } +} + +impl QueryKey for Vec { + fn cmp(&self, key: &[u8]) -> std::cmp::Ordering { + Ord::cmp(&**self, key) + } +} + +impl QueryKey for u8 { + fn cmp(&self, key: &[u8]) -> std::cmp::Ordering { + Ord::cmp(&[*self][..], key) + } +} + +impl QueryKey for (A, B) { + fn cmp(&self, mut key: &[u8]) -> std::cmp::Ordering { + let (a, b) = self; + let len = a.len(); + let key_len = key.len(); + let key_part = &key[..min(key_len, len)]; + match a.cmp(key_part) { + std::cmp::Ordering::Equal => { + key = &key[len..]; + b.cmp(key) + } + ord => ord, + } + } +} + +impl QueryKey for &'_ T { + fn cmp(&self, key: &[u8]) -> std::cmp::Ordering { + (*self).cmp(key) + } +} + +/// A trait for keys that can be stored in the database. They need to allow hashing and comparison. +pub trait StoreKey: KeyBase + Ord { + fn write_to(&self, buf: &mut Vec); +} + +impl StoreKey for Vec { + fn write_to(&self, buf: &mut Vec) { + buf.extend_from_slice(self); + } +} + +impl StoreKey for &'_ [u8] { + fn write_to(&self, buf: &mut Vec) { + buf.extend_from_slice(self); + } +} + +impl StoreKey for u8 { + fn write_to(&self, buf: &mut Vec) { + buf.push(*self); + } +} + +impl StoreKey for (A, B) { + fn write_to(&self, buf: &mut Vec) { + self.0.write_to(buf); + self.1.write_to(buf); + } +} + +impl StoreKey for &'_ T { + fn write_to(&self, buf: &mut Vec) { + (*self).write_to(buf); + } +} + +/// Hashes a key with a fast, deterministic hash function. +pub fn hash_key(key: &impl KeyBase) -> u64 { + let mut hasher = twox_hash::XxHash64::with_seed(0); + key.hash(&mut hasher); + hasher.finish() +} + +#[cfg(test)] +mod tests { + use std::cmp::Ordering; + + use crate::{key::hash_key, QueryKey}; + + #[test] + fn tuple() { + let key = (&[1, 2], &[3, 4]); + assert_eq!(QueryKey::cmp(&key, &[1, 2, 3, 4]), Ordering::Equal); + assert_eq!(QueryKey::cmp(&key, &[1, 2, 3, 3]), Ordering::Greater); + assert_eq!(QueryKey::cmp(&key, &[1, 2, 3, 5]), Ordering::Less); + assert_eq!(QueryKey::cmp(&key, &[0, 2, 3, 4]), Ordering::Greater); + assert_eq!(QueryKey::cmp(&key, &[2, 2, 3, 4]), Ordering::Less); + assert_eq!(QueryKey::cmp(&key, &[1, 2, 3, 4, 5]), Ordering::Less); + assert_eq!(QueryKey::cmp(&key, &[1, 2, 3]), Ordering::Greater); + assert_eq!(QueryKey::cmp(&key, &[1, 2]), Ordering::Greater); + assert_eq!(QueryKey::cmp(&key, &[1]), Ordering::Greater); + assert_eq!(QueryKey::cmp(&key, &[]), Ordering::Greater); + } + + #[test] + fn hash() { + let h1 = hash_key(&[1, 2, 3, 4]); + let h2 = hash_key(&(&[1, 2], &[3, 4])); + let h3 = hash_key(&(vec![1, 2, 3], 4u8)); + assert_eq!(h2, h1); + assert_eq!(h3, h1); + } +} diff --git a/turbopack/crates/turbo-persistence/src/lib.rs b/turbopack/crates/turbo-persistence/src/lib.rs new file mode 100644 index 0000000000000..3069de7069418 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/lib.rs @@ -0,0 +1,24 @@ +#![feature(once_cell_try)] +#![feature(new_zeroed_alloc)] +#![feature(get_mut_unchecked)] + +mod arc_slice; +mod collector; +mod collector_entry; +mod compaction; +mod constants; +mod db; +mod key; +mod lookup_entry; +mod merge_iter; +mod static_sorted_file; +mod static_sorted_file_builder; +mod write_batch; + +#[cfg(test)] +mod tests; + +pub use arc_slice::ArcSlice; +pub use db::TurboPersistence; +pub use key::{QueryKey, StoreKey}; +pub use write_batch::WriteBatch; diff --git a/turbopack/crates/turbo-persistence/src/lookup_entry.rs b/turbopack/crates/turbo-persistence/src/lookup_entry.rs new file mode 100644 index 0000000000000..7095fd57fc9bb --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/lookup_entry.rs @@ -0,0 +1,66 @@ +use crate::{ + constants::MAX_SMALL_VALUE_SIZE, + static_sorted_file_builder::{Entry, EntryValue}, + ArcSlice, +}; + +/// A value from a SST file lookup. +pub enum LookupValue { + /// The value was deleted. + Deleted, + /// The value is stored in the SST file. + Slice { value: ArcSlice }, + /// The value is stored in a blob file. + Blob { sequence_number: u32 }, +} + +impl LookupValue { + /// Returns the size of the value in the SST file. + pub fn size_in_sst(&self) -> usize { + match self { + LookupValue::Slice { value } => value.len(), + LookupValue::Deleted => 0, + LookupValue::Blob { .. } => 0, + } + } +} + +/// An entry from a SST file lookup. +pub struct LookupEntry { + /// The hash of the key. + pub hash: u64, + /// The key. + pub key: ArcSlice, + /// The value. + pub value: LookupValue, +} + +impl Entry for LookupEntry { + fn key_hash(&self) -> u64 { + self.hash + } + + fn key_len(&self) -> usize { + self.key.len() + } + + fn write_key_to(&self, buf: &mut Vec) { + buf.extend_from_slice(&self.key); + } + + fn value(&self) -> EntryValue<'_> { + match &self.value { + LookupValue::Deleted => EntryValue::Deleted, + LookupValue::Slice { value } => { + if value.len() > MAX_SMALL_VALUE_SIZE { + EntryValue::Medium { value } + } else { + EntryValue::Small { value } + } + } + LookupValue::Blob { sequence_number } => EntryValue::Large { + blob: *sequence_number, + }, + } + } +} diff --git a/turbopack/crates/turbo-persistence/src/merge_iter.rs b/turbopack/crates/turbo-persistence/src/merge_iter.rs new file mode 100644 index 0000000000000..251ef32c26db5 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/merge_iter.rs @@ -0,0 +1,79 @@ +use std::{cmp::Ordering, collections::BinaryHeap}; + +use anyhow::Result; + +use crate::lookup_entry::LookupEntry; + +/// An active iterator that is being merged. It has peeked the next element and can be compared +/// according to that element. The `order` is used when multiple iterators have the same key. +struct ActiveIterator>> { + iter: T, + order: usize, + entry: LookupEntry, +} + +impl>> PartialEq for ActiveIterator { + fn eq(&self, other: &Self) -> bool { + self.entry.hash == other.entry.hash && *self.entry.key == *other.entry.key + } +} + +impl>> Eq for ActiveIterator {} + +impl>> PartialOrd for ActiveIterator { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl>> Ord for ActiveIterator { + fn cmp(&self, other: &Self) -> Ordering { + self.entry + .hash + .cmp(&other.entry.hash) + .then_with(|| (*self.entry.key).cmp(&other.entry.key)) + .then_with(|| self.order.cmp(&other.order)) + .reverse() + } +} + +/// An iterator that merges multiple sorted iterators into a single sorted iterator. Internal it +/// uses an heap of iterators to iterate them in order. +pub struct MergeIter>> { + heap: BinaryHeap>, +} + +impl>> MergeIter { + pub fn new(iters: impl Iterator) -> Result { + let mut heap = BinaryHeap::new(); + for (order, mut iter) in iters.enumerate() { + if let Some(entry) = iter.next() { + let entry = entry?; + heap.push(ActiveIterator { iter, order, entry }); + } + } + Ok(Self { heap }) + } +} + +impl>> Iterator for MergeIter { + type Item = Result; + + fn next(&mut self) -> Option { + let ActiveIterator { + mut iter, + order, + entry, + } = self.heap.pop()?; + match iter.next() { + None => {} + Some(Err(e)) => return Some(Err(e)), + Some(Ok(next)) => self.heap.push(ActiveIterator { + iter, + order, + entry: next, + }), + } + Some(Ok(entry)) + } +} diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs new file mode 100644 index 0000000000000..1de537c69ee3d --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file.rs @@ -0,0 +1,717 @@ +use std::{ + cmp::Ordering, + fs::File, + hash::BuildHasherDefault, + mem::{transmute, MaybeUninit}, + path::PathBuf, + sync::{Arc, OnceLock}, +}; + +use anyhow::{bail, Result}; +use byteorder::{ReadBytesExt, BE}; +use lzzzz::lz4::decompress_with_dict; +use memmap2::Mmap; +use quick_cache::sync::GuardResult; +use rustc_hash::FxHasher; + +use crate::{ + arc_slice::ArcSlice, + lookup_entry::{LookupEntry, LookupValue}, + QueryKey, +}; + +/// The block header for an index block. +pub const BLOCK_TYPE_INDEX: u8 = 0; +/// The block header for a key block. +pub const BLOCK_TYPE_KEY: u8 = 1; + +/// The tag for a small-sized value. +pub const KEY_BLOCK_ENTRY_TYPE_SMALL: u8 = 0; +/// The tag for the blob value. +pub const KEY_BLOCK_ENTRY_TYPE_BLOB: u8 = 1; +/// The tag for the deleted value. +pub const KEY_BLOCK_ENTRY_TYPE_DELETED: u8 = 2; +/// The tag for a medium-sized value. +pub const KEY_BLOCK_ENTRY_TYPE_MEDIUM: u8 = 3; + +/// The result of a lookup operation. +pub enum LookupResult { + /// The key was deleted. + Deleted, + /// The key was found and the value is a slice. + Slice { value: ArcSlice }, + /// The key was found and the value is a blob. + Blob { sequence_number: u32 }, + /// The key was not found because it is out of the range of this SST file. + RangeMiss, + /// The key was not found because it was not in the AQMF filter. But it was in the range. + QuickFilterMiss, + /// The key was not found. But it was in the range and the AQMF filter. + KeyMiss, +} + +impl From for LookupResult { + fn from(value: LookupValue) -> Self { + match value { + LookupValue::Deleted => LookupResult::Deleted, + LookupValue::Slice { value } => LookupResult::Slice { value }, + LookupValue::Blob { sequence_number } => LookupResult::Blob { sequence_number }, + } + } +} + +/// A byte range in the SST file. +struct LocationInFile { + start: usize, + end: usize, +} + +/// The read and parsed header of an SST file. +struct Header { + /// The key family stored in this file. + family: u32, + /// The minimum hash value in this file. + min_hash: u64, + /// The maximum hash value in this file. + max_hash: u64, + /// The location of the AQMF filter in the file. + aqmf: LocationInFile, + /// The location of the key compression dictionary in the file. + key_compression_dictionary: LocationInFile, + /// The location of the value compression dictionary in the file. + value_compression_dictionary: LocationInFile, + /// The byte offset where the block offsets start. + block_offsets_start: usize, + /// The byte offset where the blocks start. + blocks_start: usize, + /// The number of blocks in this file. + block_count: u16, +} + +/// The key family and hash range of an SST file. +#[derive(Clone, Copy)] +pub struct StaticSortedFileRange { + pub family: u32, + pub min_hash: u64, + pub max_hash: u64, +} + +#[derive(Clone, Default)] +pub struct AqmfWeighter; + +impl quick_cache::Weighter> for AqmfWeighter { + fn weight(&self, _key: &u32, filter: &Arc) -> u64 { + filter.capacity() + 1 + } +} + +#[derive(Clone, Default)] +pub struct BlockWeighter; + +impl quick_cache::Weighter<(u32, u16), ArcSlice> for BlockWeighter { + fn weight(&self, _key: &(u32, u16), val: &ArcSlice) -> u64 { + val.len() as u64 + 8 + } +} + +pub type AqmfCache = + quick_cache::sync::Cache, AqmfWeighter, BuildHasherDefault>; +pub type BlockCache = + quick_cache::sync::Cache<(u32, u16), ArcSlice, BlockWeighter, BuildHasherDefault>; + +/// A memory mapped SST file. +pub struct StaticSortedFile { + /// The sequence number of this file. + sequence_number: u32, + /// The memory mapped file. + mmap: Mmap, + /// The parsed header of this file. + header: OnceLock
, + /// The AQMF filter of this file. This is only used if the range is very large. Smaller ranges + /// use the AQMF cache instead. + aqmf: OnceLock, +} + +impl StaticSortedFile { + /// The sequence number of this file. + pub fn sequence_number(&self) -> u32 { + self.sequence_number + } + + /// Opens an SST file at the given path. This memory maps the file, but does not read it yet. + /// It's lazy read on demand. + pub fn open(sequence_number: u32, path: PathBuf) -> Result { + let mmap = unsafe { Mmap::map(&File::open(&path)?)? }; + let file = Self { + sequence_number, + mmap, + header: OnceLock::new(), + aqmf: OnceLock::new(), + }; + Ok(file) + } + + /// Reads and parses the header of this file if it hasn't been read yet. + fn header(&self) -> Result<&Header> { + self.header.get_or_try_init(|| { + let mut file = &*self.mmap; + let magic = file.read_u32::()?; + if magic != 0x53535401 { + bail!("Invalid magic number or version"); + } + let family = file.read_u32::()?; + let min_hash = file.read_u64::()?; + let max_hash = file.read_u64::()?; + let aqmf_length = file.read_u24::()? as usize; + let key_compression_dictionary_length = file.read_u16::()? as usize; + let value_compression_dictionary_length = file.read_u16::()? as usize; + let block_count = file.read_u16::()?; + const HEADER_SIZE: usize = 33; + let mut current_offset = HEADER_SIZE; + let aqmf = LocationInFile { + start: current_offset, + end: current_offset + aqmf_length, + }; + current_offset += aqmf_length; + let key_compression_dictionary = LocationInFile { + start: current_offset, + end: current_offset + key_compression_dictionary_length, + }; + current_offset += key_compression_dictionary_length; + let value_compression_dictionary = LocationInFile { + start: current_offset, + end: current_offset + value_compression_dictionary_length, + }; + current_offset += value_compression_dictionary_length; + let block_offsets_start = current_offset; + let blocks_start = block_offsets_start + block_count as usize * 4; + + Ok(Header { + family, + min_hash, + max_hash, + aqmf, + key_compression_dictionary, + value_compression_dictionary, + block_offsets_start, + blocks_start, + block_count, + }) + }) + } + + /// Returns the key family and hash range of this file. + pub fn range(&self) -> Result { + let header = self.header()?; + Ok(StaticSortedFileRange { + family: header.family, + min_hash: header.min_hash, + max_hash: header.max_hash, + }) + } + + /// Iterate over all entries in this file in sorted order. + pub fn iter<'l>( + &'l self, + key_block_cache: &'l BlockCache, + value_block_cache: &'l BlockCache, + ) -> Result> { + let header = self.header()?; + let mut iter = StaticSortedFileIter { + this: self, + key_block_cache, + value_block_cache, + header, + stack: Vec::new(), + current_key_block: None, + }; + iter.enter_block(header.block_count - 1)?; + Ok(iter) + } + + /// Looks up a key in this file. + pub fn lookup( + &self, + key_family: u32, + key_hash: u64, + key: &K, + aqmf_cache: &AqmfCache, + index_block_cache: &BlockCache, + key_block_cache: &BlockCache, + value_block_cache: &BlockCache, + ) -> Result { + let header = self.header()?; + if key_family != header.family || key_hash < header.min_hash || key_hash > header.max_hash { + return Ok(LookupResult::RangeMiss); + } + + let use_aqmf_cache = header.max_hash - header.min_hash < 1 << 62; + if use_aqmf_cache { + let aqmf = match aqmf_cache.get_value_or_guard(&self.sequence_number, None) { + GuardResult::Value(aqmf) => aqmf, + GuardResult::Guard(guard) => { + let aqmf = &self.mmap[header.aqmf.start..header.aqmf.end]; + let aqmf: Arc = Arc::new(pot::from_slice(aqmf)?); + let _ = guard.insert(aqmf.clone()); + aqmf + } + GuardResult::Timeout => unreachable!(), + }; + if !aqmf.contains_fingerprint(key_hash) { + return Ok(LookupResult::QuickFilterMiss); + } + } else { + let aqmf = self.aqmf.get_or_try_init(|| { + let aqmf = &self.mmap[header.aqmf.start..header.aqmf.end]; + anyhow::Ok(pot::from_slice(aqmf)?) + })?; + if !aqmf.contains_fingerprint(key_hash) { + return Ok(LookupResult::QuickFilterMiss); + } + } + let mut current_block = header.block_count - 1; + let mut cache = index_block_cache; + loop { + let block = self.get_key_block(header, current_block, cache)?; + cache = key_block_cache; + let mut block = &block[..]; + let block_type = block.read_u8()?; + match block_type { + BLOCK_TYPE_INDEX => { + current_block = self.lookup_index_block(block, key_hash)?; + } + BLOCK_TYPE_KEY => { + return self.lookup_key_block(block, key_hash, key, header, value_block_cache); + } + _ => { + bail!("Invalid block type"); + } + } + } + } + + /// Looks up a hash in a index block. + fn lookup_index_block(&self, mut block: &[u8], hash: u64) -> Result { + let first_block = block.read_u16::()?; + let entry_count = block.len() / 10; + if entry_count == 0 { + return Ok(first_block); + } + let entries = block; + fn get_hash(entries: &[u8], index: usize) -> Result { + Ok((&entries[index * 10..]).read_u64::()?) + } + fn get_block(entries: &[u8], index: usize) -> Result { + Ok((&entries[index * 10 + 8..]).read_u16::()?) + } + let first_hash = get_hash(entries, 0)?; + match hash.cmp(&first_hash) { + Ordering::Less => { + return Ok(first_block); + } + Ordering::Equal => { + return get_block(entries, 0); + } + Ordering::Greater => {} + } + + let mut l = 1; + let mut r = entry_count; + // binary search for the range + while l < r { + let m = (l + r) / 2; + let mid_hash = get_hash(entries, m)?; + match hash.cmp(&mid_hash) { + Ordering::Less => { + r = m; + } + Ordering::Equal => { + return get_block(entries, m); + } + Ordering::Greater => { + l = m + 1; + } + } + } + get_block(entries, l - 1) + } + + /// Looks up a key in a key block and the value in a value block. + fn lookup_key_block( + &self, + mut block: &[u8], + key_hash: u64, + key: &K, + header: &Header, + value_block_cache: &BlockCache, + ) -> Result { + let entry_count = block.read_u24::()? as usize; + let offsets = &block[..entry_count * 4]; + let entries = &block[entry_count * 4..]; + + let mut l = 0; + let mut r = entry_count; + // binary search for the key + while l < r { + let m = (l + r) / 2; + let GetKeyEntryResult { + hash: mid_hash, + key: mid_key, + ty, + val: mid_val, + } = get_key_entry(offsets, entries, entry_count, m)?; + match key_hash.cmp(&mid_hash).then_with(|| key.cmp(mid_key)) { + Ordering::Less => { + r = m; + } + Ordering::Equal => { + return Ok(self + .handle_key_match(ty, mid_val, header, value_block_cache)? + .into()); + } + Ordering::Greater => { + l = m + 1; + } + } + } + Ok(LookupResult::KeyMiss) + } + + /// Handles a key match by looking up the value. + fn handle_key_match( + &self, + ty: u8, + mut val: &[u8], + header: &Header, + value_block_cache: &BlockCache, + ) -> Result { + Ok(match ty { + KEY_BLOCK_ENTRY_TYPE_SMALL => { + let block = val.read_u16::()?; + let size = val.read_u16::()? as usize; + let position = val.read_u32::()? as usize; + let value = self + .get_value_block(header, block, value_block_cache)? + .slice(position..position + size); + LookupValue::Slice { value } + } + KEY_BLOCK_ENTRY_TYPE_MEDIUM => { + let block = val.read_u16::()?; + let value = self.read_value_block(header, block)?; + LookupValue::Slice { value } + } + KEY_BLOCK_ENTRY_TYPE_BLOB => { + let sequence_number = val.read_u32::()?; + LookupValue::Blob { sequence_number } + } + KEY_BLOCK_ENTRY_TYPE_DELETED => LookupValue::Deleted, + _ => { + bail!("Invalid key block entry type"); + } + }) + } + + /// Gets a key block from the cache or reads it from the file. + fn get_key_block( + &self, + header: &Header, + block: u16, + key_block_cache: &BlockCache, + ) -> Result, anyhow::Error> { + Ok( + match key_block_cache.get_value_or_guard(&(self.sequence_number, block), None) { + GuardResult::Value(block) => block, + GuardResult::Guard(guard) => { + let block = self.read_key_block(header, block)?; + let _ = guard.insert(block.clone()); + block + } + GuardResult::Timeout => unreachable!(), + }, + ) + } + + /// Gets a value block from the cache or reads it from the file. + fn get_value_block( + &self, + header: &Header, + block: u16, + value_block_cache: &BlockCache, + ) -> Result> { + let block = match value_block_cache.get_value_or_guard(&(self.sequence_number, block), None) + { + GuardResult::Value(block) => block, + GuardResult::Guard(guard) => { + let block = self.read_value_block(header, block)?; + let _ = guard.insert(block.clone()); + block + } + GuardResult::Timeout => unreachable!(), + }; + Ok(block) + } + + /// Reads a key block from the file. + fn read_key_block(&self, header: &Header, block_index: u16) -> Result> { + self.read_block( + header, + block_index, + &self.mmap + [header.key_compression_dictionary.start..header.key_compression_dictionary.end], + ) + } + + /// Reads a value block from the file. + fn read_value_block(&self, header: &Header, block_index: u16) -> Result> { + self.read_block( + header, + block_index, + &self.mmap[header.value_compression_dictionary.start + ..header.value_compression_dictionary.end], + ) + } + + /// Reads a block from the file. + fn read_block( + &self, + header: &Header, + block_index: u16, + compression_dictionary: &[u8], + ) -> Result> { + #[cfg(feature = "strict_checks")] + if block_index >= header.block_count { + bail!( + "Corrupted file seq:{} block:{} > number of blocks {} (block_offsets: {:x}, \ + blocks: {:x})", + self.sequence_number, + block_index, + header.block_count, + header.block_offsets_start, + header.blocks_start + ); + } + let offset = header.block_offsets_start + block_index as usize * 4; + #[cfg(feature = "strict_checks")] + if offset + 4 > self.mmap.len() { + bail!( + "Corrupted file seq:{} block:{} block offset locations {} + 4 bytes > file end {} \ + (block_offsets: {:x}, blocks: {:x})", + self.sequence_number, + block_index, + offset, + self.mmap.len(), + header.block_offsets_start, + header.blocks_start + ); + } + let block_start = if block_index == 0 { + header.blocks_start + } else { + header.blocks_start + (&self.mmap[offset - 4..offset]).read_u32::()? as usize + }; + let block_end = + header.blocks_start + (&self.mmap[offset..offset + 4]).read_u32::()? as usize; + #[cfg(feature = "strict_checks")] + if block_end > self.mmap.len() || block_start > self.mmap.len() { + bail!( + "Corrupted file seq:{} block:{} block {} - {} > file end {} (block_offsets: {:x}, \ + blocks: {:x})", + self.sequence_number, + block_index, + block_start, + block_end, + self.mmap.len(), + header.block_offsets_start, + header.blocks_start + ); + } + let uncompressed_length = + (&self.mmap[block_start..block_start + 4]).read_u32::()? as usize; + let block = self.mmap[block_start + 4..block_end].to_vec(); + + let buffer = Arc::new_zeroed_slice(uncompressed_length); + // Safety: MaybeUninit can be safely transmuted to u8. + let mut buffer = unsafe { transmute::]>, Arc<[u8]>>(buffer) }; + // Safety: We know that the buffer is not shared yet. + let decompressed = unsafe { Arc::get_mut_unchecked(&mut buffer) }; + decompress_with_dict(&block, decompressed, compression_dictionary)?; + Ok(ArcSlice::from(buffer)) + } +} + +/// An iterator over all entries in a SST file in sorted order. +pub struct StaticSortedFileIter<'l> { + this: &'l StaticSortedFile, + key_block_cache: &'l BlockCache, + value_block_cache: &'l BlockCache, + header: &'l Header, + + stack: Vec, + current_key_block: Option, +} + +struct CurrentKeyBlock { + offsets: ArcSlice, + entries: ArcSlice, + entry_count: usize, + index: usize, +} + +struct CurrentIndexBlock { + entries: ArcSlice, + block_indicies_count: usize, + index: usize, +} + +impl Iterator for StaticSortedFileIter<'_> { + type Item = Result; + + fn next(&mut self) -> Option { + self.next_internal().transpose() + } +} + +impl StaticSortedFileIter<'_> { + /// Enters a block at the given index. + fn enter_block(&mut self, block_index: u16) -> Result<()> { + let block_arc = self + .this + .get_key_block(self.header, block_index, self.key_block_cache)?; + let mut block = &*block_arc; + let block_type = block.read_u8()?; + match block_type { + BLOCK_TYPE_INDEX => { + let block_indicies_count = (block.len() + 8) / 10; + let range = 1..block_arc.len(); + self.stack.push(CurrentIndexBlock { + entries: block_arc.slice(range), + block_indicies_count, + index: 0, + }); + } + BLOCK_TYPE_KEY => { + let entry_count = block.read_u24::()? as usize; + let offsets_range = 4..4 + entry_count * 4; + let entries_range = 4 + entry_count * 4..block_arc.len(); + let offsets = block_arc.clone().slice(offsets_range); + let entries = block_arc.slice(entries_range); + self.current_key_block = Some(CurrentKeyBlock { + offsets, + entries, + entry_count, + index: 0, + }); + } + _ => { + bail!("Invalid block type"); + } + } + Ok(()) + } + + /// Gets the next entry in the file and moves the cursor. + fn next_internal(&mut self) -> Result> { + loop { + if let Some(CurrentKeyBlock { + offsets, + entries, + entry_count, + index, + }) = self.current_key_block.take() + { + let GetKeyEntryResult { hash, key, ty, val } = + get_key_entry(&offsets, &entries, entry_count, index)?; + let value = + self.this + .handle_key_match(ty, val, self.header, self.value_block_cache)?; + let entry = LookupEntry { + hash, + // Safety: The key is a valid slice of the entries. + key: unsafe { ArcSlice::new_unchecked(key, ArcSlice::full_arc(&entries)) }, + value, + }; + if index + 1 < entry_count { + self.current_key_block = Some(CurrentKeyBlock { + offsets, + entries, + entry_count, + index: index + 1, + }); + } + return Ok(Some(entry)); + } + if let Some(CurrentIndexBlock { + entries, + block_indicies_count, + index, + }) = self.stack.pop() + { + let block_index = (&entries[index * 10..]).read_u16::()?; + if index + 1 < block_indicies_count { + self.stack.push(CurrentIndexBlock { + entries, + block_indicies_count, + index: index + 1, + }); + } + self.enter_block(block_index)?; + } else { + return Ok(None); + } + } + } +} + +struct GetKeyEntryResult<'l> { + hash: u64, + key: &'l [u8], + ty: u8, + val: &'l [u8], +} + +/// Reads a key entry from a key block. +fn get_key_entry<'l>( + offsets: &[u8], + entries: &'l [u8], + entry_count: usize, + index: usize, +) -> Result> { + let mut offset = &offsets[index * 4..]; + let ty = offset.read_u8()?; + let start = offset.read_u24::()? as usize; + let end = if index == entry_count - 1 { + entries.len() + } else { + (&offsets[(index + 1) * 4 + 1..]).read_u24::()? as usize + }; + let hash = (&entries[start..start + 8]).read_u64::()?; + Ok(match ty { + KEY_BLOCK_ENTRY_TYPE_SMALL => GetKeyEntryResult { + hash, + key: &entries[start + 8..end - 8], + ty, + val: &entries[end - 8..end], + }, + KEY_BLOCK_ENTRY_TYPE_MEDIUM => GetKeyEntryResult { + hash, + key: &entries[start + 8..end - 2], + ty, + val: &entries[end - 2..end], + }, + KEY_BLOCK_ENTRY_TYPE_BLOB => GetKeyEntryResult { + hash, + key: &entries[start + 8..end - 4], + ty, + val: &entries[end - 4..end], + }, + KEY_BLOCK_ENTRY_TYPE_DELETED => GetKeyEntryResult { + hash, + key: &entries[start + 8..end], + ty, + val: &[], + }, + _ => { + bail!("Invalid key block entry type"); + } + }) +} diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs new file mode 100644 index 0000000000000..03dfc6ef21c0d --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs @@ -0,0 +1,532 @@ +use std::{ + cmp::min, + fs::File, + io::{self, BufWriter, Write}, + path::Path, +}; + +use anyhow::{Context, Result}; +use byteorder::{ByteOrder, WriteBytesExt, BE}; +use lzzzz::lz4::{max_compressed_size, ACC_LEVEL_DEFAULT}; + +use crate::static_sorted_file::{ + BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY, KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, + KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL, +}; + +/// The maximum number of entries that should go into a single key block +const MAX_KEY_BLOCK_ENTRIES: usize = 100 * 1024; +/// The maximum bytes that should go into a single key block +// Note this must fit into 3 bytes length +const MAX_KEY_BLOCK_SIZE: usize = 16 * 1024; +/// Overhead of bytes that should be counted for entries in a key block in addition to the key size +const KEY_BLOCK_ENTRY_META_OVERHEAD: usize = 8; +/// The maximum number of entries that should go into a single small value block +const MAX_SMALL_VALUE_BLOCK_ENTRIES: usize = 100 * 1024; +/// The maximum bytes that should go into a single small value block +const MAX_SMALL_VALUE_BLOCK_SIZE: usize = 16 * 1024; +/// The aimed false positive rate for the AQMF +const AQMF_FALSE_POSITIVE_RATE: f64 = 0.01; + +/// The maximum compression dictionay size for value blocks +const VALUE_COMPRESSION_DICTIONARY_SIZE: usize = 64 * 1024 - 1; +/// The maximum compression dictionay size for key and index blocks +const KEY_COMPRESSION_DICTIONARY_SIZE: usize = 64 * 1024 - 1; +/// The maximum bytes that should be selected as value samples to create a compression dictionary +const VALUE_COMPRESSION_SAMPLES_SIZE: usize = 256 * 1024; +/// The maximum bytes that should be selected as key samples to create a compression dictionary +const KEY_COMPRESSION_SAMPLES_SIZE: usize = 256 * 1024; +/// The minimum bytes that should be selected as value samples. Below that no compression dictionary +/// is used. +const MIN_VALUE_COMPRESSION_SAMPLES_SIZE: usize = 1024; +/// The minimum bytes that should be selected as key samples. Below that no compression dictionary +/// is used. +const MIN_KEY_COMPRESSION_SAMPLES_SIZE: usize = 1024; +/// The bytes that are used per key/value entry for a sample. +const COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY: usize = 100; + +/// Trait for entries from that SST files can be created +pub trait Entry { + /// Returns the hash of the key + fn key_hash(&self) -> u64; + /// Returns the length of the key + fn key_len(&self) -> usize; + /// Writes the key to a buffer + fn write_key_to(&self, buf: &mut Vec); + + /// Returns the value + fn value(&self) -> EntryValue<'_>; +} + +/// Reference to a value +#[derive(Copy, Clone)] +pub enum EntryValue<'l> { + /// Small-sized value. They are stored in shared value blocks. + Small { value: &'l [u8] }, + /// Medium-sized value. They are stored in their own value block. + Medium { value: &'l [u8] }, + /// Large-sized value. They are stored in a blob file. + Large { blob: u32 }, + /// Tombstone. The value was removed. + Deleted, +} + +#[derive(Debug, Default)] +pub struct StaticSortedFileBuilder { + family: u32, + aqmf: Vec, + key_compression_dictionary: Vec, + value_compression_dictionary: Vec, + blocks: Vec<(u32, Vec)>, + min_hash: u64, + max_hash: u64, +} + +impl StaticSortedFileBuilder { + pub fn new( + family: u32, + entries: &[E], + total_key_size: usize, + total_value_size: usize, + ) -> Result { + debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted()); + let mut builder = Self { + family, + min_hash: entries.first().map(|e| e.key_hash()).unwrap_or(u64::MAX), + max_hash: entries.last().map(|e| e.key_hash()).unwrap_or(0), + ..Default::default() + }; + builder.compute_aqmf(entries); + builder.compute_compression_dictionary(entries, total_key_size, total_value_size)?; + builder.compute_blocks(entries); + Ok(builder) + } + + /// Computes a AQMF from the keys of all entries. + fn compute_aqmf(&mut self, entries: &[E]) { + let mut filter = qfilter::Filter::new(entries.len() as u64, AQMF_FALSE_POSITIVE_RATE) + // This won't fail as we limit the number of entries per SST file + .expect("Filter can't be constructed"); + for entry in entries { + filter + .insert_fingerprint(false, entry.key_hash()) + // This can't fail as we allocated enough capacity + .expect("AQMF insert failed"); + } + self.aqmf = pot::to_vec(&filter).expect("AQMF serialization failed"); + } + + /// Computes compression dictionaries from keys and values of all entries + fn compute_compression_dictionary( + &mut self, + entries: &[E], + total_key_size: usize, + total_value_size: usize, + ) -> Result<()> { + if total_key_size < MIN_KEY_COMPRESSION_SAMPLES_SIZE + && total_value_size < MIN_VALUE_COMPRESSION_SAMPLES_SIZE + { + return Ok(()); + } + let key_compression_samples_size = min(KEY_COMPRESSION_SAMPLES_SIZE, total_key_size / 10); + let value_compression_samples_size = + min(VALUE_COMPRESSION_SAMPLES_SIZE, total_value_size / 10); + let mut value_samples = Vec::with_capacity(value_compression_samples_size); + let mut value_sample_sizes = Vec::new(); + let mut key_samples = Vec::with_capacity(key_compression_samples_size); + let mut key_sample_sizes = Vec::new(); + let mut i = 12345678 % entries.len(); + let mut j = 0; + loop { + let entry = &entries[i]; + let value_remaining = value_compression_samples_size - value_samples.len(); + let key_remaining = key_compression_samples_size - key_samples.len(); + if value_remaining > 0 { + if let EntryValue::Small { value } | EntryValue::Medium { value } = entry.value() { + let value = if value.len() <= COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY { + value + } else { + j = (j + 12345678) + % (value.len() - COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY); + &value[j..j + COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY] + }; + if value.len() <= value_remaining { + value_sample_sizes.push(value.len()); + value_samples.extend_from_slice(value); + } else { + value_sample_sizes.push(value_remaining); + value_samples.extend_from_slice(&value[..value_remaining]); + } + } + } + if key_remaining > 0 { + let used_len = min(key_remaining, COMPRESSION_DICTIONARY_SAMPLE_PER_ENTRY); + if entry.key_len() <= used_len { + key_sample_sizes.push(entry.key_len()); + entry.write_key_to(&mut key_samples); + } else { + let mut temp = Vec::with_capacity(entry.key_len()); + entry.write_key_to(&mut temp); + debug_assert!(temp.len() == entry.key_len()); + + j = (j + 12345678) % (temp.len() - used_len); + key_sample_sizes.push(used_len); + key_samples.extend_from_slice(&temp[j..j + used_len]); + } + } + if key_remaining == 0 && value_remaining == 0 { + break; + } + i = (i + 12345678) % entries.len(); + } + assert!(key_samples.len() == key_sample_sizes.iter().sum::()); + assert!(value_samples.len() == value_sample_sizes.iter().sum::()); + if key_samples.len() > MIN_KEY_COMPRESSION_SAMPLES_SIZE && key_sample_sizes.len() > 5 { + self.key_compression_dictionary = zstd::dict::from_continuous( + &key_samples, + &key_sample_sizes, + KEY_COMPRESSION_DICTIONARY_SIZE, + ) + .context("Key dictionary creation failed")?; + } + if value_samples.len() > MIN_VALUE_COMPRESSION_SAMPLES_SIZE && value_sample_sizes.len() > 5 + { + self.value_compression_dictionary = zstd::dict::from_continuous( + &value_samples, + &value_sample_sizes, + VALUE_COMPRESSION_DICTIONARY_SIZE, + ) + .context("Value dictionary creation failed")?; + } + Ok(()) + } + + /// Compute index, key and value blocks. + fn compute_blocks(&mut self, entries: &[E]) { + // TODO implement multi level index + // TODO place key and value block near to each other + + // For now we use something simple to implement: + // Start with Value blocks + // And then Key blocks + // Last block is Index block + + // Store the locations of the values + let mut value_locations: Vec<(usize, usize)> = Vec::with_capacity(entries.len()); + + // Split the values into blocks + let mut current_block_start = 0; + let mut current_block_count = 0; + let mut current_block_size = 0; + for (i, entry) in entries.iter().enumerate() { + match entry.value() { + EntryValue::Small { value } => { + if current_block_size + value.len() > MAX_SMALL_VALUE_BLOCK_SIZE + || current_block_count + 1 >= MAX_SMALL_VALUE_BLOCK_ENTRIES + { + let block_index = self.blocks.len(); + let mut block = Vec::with_capacity(current_block_size); + for j in current_block_start..i { + if let EntryValue::Small { value } = &entries[j].value() { + block.extend_from_slice(value); + value_locations[j].0 = block_index; + } + } + self.blocks.push(self.compress_value_block(&block)); + current_block_start = i; + current_block_size = 0; + current_block_count = 0; + } + value_locations.push((0, current_block_size)); + current_block_size += value.len(); + current_block_count += 1; + } + EntryValue::Medium { value } => { + value_locations.push((self.blocks.len(), value.len())); + self.blocks.push(self.compress_value_block(value)); + } + _ => { + value_locations.push((0, 0)); + } + } + } + if current_block_count > 0 { + let block_index = self.blocks.len(); + let mut block = Vec::with_capacity(current_block_size); + for j in current_block_start..entries.len() { + if let EntryValue::Small { value } = &entries[j].value() { + block.extend_from_slice(value); + value_locations[j].0 = block_index; + } + } + self.blocks.push(self.compress_value_block(&block)); + } + + let mut key_block_boundaries = Vec::new(); + + // Split the keys into blocks + fn add_entry_to_block( + entry: &E, + value_location: &(usize, usize), + block: &mut KeyBlockBuilder, + ) { + match entry.value() { + EntryValue::Small { value } => { + block.put_small( + entry, + value_location.0.try_into().unwrap(), + value_location.1.try_into().unwrap(), + value.len().try_into().unwrap(), + ); + } + EntryValue::Medium { .. } => { + block.put_medium(entry, value_location.0.try_into().unwrap()); + } + EntryValue::Large { blob } => { + block.put_blob(entry, blob); + } + EntryValue::Deleted => { + block.delete(entry); + } + } + } + let mut current_block_start = 0; + let mut current_block_size = 0; + for (i, entry) in entries.iter().enumerate() { + if current_block_size > 0 + && (current_block_size + entry.key_len() + KEY_BLOCK_ENTRY_META_OVERHEAD + > MAX_KEY_BLOCK_SIZE + || i - current_block_start >= MAX_KEY_BLOCK_ENTRIES) && + // avoid breaking the block in the middle of a hash conflict + entries[i - 1].key_hash() != entry.key_hash() + { + let mut block = KeyBlockBuilder::new((i - current_block_start) as u32); + for j in current_block_start..i { + let entry = &entries[j]; + let value_location = &value_locations[j]; + add_entry_to_block(entry, value_location, &mut block); + } + key_block_boundaries + .push((entries[current_block_start].key_hash(), self.blocks.len())); + self.blocks.push(self.compress_key_block(&block.finish())); + current_block_size = 0; + current_block_start = i; + } + current_block_size += entry.key_len() + KEY_BLOCK_ENTRY_META_OVERHEAD; + } + if current_block_size > 0 { + let mut block = KeyBlockBuilder::new((entries.len() - current_block_start) as u32); + for j in current_block_start..entries.len() { + let entry = &entries[j]; + let value_location = &value_locations[j]; + add_entry_to_block(entry, value_location, &mut block); + } + key_block_boundaries.push((entries[current_block_start].key_hash(), self.blocks.len())); + self.blocks.push(self.compress_key_block(&block.finish())); + } + + // Compute the index + let mut index_block = IndexBlockBuilder::new( + key_block_boundaries.len() as u16, + key_block_boundaries[0].1 as u16, + ); + for (hash, block) in &key_block_boundaries[1..] { + index_block.put(*hash, *block as u16); + } + self.blocks + .push(self.compress_key_block(&index_block.finish())); + } + + /// Compresses a block with a compression dictionary. + fn compress_block(&self, block: &[u8], dict: &[u8]) -> (u32, Vec) { + let mut compressor = + lzzzz::lz4::Compressor::with_dict(dict).expect("LZ4 compressor creation failed"); + let mut compressed = Vec::with_capacity(max_compressed_size(block.len())); + compressor + .next_to_vec(block, &mut compressed, ACC_LEVEL_DEFAULT) + .expect("Compression failed"); + if compressed.capacity() > compressed.len() * 2 { + compressed.shrink_to_fit(); + } + (block.len().try_into().unwrap(), compressed) + } + + /// Compresses an index or key block. + fn compress_key_block(&self, block: &[u8]) -> (u32, Vec) { + self.compress_block(block, &self.key_compression_dictionary) + } + + /// Compresses a value block. + fn compress_value_block(&self, block: &[u8]) -> (u32, Vec) { + self.compress_block(block, &self.value_compression_dictionary) + } + + /// Writes the SST file. + pub fn write(&self, file: &Path) -> io::Result { + let mut file = BufWriter::new(File::create(file)?); + // magic number and version + file.write_u32::(0x53535401)?; + // family + file.write_u32::(self.family)?; + // min hash + file.write_u64::(self.min_hash)?; + // max hash + file.write_u64::(self.max_hash)?; + // AQMF length + file.write_u24::(self.aqmf.len().try_into().unwrap())?; + // Key compression dictionary length + file.write_u16::(self.key_compression_dictionary.len().try_into().unwrap())?; + // Value compression dictionary length + file.write_u16::(self.value_compression_dictionary.len().try_into().unwrap())?; + // Number of blocks + file.write_u16::(self.blocks.len().try_into().unwrap())?; + + // Write the AQMF + file.write_all(&self.aqmf)?; + // Write the key compression dictionary + file.write_all(&self.key_compression_dictionary)?; + // Write the value compression dictionary + file.write_all(&self.value_compression_dictionary)?; + + // Write the blocks + let mut offset = 0; + for (_, block) in &self.blocks { + // Block length (including the uncompressed length field) + let len = block.len() + 4; + offset += len; + file.write_u32::(offset.try_into().unwrap())?; + } + for (uncompressed_size, block) in &self.blocks { + // Uncompressed size + file.write_u32::(*uncompressed_size)?; + // Compressed block + file.write_all(block)?; + } + Ok(file.into_inner()?) + } +} + +/// Builder for a single key block +pub struct KeyBlockBuilder { + current_entry: usize, + header_size: usize, + data: Vec, +} + +/// The size of the key block header. +const KEY_BLOCK_HEADER_SIZE: usize = 4; + +impl KeyBlockBuilder { + /// Creates a new key block builder for the number of entries. + pub fn new(entry_count: u32) -> Self { + debug_assert!(entry_count < (1 << 24)); + + const ESTIMATED_KEY_SIZE: usize = 16; + let mut data = Vec::with_capacity(entry_count as usize * ESTIMATED_KEY_SIZE); + data.write_u8(BLOCK_TYPE_KEY).unwrap(); + data.write_u24::(entry_count).unwrap(); + for _ in 0..entry_count { + data.write_u32::(0).unwrap(); + } + Self { + current_entry: 0, + header_size: data.len(), + data, + } + } + + /// Writes a small-sized value to the buffer. + pub fn put_small( + &mut self, + entry: &E, + value_block: u16, + value_offset: u32, + value_size: u16, + ) { + let pos = self.data.len() - self.header_size; + let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4; + let header = (pos as u32) | ((KEY_BLOCK_ENTRY_TYPE_SMALL as u32) << 24); + BE::write_u32(&mut self.data[header_offset..header_offset + 4], header); + + self.data.write_u64::(entry.key_hash()).unwrap(); + entry.write_key_to(&mut self.data); + self.data.write_u16::(value_block).unwrap(); + self.data.write_u16::(value_size).unwrap(); + self.data.write_u32::(value_offset).unwrap(); + + self.current_entry += 1; + } + + /// Writes a medium-sized value to the buffer. + pub fn put_medium(&mut self, entry: &E, value_block: u16) { + let pos = self.data.len() - self.header_size; + let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4; + let header = (pos as u32) | ((KEY_BLOCK_ENTRY_TYPE_MEDIUM as u32) << 24); + BE::write_u32(&mut self.data[header_offset..header_offset + 4], header); + + self.data.write_u64::(entry.key_hash()).unwrap(); + entry.write_key_to(&mut self.data); + self.data.write_u16::(value_block).unwrap(); + + self.current_entry += 1; + } + + /// Writes a tombstone to the buffer. + pub fn delete(&mut self, entry: &E) { + let pos = self.data.len() - self.header_size; + let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4; + let header = (pos as u32) | ((KEY_BLOCK_ENTRY_TYPE_DELETED as u32) << 24); + BE::write_u32(&mut self.data[header_offset..header_offset + 4], header); + + self.data.write_u64::(entry.key_hash()).unwrap(); + entry.write_key_to(&mut self.data); + + self.current_entry += 1; + } + + /// Writes a blob value to the buffer. + pub fn put_blob(&mut self, entry: &E, blob: u32) { + let pos = self.data.len() - self.header_size; + let header_offset = KEY_BLOCK_HEADER_SIZE + self.current_entry * 4; + let header = (pos as u32) | ((KEY_BLOCK_ENTRY_TYPE_BLOB as u32) << 24); + BE::write_u32(&mut self.data[header_offset..header_offset + 4], header); + + self.data.write_u64::(entry.key_hash()).unwrap(); + entry.write_key_to(&mut self.data); + self.data.write_u32::(blob).unwrap(); + + self.current_entry += 1; + } + + /// Returns the key block buffer + pub fn finish(self) -> Vec { + self.data + } +} + +/// Builder for a single index block. +pub struct IndexBlockBuilder { + data: Vec, +} + +impl IndexBlockBuilder { + /// Creates a new builder for an index block with the specified number of entries and a pointer + /// to the first block. + pub fn new(entry_count: u16, first_block: u16) -> Self { + let mut data = Vec::with_capacity(entry_count as usize * 10 + 3); + data.write_u8(BLOCK_TYPE_INDEX).unwrap(); + data.write_u16::(first_block).unwrap(); + Self { data } + } + + /// Adds a hash boundary to the index block. + pub fn put(&mut self, hash: u64, block: u16) { + self.data.write_u64::(hash).unwrap(); + self.data.write_u16::(block).unwrap(); + } + + /// Returns the index block buffer + fn finish(self) -> Vec { + self.data + } +} diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs new file mode 100644 index 0000000000000..6dee6cd81721a --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/tests.rs @@ -0,0 +1,347 @@ +use std::time::Instant; + +use anyhow::Result; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; + +use crate::{db::TurboPersistence, write_batch::WriteBatch}; + +#[test] +fn full_cycle() -> Result<()> { + let mut test_cases = Vec::new(); + type TestCases = Vec<( + &'static str, + Box, 16>) -> Result<()>>, + Box Result<()>>, + )>; + + fn test_case( + test_cases: &mut TestCases, + name: &'static str, + write: impl Fn(&mut WriteBatch, 16>) -> Result<()> + 'static, + read: impl Fn(&TurboPersistence) -> Result<()> + 'static, + ) { + test_cases.push(( + name, + Box::new(write) as Box, 16>) -> Result<()>>, + Box::new(read) as Box Result<()>>, + )); + } + + test_case( + &mut test_cases, + "Simple", + |batch| { + for i in 10..100u8 { + batch.put(0, vec![i], vec![i].into())?; + } + Ok(()) + }, + |db| { + let Some(value) = db.get(0, &[42u8])? else { + panic!("Value not found"); + }; + assert_eq!(&*value, &[42]); + assert_eq!(db.get(0, &[42u8, 42])?, None); + assert_eq!(db.get(0, &[1u8])?, None); + assert_eq!(db.get(0, &[255u8])?, None); + Ok(()) + }, + ); + + test_case( + &mut test_cases, + "Families", + |batch| { + for i in 0..16u8 { + batch.put(i as usize, vec![i], vec![i].into())?; + } + Ok(()) + }, + |db| { + let Some(value) = db.get(8, &[8u8])? else { + panic!("Value not found"); + }; + assert_eq!(&*value, &[8]); + assert!(db.get(8, &[8u8, 8])?.is_none()); + assert!(db.get(8, &[0u8])?.is_none()); + assert!(db.get(8, &[255u8])?.is_none()); + Ok(()) + }, + ); + + test_case( + &mut test_cases, + "Medium keys and values", + |batch| { + for i in 0..200u8 { + batch.put(0, vec![i; 10 * 1024], vec![i; 100 * 1024].into())?; + } + Ok(()) + }, + |db| { + for i in 0..200u8 { + let Some(value) = db.get(0, &vec![i; 10 * 1024])? else { + panic!("Value not found"); + }; + assert_eq!(&*value, &vec![i; 100 * 1024]); + } + Ok(()) + }, + ); + + test_case( + &mut test_cases, + "Large keys and values (blob files)", + |batch| { + for i in 0..20u8 { + batch.put( + 0, + vec![i; 10 * 1024 * 1024], + vec![i; 10 * 1024 * 1024].into(), + )?; + } + Ok(()) + }, + |db| { + for i in 0..20u8 { + let Some(value) = db.get(0, &vec![i; 10 * 1024 * 1024])? else { + panic!("Value not found"); + }; + assert_eq!(&*value, &vec![i; 10 * 1024 * 1024]); + } + Ok(()) + }, + ); + + test_case( + &mut test_cases, + "Different sizes keys and values", + |batch| { + for i in 100..200u8 { + batch.put(0, vec![i; i as usize], vec![i; i as usize].into())?; + } + Ok(()) + }, + |db| { + for i in 100..200u8 { + let Some(value) = db.get(0, &vec![i; i as usize])? else { + panic!("Value not found"); + }; + assert_eq!(&*value, &vec![i; i as usize]); + } + Ok(()) + }, + ); + + test_case( + &mut test_cases, + "Many items (1% read)", + |batch| { + for i in 0..1000 * 1024u32 { + batch.put(0, i.to_be_bytes().into(), i.to_be_bytes().to_vec().into())?; + } + Ok(()) + }, + |db| { + for i in 0..10 * 1024u32 { + let i = i * 100; + let Some(value) = db.get(0, &i.to_be_bytes())? else { + panic!("Value not found"); + }; + assert_eq!(&*value, &i.to_be_bytes()); + } + Ok(()) + }, + ); + + test_case( + &mut test_cases, + "Many items (1% read, multi-threaded)", + |batch| { + (0..10 * 1024 * 1024u32).into_par_iter().for_each(|i| { + batch + .put(0, i.to_be_bytes().into(), i.to_be_bytes().to_vec().into()) + .unwrap(); + }); + Ok(()) + }, + |db| { + (0..100 * 1024u32).into_par_iter().for_each(|i| { + let i = i * 100; + let Some(value) = db.get(0, &i.to_be_bytes()).unwrap() else { + panic!("Value not found"); + }; + assert_eq!(&*value, &i.to_be_bytes()); + }); + Ok(()) + }, + ); + + // Run each test case standalone + for (name, write, read) in test_cases.iter() { + let tempdir = tempfile::tempdir()?; + let path = tempdir.path(); + + { + let start = Instant::now(); + let db = TurboPersistence::open(path.to_path_buf())?; + let mut batch = db.write_batch()?; + write(&mut batch)?; + db.commit_write_batch(batch)?; + println!("{name} write time: {:?}", start.elapsed()); + + let start = Instant::now(); + read(&db)?; + println!("{name} read time: {:?}", start.elapsed()); + + let start = Instant::now(); + drop(db); + println!("{name} drop time: {:?}", start.elapsed()); + } + { + let start = Instant::now(); + let db = TurboPersistence::open(path.to_path_buf())?; + println!("{name} restore time: {:?}", start.elapsed()); + let start = Instant::now(); + read(&db)?; + println!("{name} read time after restore: {:?}", start.elapsed()); + let start = Instant::now(); + read(&db)?; + println!("{name} read time after read: {:?}", start.elapsed()); + + #[cfg(feature = "stats")] + println!("{name} stats: {:#?}", db.statistics()); + + let start = Instant::now(); + db.full_compact()?; + println!("{name} compact time: {:?}", start.elapsed()); + + let start = Instant::now(); + read(&db)?; + println!("{name} read time after compact: {:?}", start.elapsed()); + + let start = Instant::now(); + drop(db); + println!("{name} drop time after compact: {:?}", start.elapsed()); + } + { + let start = Instant::now(); + let db = TurboPersistence::open(path.to_path_buf())?; + println!("{name} restore time after compact: {:?}", start.elapsed()); + let start = Instant::now(); + read(&db)?; + println!( + "{name} read time after compact + restore: {:?}", + start.elapsed() + ); + let start = Instant::now(); + read(&db)?; + println!( + "{name} read time after compact + restore + read: {:?}", + start.elapsed() + ); + + #[cfg(feature = "stats")] + println!("{name} stats (compacted): {:#?}", db.statistics()); + + let start = Instant::now(); + drop(db); + println!( + "{name} drop time after compact + restore: {:?}", + start.elapsed() + ); + } + } + + // Run all test cases in a single db + { + let tempdir = tempfile::tempdir()?; + let path = tempdir.path(); + + { + let start = Instant::now(); + let db = TurboPersistence::open(path.to_path_buf())?; + let mut batch = db.write_batch()?; + for (_, write, _) in test_cases.iter() { + write(&mut batch)?; + } + db.commit_write_batch(batch)?; + println!("All write time: {:?}", start.elapsed()); + + for (name, _, read) in test_cases.iter() { + let start = Instant::now(); + read(&db)?; + println!("{name} read time: {:?}", start.elapsed()); + } + + let start = Instant::now(); + drop(db); + println!("All drop time: {:?}", start.elapsed()); + } + { + let start = Instant::now(); + let db = TurboPersistence::open(path.to_path_buf())?; + println!("All restore time: {:?}", start.elapsed()); + for (name, _, read) in test_cases.iter() { + let start = Instant::now(); + read(&db)?; + println!("{name} read time after restore: {:?}", start.elapsed()); + } + for (name, _, read) in test_cases.iter() { + let start = Instant::now(); + read(&db)?; + println!("{name} read time after read: {:?}", start.elapsed()); + } + #[cfg(feature = "stats")] + println!("All stats: {:#?}", db.statistics()); + + let start = Instant::now(); + db.full_compact()?; + println!("All compact time: {:?}", start.elapsed()); + + for (name, _, read) in test_cases.iter() { + let start = Instant::now(); + read(&db)?; + println!("{name} read time after compact: {:?}", start.elapsed()); + } + + let start = Instant::now(); + drop(db); + println!("All drop time after compact: {:?}", start.elapsed()); + } + + { + let start = Instant::now(); + let db = TurboPersistence::open(path.to_path_buf())?; + println!("All restore time after compact: {:?}", start.elapsed()); + + for (name, _, read) in test_cases.iter() { + let start = Instant::now(); + read(&db)?; + println!( + "{name} read time after compact + restore: {:?}", + start.elapsed() + ); + } + for (name, _, read) in test_cases.iter() { + let start = Instant::now(); + read(&db)?; + println!( + "{name} read time after compact + restore + read: {:?}", + start.elapsed() + ); + } + + #[cfg(feature = "stats")] + println!("All stats (compacted): {:#?}", db.statistics()); + + let start = Instant::now(); + drop(db); + println!( + "All drop time after compact + restore: {:?}", + start.elapsed() + ); + } + } + Ok(()) +} diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs new file mode 100644 index 0000000000000..97490c73e41e5 --- /dev/null +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -0,0 +1,296 @@ +use std::{ + borrow::Cow, + cell::UnsafeCell, + fs::File, + mem::{replace, swap}, + path::PathBuf, + sync::atomic::{AtomicU32, Ordering}, +}; + +use anyhow::{Context, Result}; +use byteorder::{WriteBytesExt, BE}; +use lzzzz::lz4::{self, ACC_LEVEL_DEFAULT}; +use parking_lot::Mutex; +use rayon::{ + iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}, + scope, Scope, +}; +use thread_local::ThreadLocal; + +use crate::{ + collector::Collector, collector_entry::CollectorEntry, constants::MAX_MEDIUM_VALUE_SIZE, + key::StoreKey, static_sorted_file_builder::StaticSortedFileBuilder, +}; + +/// The thread local state of a `WriteBatch`. +struct ThreadLocalState { + /// The collectors for each family. + collectors: [Option>; FAMILIES], + /// The list of new SST files that have been created. + new_sst_files: Vec<(u32, File)>, +} + +/// A write batch. +pub struct WriteBatch { + /// The database path + path: PathBuf, + /// The current sequence number counter. Increased for every new SST file or blob file. + current_sequence_number: AtomicU32, + /// The thread local state. + thread_locals: ThreadLocal>>, + /// Collectors are are current unused, but have memory preallocated. + idle_collectors: Mutex>>, +} + +impl WriteBatch { + /// Creates a new write batch for a database. + pub(crate) fn new(path: PathBuf, current: u32) -> Self { + assert!(FAMILIES <= u32::MAX as usize); + Self { + path, + current_sequence_number: AtomicU32::new(current), + thread_locals: ThreadLocal::new(), + idle_collectors: Mutex::new(Vec::new()), + } + } + + /// Resets the write batch to a new sequence number. This is called when the WriteBatch is + /// reused. + pub(crate) fn reset(&mut self, current: u32) { + self.current_sequence_number + .store(current, Ordering::SeqCst); + } + + /// Returns the collector for a family for the current thread. + fn collector_mut(&self, family: usize) -> Result<&mut Collector> { + debug_assert!(family < FAMILIES); + let cell = self.thread_locals.get_or(|| { + UnsafeCell::new(ThreadLocalState { + collectors: [const { None }; FAMILIES], + new_sst_files: Vec::new(), + }) + }); + // Safety: We know that the cell is only accessed from the current thread. + let state = unsafe { &mut *cell.get() }; + let collector = state.collectors[family].get_or_insert_with(|| { + self.idle_collectors + .lock() + .pop() + .unwrap_or_else(|| Collector::new()) + }); + if collector.is_full() { + let sst = self.create_sst_file(family, collector.sorted())?; + collector.clear(); + state.new_sst_files.push(sst); + } + Ok(collector) + } + + /// Puts a key-value pair into the write batch. + pub fn put(&self, family: usize, key: K, value: Cow<'_, [u8]>) -> Result<()> { + let collector = self.collector_mut(family)?; + if value.len() <= MAX_MEDIUM_VALUE_SIZE { + collector.put(key, value.into_owned()); + } else { + let blob = self.create_blob(&value)?; + collector.put_blob(key, blob); + } + Ok(()) + } + + /// Puts a delete operation into the write batch. + pub fn delete(&self, family: usize, key: K) -> Result<()> { + let collector = self.collector_mut(family)?; + collector.delete(key); + Ok(()) + } + + /// Finishes the write batch by returning the new sequence number and the new SST files. This + /// writes all outstanding thread local data to disk. + pub(crate) fn finish(&mut self) -> Result<(u32, Vec<(u32, File)>)> { + let mut new_sst_files = Vec::new(); + let mut all_collectors = [(); FAMILIES].map(|_| Vec::new()); + for cell in self.thread_locals.iter_mut() { + let state = cell.get_mut(); + new_sst_files.append(&mut state.new_sst_files); + for (family, global_collector) in all_collectors.iter_mut().enumerate() { + if let Some(collector) = state.collectors[family].take() { + if !collector.is_empty() { + global_collector.push(Some(collector)); + } + } + } + } + let shared_new_sst_files = Mutex::new(&mut new_sst_files); + let shared_error = Mutex::new(Ok(())); + scope(|scope| { + fn handle_done_collector<'scope, K: StoreKey + Send + Sync, const FAMILIES: usize>( + this: &'scope WriteBatch, + scope: &Scope<'scope>, + family: usize, + mut collector: Collector, + shared_new_sst_files: &'scope Mutex<&mut Vec<(u32, File)>>, + shared_error: &'scope Mutex>, + ) { + scope.spawn( + move |_| match this.create_sst_file(family, collector.sorted()) { + Ok(sst) => { + collector.clear(); + this.idle_collectors.lock().push(collector); + shared_new_sst_files.lock().push(sst); + } + Err(err) => { + *shared_error.lock() = Err(err); + } + }, + ); + } + + all_collectors + .into_par_iter() + .enumerate() + .for_each(|(family, collectors)| { + let final_collector = collectors.into_par_iter().reduce( + || None, + |a, b| match (a, b) { + (Some(mut a), Some(mut b)) => { + if a.len() < b.len() { + swap(&mut a, &mut b); + } + for entry in b.drain() { + if a.is_full() { + let full_collector = replace( + &mut a, + self.idle_collectors + .lock() + .pop() + .unwrap_or_else(|| Collector::new()), + ); + handle_done_collector( + self, + scope, + family, + full_collector, + &shared_new_sst_files, + &shared_error, + ); + } + a.add_entry(entry); + } + self.idle_collectors.lock().push(b); + Some(a) + } + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + }, + ); + if let Some(collector) = final_collector { + handle_done_collector( + self, + scope, + family, + collector, + &shared_new_sst_files, + &shared_error, + ); + } + }); + }); + shared_error.into_inner()?; + let seq = self.current_sequence_number.load(Ordering::SeqCst); + new_sst_files.sort_by_key(|(seq, _)| *seq); + Ok((seq, new_sst_files)) + } + + /// Creates a new blob file with the given value. + fn create_blob(&self, value: &[u8]) -> Result { + let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let mut buffer = Vec::new(); + buffer.write_u32::(value.len() as u32)?; + lz4::compress_to_vec(value, &mut buffer, ACC_LEVEL_DEFAULT) + .context("Compression of value for blob file failed")?; + + let file = self.path.join(format!("{:08}.blob", seq)); + std::fs::write(file, &buffer).context("Unable to write blob file")?; + Ok(seq) + } + + /// Creates a new SST file with the given collector data. + fn create_sst_file( + &self, + family: usize, + collector_data: (&[CollectorEntry], usize, usize), + ) -> Result<(u32, File)> { + let (entries, total_key_size, total_value_size) = collector_data; + let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + let builder = + StaticSortedFileBuilder::new(family as u32, entries, total_key_size, total_value_size)?; + + let path = self.path.join(format!("{:08}.sst", seq)); + let file = builder + .write(&path) + .with_context(|| format!("Unable to write SST file {:08}.sst", seq))?; + + #[cfg(feature = "verify_sst_content")] + { + use core::panic; + + use crate::{ + collector_entry::CollectorEntryValue, + key::hash_key, + static_sorted_file::{AqmfCache, BlockCache, LookupResult, StaticSortedFile}, + }; + + file.sync_all()?; + let sst = StaticSortedFile::open(seq, path)?; + let cache1 = AqmfCache::with( + 10, + u64::MAX, + Default::default(), + Default::default(), + Default::default(), + ); + let cache2 = BlockCache::with( + 10, + u64::MAX, + Default::default(), + Default::default(), + Default::default(), + ); + let cache3 = BlockCache::with( + 10, + u64::MAX, + Default::default(), + Default::default(), + Default::default(), + ); + for entry in entries { + let mut key = Vec::with_capacity(entry.key.len()); + entry.key.write_to(&mut key); + let result = sst + .lookup(hash_key(&key), &key, &cache1, &cache2, &cache3) + .expect("key found"); + match result { + LookupResult::Deleted => {} + LookupResult::Small { value: val } => { + if let EntryValue::Small { value } | EntryValue::Medium { value } = + entry.value + { + assert_eq!(&*val, &*value); + } else { + panic!("Unexpected value"); + } + } + LookupResult::Blob { sequence_number } => {} + LookupResult::QuickFilterMiss => panic!("aqmf must include"), + LookupResult::RangeMiss => panic!("Index must cover"), + LookupResult::KeyMiss => panic!("All keys must exist"), + } + } + } + + Ok((seq, file)) + } +} diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml index 1def3ef47f75b..96252b3c51869 100644 --- a/turbopack/crates/turbo-tasks-backend/Cargo.toml +++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml @@ -16,6 +16,7 @@ workspace = true default = [] verify_serialization = [] trace_aggregation_update = [] +lmdb = ["dep:lmdb-rkv"] [dependencies] anyhow = { workspace = true } @@ -27,7 +28,7 @@ dashmap = { workspace = true, features = ["raw-api"]} either = { workspace = true } hashbrown = { workspace = true, features = ["raw"] } indexmap = { workspace = true } -lmdb-rkv = "0.14.0" +lmdb-rkv = { version = "0.14.0", optional = true } once_cell = { workspace = true } parking_lot = { workspace = true } pot = "3.0.0" @@ -41,6 +42,7 @@ tokio = { workspace = true } tokio-scoped = "0.2.0" tracing = { workspace = true } thread_local = { workspace = true } +turbo-persistence = { workspace = true } turbo-prehash = { workspace = true } turbo-rcstr = { workspace = true } turbo-tasks = { workspace = true } diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs index 1110be4bf3781..e6c3d88e59288 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs @@ -784,6 +784,12 @@ impl TurboTasksBackendInner { self.stopping_event.notify(usize::MAX); } + fn stop(&self) { + if let Err(err) = self.backing_storage.shutdown() { + println!("Shutting down failed: {}", err); + } + } + fn idle_start(&self) { self.idle_start_event.notify(usize::MAX); } @@ -1813,6 +1819,10 @@ impl Backend for TurboTasksBackend { self.0.stopping(); } + fn stop(&self, _turbo_tasks: &dyn TurboTasksBackendApi) { + self.0.stop(); + } + fn idle_start(&self, _turbo_tasks: &dyn TurboTasksBackendApi) { self.0.idle_start(); } diff --git a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs index 1db2d7f2ada0e..257c13f585731 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backing_storage.rs @@ -51,4 +51,8 @@ pub trait BackingStorage: 'static + Send + Sync { task_id: TaskId, category: TaskDataCategory, ) -> Vec; + + fn shutdown(&self) -> Result<()> { + Ok(()) + } } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs index eb300c00624e4..f41f9db6cc0da 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/key_value_database.rs @@ -51,4 +51,8 @@ pub trait KeyValueDatabase { fn write_batch( &self, ) -> Result, Self::ConcurrentWriteBatch<'_>>>; + + fn shutdown(&self) -> Result<()> { + Ok(()) + } } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/mod.rs b/turbopack/crates/turbo-tasks-backend/src/database/mod.rs index 9e23e919fa679..78ce1a99a6791 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/mod.rs @@ -1,16 +1,15 @@ +#[cfg(feature = "lmdb")] mod by_key_space; pub mod db_versioning; +#[cfg(feature = "lmdb")] pub mod fresh_db_optimization; pub mod key_value_database; +#[cfg(feature = "lmdb")] pub mod lmdb; pub mod noop_kv; +#[cfg(feature = "lmdb")] pub mod read_transaction_cache; -mod startup_cache; +#[cfg(feature = "lmdb")] +pub mod startup_cache; +pub mod turbo; pub mod write_batch; - -pub use db_versioning::handle_db_versioning; -pub use fresh_db_optimization::{is_fresh, FreshDbOptimization}; -#[allow(unused_imports)] -pub use noop_kv::NoopKvDb; -pub use read_transaction_cache::ReadTransactionCache; -pub use startup_cache::StartupCacheLayer; diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs new file mode 100644 index 0000000000000..3d1274689eb12 --- /dev/null +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs @@ -0,0 +1,147 @@ +use std::{ + borrow::Cow, + path::PathBuf, + sync::Arc, + thread::{spawn, JoinHandle}, +}; + +use anyhow::Result; +use parking_lot::Mutex; +use turbo_persistence::{ArcSlice, TurboPersistence}; + +use crate::database::{ + key_value_database::{KeySpace, KeyValueDatabase}, + write_batch::{BaseWriteBatch, ConcurrentWriteBatch, WriteBatch}, +}; + +const COMPACT_MAX_COVERAGE: f32 = 20.0; +const COMPACT_MAX_MERGE_SEQUENCE: usize = 8; + +pub struct TurboKeyValueDatabase { + db: Arc, + compact_join_handle: Mutex>>>, +} + +impl TurboKeyValueDatabase { + pub fn new(path: PathBuf) -> Result { + let db = Arc::new(TurboPersistence::open(path.to_path_buf())?); + let mut this = Self { + db: db.clone(), + compact_join_handle: Mutex::new(None), + }; + // start compaction in background if the database is not empty + if !db.is_empty() { + let handle = + spawn(move || db.compact(COMPACT_MAX_COVERAGE, COMPACT_MAX_MERGE_SEQUENCE)); + this.compact_join_handle.get_mut().replace(handle); + } + Ok(this) + } +} + +impl KeyValueDatabase for TurboKeyValueDatabase { + type ReadTransaction<'l> + = () + where + Self: 'l; + + fn lower_read_transaction<'l: 'i + 'r, 'i: 'r, 'r>( + tx: &'r Self::ReadTransaction<'l>, + ) -> &'r Self::ReadTransaction<'i> { + tx + } + + fn is_empty(&self) -> bool { + self.db.is_empty() + } + + fn begin_read_transaction(&self) -> Result> { + Ok(()) + } + + type ValueBuffer<'l> + = ArcSlice + where + Self: 'l; + + fn get<'l, 'db: 'l>( + &'l self, + _transaction: &'l Self::ReadTransaction<'db>, + key_space: KeySpace, + key: &[u8], + ) -> Result>> { + self.db.get(key_space as usize, &key) + } + + type ConcurrentWriteBatch<'l> + = TurboWriteBatch<'l> + where + Self: 'l; + + fn write_batch( + &self, + ) -> Result, Self::ConcurrentWriteBatch<'_>>> { + // Wait for the compaction to finish + if let Some(join_handle) = self.compact_join_handle.lock().take() { + join_handle.join().unwrap()?; + } + // Start a new write batch + Ok(WriteBatch::concurrent(TurboWriteBatch { + batch: self.db.write_batch()?, + db: &self.db, + compact_join_handle: &self.compact_join_handle, + })) + } + + fn shutdown(&self) -> Result<()> { + // Wait for the compaction to finish + if let Some(join_handle) = self.compact_join_handle.lock().take() { + join_handle.join().unwrap()?; + } + // Shutdown the database + self.db.shutdown() + } +} + +pub struct TurboWriteBatch<'a> { + batch: turbo_persistence::WriteBatch, 5>, + db: &'a Arc, + compact_join_handle: &'a Mutex>>>, +} + +impl<'a> BaseWriteBatch<'a> for TurboWriteBatch<'a> { + type ValueBuffer<'l> + = ArcSlice + where + Self: 'l, + 'a: 'l; + + fn get<'l>(&'l self, key_space: KeySpace, key: &[u8]) -> Result>> + where + 'a: 'l, + { + self.db.get(key_space as usize, &key) + } + + fn commit(self) -> Result<()> { + // Commit the write batch + self.db.commit_write_batch(self.batch)?; + + // Start a new compaction in the background + let db = self.db.clone(); + let handle = spawn(move || db.compact(COMPACT_MAX_COVERAGE, COMPACT_MAX_MERGE_SEQUENCE)); + self.compact_join_handle.lock().replace(handle); + + Ok(()) + } +} + +impl<'a> ConcurrentWriteBatch<'a> for TurboWriteBatch<'a> { + fn put(&self, key_space: KeySpace, key: Cow<[u8]>, value: Cow<[u8]>) -> Result<()> { + self.batch.put(key_space as usize, key.into_owned(), value) + } + + fn delete(&self, key_space: KeySpace, key: Cow<[u8]>) -> Result<()> { + self.batch.delete(key_space as usize, key.into_owned()) + } +} diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 009fdf5e2dc11..50d7aedd83a4f 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -435,6 +435,10 @@ impl BackingStorage .inspect_err(|err| println!("Looking up data for {task_id} failed: {err:?}")) .unwrap_or_default() } + + fn shutdown(&self) -> Result<()> { + self.database.shutdown() + } } fn get_next_free_task_id<'a, S, C>( diff --git a/turbopack/crates/turbo-tasks-backend/src/lib.rs b/turbopack/crates/turbo-tasks-backend/src/lib.rs index 333cf6596ab4f..3af8287607746 100644 --- a/turbopack/crates/turbo-tasks-backend/src/lib.rs +++ b/turbopack/crates/turbo-tasks-backend/src/lib.rs @@ -18,32 +18,59 @@ pub use self::{ kv_backing_storage::KeyValueDatabaseBackingStorage, }; use crate::database::{ - handle_db_versioning, is_fresh, lmdb::LmbdKeyValueDatabase, FreshDbOptimization, NoopKvDb, - ReadTransactionCache, StartupCacheLayer, + db_versioning::handle_db_versioning, noop_kv::NoopKvDb, turbo::TurboKeyValueDatabase, }; +#[cfg(feature = "lmdb")] pub type LmdbBackingStorage = KeyValueDatabaseBackingStorage< - ReadTransactionCache>>, + ReadTransactionCache< + StartupCacheLayer>, + >, >; +#[cfg(feature = "lmdb")] pub fn lmdb_backing_storage(path: &Path) -> Result { + use crate::database::{ + fresh_db_optimization::{is_fresh, FreshDbOptimization}, + read_transaction_cache::ReadTransactionCache, + startup_cache::StartupCacheLayer, + }; + let path = handle_db_versioning(path)?; let fresh_db = is_fresh(&path); - let database = LmbdKeyValueDatabase::new(&path)?; + let database = crate::database::lmdb::LmbdKeyValueDatabase::new(&path)?; let database = FreshDbOptimization::new(database, fresh_db); let database = StartupCacheLayer::new(database, path.join("startup.cache"), fresh_db)?; let database = ReadTransactionCache::new(database); Ok(KeyValueDatabaseBackingStorage::new(database)) } +pub type TurboBackingStorage = KeyValueDatabaseBackingStorage; + +pub fn turbo_backing_storage(path: &Path) -> Result { + let path = handle_db_versioning(path)?; + let database = TurboKeyValueDatabase::new(path)?; + Ok(KeyValueDatabaseBackingStorage::new(database)) +} + pub type NoopBackingStorage = KeyValueDatabaseBackingStorage; pub fn noop_backing_storage() -> NoopBackingStorage { KeyValueDatabaseBackingStorage::new(NoopKvDb) } +#[cfg(feature = "lmdb")] pub type DefaultBackingStorage = LmdbBackingStorage; +#[cfg(feature = "lmdb")] pub fn default_backing_storage(path: &Path) -> Result { lmdb_backing_storage(path) } + +#[cfg(not(feature = "lmdb"))] +pub type DefaultBackingStorage = TurboBackingStorage; + +#[cfg(not(feature = "lmdb"))] +pub fn default_backing_storage(path: &Path) -> Result { + turbo_backing_storage(path) +}