Skip to content

Commit

Permalink
Speed up writes further with re-usable buffer backed by AlignedVec.
Browse files Browse the repository at this point in the history
Also added template parameter `N` for scratch space size.

Benchmarks:
```
synchronizer/write      time:   [250.71 ns 251.42 ns 252.41 ns]
                        thrpt:  [3.9619 Melem/s 3.9774 Melem/s 3.9887 Melem/s]
                 change:
                        time:   [-99.152% -99.147% -99.142%] (p = 0.00 < 0.05)
                        thrpt:  [+11559% +11627% +11699%]
                        Performance has improved.
synchronizer/write_raw  time:   [145.25 ns 145.53 ns 145.92 ns]
                        thrpt:  [6.8531 Melem/s 6.8717 Melem/s 6.8849 Melem/s]
                 change:
                        time:   [-98.508% -98.471% -98.443%] (p = 0.00 < 0.05)
                        thrpt:  [+6322.7% +6441.2% +6602.9%]
                        Performance has improved.
synchronizer/read/check_bytes_true
                        time:   [40.114 ns 40.139 ns 40.186 ns]
                        thrpt:  [24.884 Melem/s 24.914 Melem/s 24.929 Melem/s]
                 change:
                        time:   [-0.1031% -0.0200% +0.0753%] (p = 0.69 > 0.05)
                        thrpt:  [-0.0752% +0.0200% +0.1032%]
                        No change in performance detected.
synchronizer/read/check_bytes_false
                        time:   [26.658 ns 26.673 ns 26.696 ns]
                        thrpt:  [37.458 Melem/s 37.491 Melem/s 37.512 Melem/s]
                 change:
                        time:   [-0.9845% -0.9083% -0.7959%] (p = 0.00 < 0.05)
                        thrpt:  [+0.8023% +0.9167% +0.9943%]
                        Change within noise threshold.
```
  • Loading branch information
bocharov committed Jul 9, 2024
1 parent 4a3fdba commit 86e132d
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use std::hash::{BuildHasher, BuildHasherDefault, Hasher};
use std::time::Duration;

use bytecheck::CheckBytes;
use rkyv::ser::serializers::AllocSerializer;
use rkyv::ser::serializers::{AlignedSerializer, AllocSerializer};
use rkyv::ser::Serializer;
use rkyv::validation::validators::DefaultValidator;
use rkyv::{archived_root, check_archived_root, Archive, Serialize};
use rkyv::{archived_root, check_archived_root, AlignedVec, Archive, Serialize};
use thiserror::Error;
use wyhash::WyHash;

Expand All @@ -24,13 +24,15 @@ use crate::synchronizer::SynchronizerError::*;
/// `Synchronizer` is a concurrency primitive that manages data access between a single writer process and multiple reader processes.
///
/// It coordinates the access to two data files that store the shared data. A state file, also memory-mapped, stores the index of the current data file and the number of active readers for each index, updated via atomic instructions.
pub struct Synchronizer<H: Hasher + Default = WyHash> {
pub struct Synchronizer<H: Hasher + Default = WyHash, const N: usize = 1024> {
/// Container storing state mmap
state_container: StateContainer,
/// Container storing data mmap
data_container: DataContainer,
/// Hasher used for checksum calculation
build_hasher: BuildHasherDefault<H>,
/// Re-usable buffer for serialization
serialize_buffer: Option<AlignedVec>,
}

/// `SynchronizerError` enumerates all possible errors returned by this library.
Expand Down Expand Up @@ -61,16 +63,21 @@ pub enum SynchronizerError {
InvalidInstanceVersionParams,
}

/// Default serializer with 1 MB scratch space allocated on the heap.
type DefaultSerializer = AllocSerializer<1_000_000>;

impl Synchronizer {
/// Create new instance of `Synchronizer` using given `path_prefix`
pub fn new(path_prefix: &OsStr) -> Synchronizer {
/// Create new instance of `Synchronizer` using given `path_prefix` and default template parameters
pub fn new(path_prefix: &OsStr) -> Self {
Self::with_params(path_prefix)
}
}

impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
/// Create new instance of `Synchronizer` using given `path_prefix` and template parameters
pub fn with_params(path_prefix: &OsStr) -> Self {
Synchronizer {
state_container: StateContainer::new(path_prefix),
data_container: DataContainer::new(path_prefix),
build_hasher: BuildHasherDefault::default(),
serialize_buffer: Some(AlignedVec::new()),
}
}

Expand All @@ -97,11 +104,18 @@ impl Synchronizer {
grace_duration: Duration,
) -> Result<(usize, bool), SynchronizerError>
where
T: Serialize<DefaultSerializer>,
T: Serialize<AllocSerializer<N>>,
T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
{
let mut buf = self.serialize_buffer.take().ok_or(FailedEntityWrite)?;
buf.clear();

// serialize given entity into bytes
let mut serializer = DefaultSerializer::default();
let mut serializer = AllocSerializer::new(
AlignedSerializer::new(buf),
Default::default(),
Default::default(),
);
let _ = serializer
.serialize_value(entity)
.map_err(|_| FailedEntityWrite)?;
Expand All @@ -126,6 +140,9 @@ impl Synchronizer {
// switch readers to new version
state.switch_version(new_version);

// Restore buffer for potential reuse
self.serialize_buffer.replace(data);

Ok((size, reset))
}

Expand All @@ -138,7 +155,7 @@ impl Synchronizer {
grace_duration: Duration,
) -> Result<(usize, bool), SynchronizerError>
where
T: Serialize<DefaultSerializer>,
T: Serialize<AllocSerializer<N>>,
T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
{
// fetch current state from mapped memory
Expand Down

0 comments on commit 86e132d

Please sign in to comment.