Skip to content

Commit

Permalink
Improve the Checksum implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Dec 29, 2024
1 parent 85cc8de commit 5496b9b
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 34 deletions.
159 changes: 128 additions & 31 deletions heed/src/envs/env_open_options.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(master3)]
use std::any::TypeId;
use std::ffi::CString;
#[cfg(windows)]
use std::ffi::OsStr;
Expand All @@ -14,10 +16,12 @@ use std::{io, ptr};
use aead::{generic_array::typenum::Unsigned, AeadCore, AeadMutInPlace, Key, KeyInit};
use synchronoise::SignalEvent;

#[cfg(master3)]
use super::checksum_func_wrapper;
#[cfg(master3)]
use super::encrypted_env::{encrypt_func_wrapper, EncryptedEnv};
use super::env::Env;
use super::{canonicalize_path, OPENED_ENV};
use super::{canonicalize_path, Checksum, NoChecksum, OPENED_ENV};
#[cfg(windows)]
use crate::envs::OsStrExtLmdb as _;
use crate::mdb::error::mdb_result;
Expand All @@ -28,28 +32,28 @@ use crate::{EnvFlags, Error, Result};
/// Options and flags which can be used to configure how an environment is opened.
#[derive(Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct EnvOpenOptions<T: TlsUsage> {
pub struct EnvOpenOptions<T: TlsUsage, C: Checksum> {
map_size: Option<usize>,
max_readers: Option<u32>,
max_dbs: Option<u32>,
flags: EnvFlags,
_tls_marker: PhantomData<T>,
_marker: PhantomData<(T, C)>,
}

impl EnvOpenOptions<WithTls> {
impl EnvOpenOptions<WithTls, NoChecksum> {
/// Creates a blank new set of options ready for configuration.
pub fn new() -> EnvOpenOptions<WithTls> {
pub fn new() -> EnvOpenOptions<WithTls, NoChecksum> {
EnvOpenOptions {
map_size: None,
max_readers: None,
max_dbs: None,
flags: EnvFlags::empty(),
_tls_marker: PhantomData,
_marker: PhantomData,
}
}
}

impl<T: TlsUsage> EnvOpenOptions<T> {
impl<T: TlsUsage, C: Checksum + 'static> EnvOpenOptions<T, C> {
/// Make the read transactions `!Send` by specifying they will
/// use Thread Local Storage (TLS). It is often faster to open
/// TLS-backed transactions.
Expand Down Expand Up @@ -81,9 +85,9 @@ impl<T: TlsUsage> EnvOpenOptions<T> {
/// is_sendable(rtxn);
/// # Ok(()) }
/// ```
pub fn read_txn_with_tls(self) -> EnvOpenOptions<WithTls> {
let Self { map_size, max_readers, max_dbs, flags, _tls_marker: _ } = self;
EnvOpenOptions { map_size, max_readers, max_dbs, flags, _tls_marker: PhantomData }
pub fn read_txn_with_tls(self) -> EnvOpenOptions<WithTls, C> {
let Self { map_size, max_readers, max_dbs, flags, _marker: _ } = self;
EnvOpenOptions { map_size, max_readers, max_dbs, flags, _marker: PhantomData }
}

/// Make the read transactions `Send` by specifying they will
Expand Down Expand Up @@ -126,9 +130,106 @@ impl<T: TlsUsage> EnvOpenOptions<T> {
/// is_sendable(rtxn);
/// # Ok(()) }
/// ```
pub fn read_txn_without_tls(self) -> EnvOpenOptions<WithoutTls> {
let Self { map_size, max_readers, max_dbs, flags, _tls_marker: _ } = self;
EnvOpenOptions { map_size, max_readers, max_dbs, flags, _tls_marker: PhantomData }
pub fn read_txn_without_tls(self) -> EnvOpenOptions<WithoutTls, C> {
let Self { map_size, max_readers, max_dbs, flags, _marker: _ } = self;
EnvOpenOptions { map_size, max_readers, max_dbs, flags, _marker: PhantomData }
}

#[cfg(master3)]
/// Changes the checksum algorithm to use.
///
/// # Basic Example
///
/// Creates and open a database. The [`Env`] is using a [`crc`](https://github.com/mrhooray/crc-rs)
/// algorithm.
///
/// Note that you cannot use **any** type of crc algorithm as it is possible to tell
/// the size of the crc to LMDB.
///
/// ```
/// use std::fs;
/// use std::path::Path;
/// use memchr::memmem::find;
/// use argon2::Argon2;
/// use chacha20poly1305::{ChaCha20Poly1305, Key};
/// use heed3::types::*;
/// use heed3::{EnvOpenOptions, Checksum, Database, Error, MdbError};
///
/// /// A checksum algorithm based on the well-known CRC_32_BZIP2.
/// enum Crc32Bzip2 {}
///
/// impl Checksum for Crc32Bzip2 {
/// // Be careful the size is in bytes not bits.
/// const SIZE: u32 = 32 / 8;
///
/// fn checksum(input: &[u8], output: &mut [u8], _key: Option<&[u8]>) {
/// let sum = crc::Crc::<u32>::new(&crc::CRC_32_BZIP2).checksum(input);
/// eprintln!("checksumming {input:?} which gives {sum:?}");
/// output.copy_from_slice(&sum.to_ne_bytes());
/// }
/// }
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let env_path = tempfile::tempdir()?;
///
/// fs::create_dir_all(&env_path)?;
///
/// // We open the environment
/// let mut options = EnvOpenOptions::new().checksum::<Crc32Bzip2>();
/// let env = unsafe {
/// options
/// .map_size(10 * 1024 * 1024) // 10MB
/// .max_dbs(3)
/// .open(&env_path)?
/// };
///
/// let key1 = "first-key";
/// let val1 = "this is my first value";
/// let key2 = "second-key";
/// let val2 = "this is a second information";
///
/// // We create a database and write values in it
/// let mut wtxn = env.write_txn()?;
/// let db = env.create_database::<Str, Str>(&mut wtxn, Some("first"))?;
/// db.put(&mut wtxn, key1, val1)?;
/// db.put(&mut wtxn, key2, val2)?;
/// wtxn.commit()?;
///
/// // We check that we can read the values back
/// let rtxn = env.read_txn()?;
/// assert_eq!(db.get(&rtxn, key1)?, Some(val1));
/// assert_eq!(db.get(&rtxn, key2)?, Some(val2));
/// drop(rtxn);
///
/// // We close the env and check that we can read in it
/// env.prepare_for_closing().wait();
///
/// // We modify the content of the data file
/// let mut content = fs::read(env_path.path().join("data.mdb"))?;
/// let pos = find(&content, b"value").unwrap();
/// content[pos..pos + 5].copy_from_slice(b"thing");
/// fs::write(env_path.path().join("data.mdb"), content)?;
///
/// // We reopen the environment
/// let mut options = EnvOpenOptions::new().checksum::<Crc32Bzip2>();
/// let env = unsafe {
/// options
/// .map_size(10 * 1024 * 1024) // 10MB
/// .max_dbs(3)
/// .open(&env_path)?
/// };
///
/// // We check that we can read the values back
/// let rtxn = env.read_txn()?;
/// let db = env.open_database::<Str, Str>(&rtxn, Some("first"))?.unwrap();
/// assert!(matches!(db.get(&rtxn, key1).unwrap_err(), Error::Mdb(MdbError::BadChecksum)));
/// drop(rtxn);
///
/// # Ok(()) }
/// ```
pub fn checksum<NC: Checksum>(self) -> EnvOpenOptions<T, NC> {
let Self { map_size, max_readers, max_dbs, flags, _marker } = self;
EnvOpenOptions { map_size, max_readers, max_dbs, flags, _marker: PhantomData }
}

/// Set the size of the memory map to use for this environment.
Expand Down Expand Up @@ -237,18 +338,6 @@ impl<T: TlsUsage> EnvOpenOptions<T> {
path.as_ref(),
#[cfg(master3)]
None,
#[cfg(master3)]
None,
)
}

pub unsafe fn open_checksummed<P: AsRef<Path>>(&self, path: P) -> Result<Env<T>> {
self.raw_open_with_checksum_and_encryption(
path.as_ref(),
#[cfg(master3)]
None,
#[cfg(master3)]
None,
)
}

Expand Down Expand Up @@ -404,7 +493,6 @@ impl<T: TlsUsage> EnvOpenOptions<T> {
{
self.raw_open_with_checksum_and_encryption(
path.as_ref(),
None,
Some((Some(encrypt_func_wrapper::<E>), &key, <E as AeadCore>::TagSize::U32)),
)
.map(|inner| EncryptedEnv { inner })
Expand All @@ -413,7 +501,6 @@ impl<T: TlsUsage> EnvOpenOptions<T> {
fn raw_open_with_checksum_and_encryption(
&self,
path: &Path,
#[cfg(master3)] sum: Option<(ffi::MDB_sum_func, u32)>,
#[cfg(master3)] enc: Option<(ffi::MDB_enc_func, &[u8], u32)>,
) -> Result<Env<T>> {
let mut lock = OPENED_ENV.write().unwrap();
Expand Down Expand Up @@ -451,6 +538,16 @@ impl<T: TlsUsage> EnvOpenOptions<T> {
))?;
}

#[cfg(master3)]
if TypeId::of::<C>() != TypeId::of::<NoChecksum>() {
eprintln!("Doing some checksumming stuff");
mdb_result(ffi::mdb_env_set_checksum(
env,
Some(checksum_func_wrapper::<C>),
C::SIZE,
))?;
}

if let Some(size) = self.map_size {
if size % page_size::get() != 0 {
let msg = format!(
Expand Down Expand Up @@ -496,15 +593,15 @@ impl<T: TlsUsage> EnvOpenOptions<T> {
}
}

impl Default for EnvOpenOptions<WithTls> {
impl Default for EnvOpenOptions<WithTls, NoChecksum> {
fn default() -> Self {
Self::new()
}
}

impl<T: TlsUsage> Clone for EnvOpenOptions<T> {
impl<T: TlsUsage, C: Checksum> Clone for EnvOpenOptions<T, C> {
fn clone(&self) -> Self {
let Self { map_size, max_readers, max_dbs, flags, _tls_marker } = *self;
EnvOpenOptions { map_size, max_readers, max_dbs, flags, _tls_marker }
let Self { map_size, max_readers, max_dbs, flags, _marker } = *self;
EnvOpenOptions { map_size, max_readers, max_dbs, flags, _marker }
}
}
52 changes: 52 additions & 0 deletions heed/src/envs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,55 @@ impl FlagSetMode {
}
}
}

/// A trait defining how to calculate checksum within the environment.
///
/// Enabling checksumming is not supported in the heed crate and
/// can only be modified within the heed3 crate.
pub trait Checksum {
/// The size of computed checksum values, in bytes.
const SIZE: u32;

/// Compute the checksum of the data in input and store the
/// result in output, an optional key may be used with keyed
/// hash algorithms.
///
/// The key parameter is an encryption key, if encryption was
/// configured. This parameter will be NULL if there is no key.
fn checksum(input: &[u8], output: &mut [u8], key: Option<&[u8]>);
}

/// Deactivate environment checksumming.
///
/// Enabling checksumming is not supported in the heed crate and
/// can only be modified within the heed3 crate.
pub enum NoChecksum {}

impl Checksum for NoChecksum {
const SIZE: u32 = 0;
fn checksum(_input: &[u8], _output: &mut [u8], _key: Option<&[u8]>) {}
}

/// The wrapper function that is called by LMDB that directly calls
/// the Rust idiomatic function internally.
#[cfg(master3)]
unsafe extern "C" fn checksum_func_wrapper<C: Checksum>(
src: *const ffi::MDB_val,
dst: *mut ffi::MDB_val,
key_ptr: *const ffi::MDB_val,
) {
let result = std::panic::catch_unwind(|| {
let input = std::slice::from_raw_parts((*src).mv_data as *const u8, (*src).mv_size);
let output = std::slice::from_raw_parts_mut((*dst).mv_data as *mut u8, (*dst).mv_size);
let key = if key_ptr.is_null() {
None
} else {
Some(std::slice::from_raw_parts((*key_ptr).mv_data as *const u8, (*key_ptr).mv_size))
};
C::checksum(input, output, key)
});

if result.is_err() {
std::process::abort();
}
}
4 changes: 2 additions & 2 deletions heed/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ pub use self::databases::{EncryptedDatabase, EncryptedDatabaseOpenOptions};
#[cfg(master3)]
pub use self::envs::EncryptedEnv;
pub use self::envs::{
env_closing_event, CompactionOption, DefaultComparator, Env, EnvClosingEvent, EnvInfo,
EnvOpenOptions, FlagSetMode, IntegerComparator,
env_closing_event, Checksum, CompactionOption, DefaultComparator, Env, EnvClosingEvent,
EnvInfo, EnvOpenOptions, FlagSetMode, IntegerComparator, NoChecksum,
};
pub use self::iterator::{
RoIter, RoPrefix, RoRange, RoRevIter, RoRevPrefix, RoRevRange, RwIter, RwPrefix, RwRange,
Expand Down
2 changes: 1 addition & 1 deletion heed/src/mdb/lmdb_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use ffi::{
MDB_RDONLY, MDB_RESERVE,
};
#[cfg(master3)]
pub use ffi::{mdb_env_set_encrypt, MDB_enc_func, MDB_sum_func};
pub use ffi::{mdb_env_set_checksum, mdb_env_set_encrypt, MDB_enc_func};
#[cfg(master3)]
use lmdb_master3_sys as ffi;
#[cfg(not(master3))]
Expand Down
2 changes: 2 additions & 0 deletions heed3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ synchronoise = "1.0.1"
[dev-dependencies]
# TODO update dependencies
argon2 = { version = "0.5.3", features = ["std"] }
crc = "3.2.1"
memchr = "2.7.4"
serde = { version = "1.0.215", features = ["derive"] }
chacha20poly1305 = "0.10.1"
tempfile = "3.14.0"
Expand Down

0 comments on commit 5496b9b

Please sign in to comment.