Skip to content

Commit

Permalink
small cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
danburkert committed Oct 26, 2015
1 parent 8a15575 commit c8eb214
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 123 deletions.
10 changes: 3 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@

A work-in-progress write ahead logging library for Rust.

## Roadmap
# TODO

- [x] file based segment
- [x] benchmarks
- [ ] mmap based segment
- [ ] higher level wal type
- [ ] asynchronous appends
- [ ] compression
* Prefix/Suffix truncation
* Durable value API

## Running Benchmarks

Expand Down
9 changes: 4 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,11 @@ mod test {
let _ = env_logger::init();
fn wal(entries: Vec<Vec<u8>>) -> TestResult {
let dir = tempdir::TempDir::new("wal").unwrap();

// Insert all of the entries. Reopen the Wal after every entry
// without closing it properly.
for entry in &entries {
{
let mut wal = Wal::open(&dir.path()).unwrap();
let _ = wal.append(entry);
for entry in &entries {
let _ = wal.append(entry);
}
}

let wal = Wal::open(&dir.path()).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/segment/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ mod test {
let mut creator = SegmentCreator::new(&dir.path(), vec![]);

for _ in 0..10 {
let _ = creator.next();
let segment = creator.next();
}
}
}
235 changes: 125 additions & 110 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,8 @@
//! An append-only, fixed-length, durable log of arbitrary entries.
//!
//! The segment on-disk format is as simple as possible, while providing
//! backwards compatibility, protection against corruption, and alignment
//! guarantees.
//!
//! A version tag allows the internal format to be updated, while still
//! maintaining compatibility with previously written files. CRCs are used
//! to ensure that entries are not corrupted. Padding is used to ensure that
//! entries are always aligned to 8-byte boundaries.
//!
//! ## On Disk Format
//!
//! All version, length, and CRC integers are serialized in little-endian format.
//!
//! All CRC values are computed using
//! [CRC32-C](https://en.wikipedia.org/wiki/Cyclic_redundancy_check).
//!
//! ### Segment Header Format
//!
//! | component | type |
//! | ---------------------- | ------- |
//! | magic bytes ("wal") | 3 bytes |
//! | segment format version | u8 |
//! | random CRC seed | u32 |
//!
//! The segment header is 8 bytes long: three magic bytes ("wal") followed
//! by a segment format version u8, followed by a random u32 CRC seed. The
//! CRC seed is serialized in little-endian format. The CRC seed ensures
//! that if a segment file is overwritten with a new segment, the old
//! segments entries will be ignored (since the CRC will not match).
//!
//! # Entry Format
//!
//! | component | type |
//! | ---------------------------- | ---- |
//! | length | u64 |
//! | data | |
//! | padding | |
//! | CRC(length + data + padding) | u32 |
//!
//! Entries are serialized to the log with a fixed-length header, followed
//! by the data itself, and finally a variable length footer. The header
//! includes the length of the data followed by a CRC code of the length.
//! The footer includes between 0 and 7 bytes of padding to extend the length
//! of the entry to a multiple of 8, followed by the CRC code of the length,
//! data, and padding.

pub mod creator;
pub mod flusher;

use std::fmt;
use std::fs::{self, File, OpenOptions};
use std::io::{
Error,
Expand Down Expand Up @@ -78,7 +32,52 @@ const CRC_LEN: usize = 4;

type Crc = u32;

/// A read-only segment.
/// An append-only, fixed-length, durable log of entries.
///
/// The segment on-disk format is as simple as possible, while providing
/// backwards compatibility, protection against corruption, and alignment
/// guarantees.
///
/// A version tag allows the internal format to be updated, while still
/// maintaining compatibility with previously written files. CRCs are used
/// to ensure that entries are not corrupted. Padding is used to ensure that
/// entries are always aligned to 8-byte boundaries.
///
/// ## On Disk Format
///
/// All version, length, and CRC integers are serialized in little-endian format.
///
/// All CRC values are computed using
/// [CRC32-C](https://en.wikipedia.org/wiki/Cyclic_redundancy_check).
///
/// ### Segment Header Format
///
/// | component | type |
/// | ---------------------- | ------- |
/// | magic bytes ("wal") | 3 bytes |
/// | segment format version | u8 |
/// | random CRC seed | u32 |
///
/// The segment header is 8 bytes long: three magic bytes ("wal") followed by a
/// segment format version u8, followed by a random u32 CRC seed. The CRC seed
/// is serialized in little-endian format. The CRC seed ensures that if a
/// segment file is overwritten with a new segment, the old entries will be
/// ignored (since the CRC will not match).
///
/// ### Entry Format
///
/// | component | type |
/// | ---------------------------- | ---- |
/// | length | u64 |
/// | data | |
/// | padding | |
/// | CRC(length + data + padding) | u32 |
///
/// Entries are serialized to the log with a fixed-length header, followed by
/// the data itself, and finally a variable length footer. The header includes
/// the length of the data as a u64 in little-endian format. The footer includes
/// between 0 and 7 bytes of padding to extend the total length of the entry to
/// a multiple of 8, followed by the CRC code of the length, data, and padding.
pub struct Segment {
/// The segment file buffer.
mmap: MmapHandle,
Expand All @@ -89,7 +88,7 @@ pub struct Segment {
/// Index of entry offset and lengths.
index: Vec<(usize, usize)>,
/// The crc of the last appended entry.
crc: u32,
crc: Crc,
}

impl Segment {
Expand All @@ -101,6 +100,11 @@ impl Segment {
///
/// If a segment or another file already exists on the path it will be
/// overwritten, and the allocated file space will be reused.
///
/// An individual file may only be opened by a single segment at a time.
///
/// The caller is responsible for syncing the directory in order to
/// guarantee that the segment is durable in the event of a crash.
pub fn create<P>(path: P, capacity: usize) -> Result<Segment> where P: AsRef<Path> {
if capacity < HEADER_LEN {
return Err(Error::new(ErrorKind::InvalidInput, "invalid segment capacity"));
Expand All @@ -111,26 +115,23 @@ impl Segment {
.write(true)
.create(true)
.open(&path));

try!(file.try_lock_exclusive());

try!(file.set_len(capacity as u64));
// TODO: sync directory

let mut mmap = MmapHandle::new(try!(Mmap::open_with_offset(&file,
Protection::ReadWrite,
0,
capacity)));
let mut mmap = try!(Mmap::open_with_offset(&file,
Protection::ReadWrite,
0,
capacity));

let seed = rand::random();

// Write and sync the header information.
copy_memory(SEGMENT_HEADER, unsafe { &mut mmap.as_mut().as_mut_slice()[..4] });
LittleEndian::write_u32(unsafe { &mut mmap.as_mut().as_mut_slice()[4..] }, seed);
copy_memory(SEGMENT_HEADER, unsafe { &mut mmap.as_mut_slice()[..4] });
LittleEndian::write_u32(unsafe { &mut mmap.as_mut_slice()[4..] }, seed);

try!(file.sync_all());
Ok(Segment {
mmap: mmap,
mmap: MmapHandle::new(mmap),
file: file,
path: path.as_ref().to_path_buf(),
index: Vec::new(),
Expand All @@ -140,15 +141,45 @@ impl Segment {

/// Opens the segment at the specified path.
pub fn open<P>(path: P) -> Result<Segment> where P: AsRef<Path> {
let mmap = MmapHandle::new(try!(Mmap::open_path(&path, Protection::ReadWrite)));
let mmap = try!(Mmap::open_path(&path, Protection::ReadWrite));
let file = try!(OpenOptions::new()
.read(true)
.write(true)
.create(false)
.open(&path));
let (index, crc) = try!(rebuild_index(unsafe { mmap.as_ref().as_slice() }));
try!(file.try_lock_exclusive());

let mut index = Vec::new();
let mut crc;
{
// Parse the segment, filling out the index containing the offset
// and length of each entry, as well as the latest CRC value.
//
// If the CRC of any entry does not match, then parsing stops and
// the remainder of the file is considered empty.
let segment = unsafe { mmap.as_slice() };
crc = LittleEndian::read_u32(&segment[SEGMENT_HEADER.len()..]);
let mut offset = HEADER_LEN;

while segment.len() >= offset + HEADER_LEN + CRC_LEN {
let len = LittleEndian::read_u64(&segment[offset..]) as usize;
let padding = padding(len);
let padded_len = len + padding;
if offset + HEADER_LEN + padded_len + CRC_LEN > segment.len() { break; }

let entry_crc = crc32::update(crc,
&crc32::CASTAGNOLI_TABLE,
&segment[offset..offset + HEADER_LEN + padded_len]);
if entry_crc != LittleEndian::read_u32(&segment[offset + HEADER_LEN + padded_len..]) { break; }

crc = entry_crc;
index.push((offset + HEADER_LEN, len));
offset += HEADER_LEN + padded_len + CRC_LEN;
}
}

Ok(Segment {
mmap: mmap,
mmap: MmapHandle::new(mmap),
file: file,
path: path.as_ref().to_path_buf(),
index: index,
Expand Down Expand Up @@ -238,7 +269,7 @@ impl Segment {
/// Renames the segment.
///
/// The caller is responsible for syncing the directory in order to
/// guarantee that the rename is durable.
/// guarantee that the rename is durable in the event of a crash.
pub fn rename<P>(&mut self, path: P) -> Result<()> where P: AsRef<Path> {
try!(fs::rename(&self.path, &path));
self.path = path.as_ref().to_path_buf();
Expand Down Expand Up @@ -271,6 +302,12 @@ impl Segment {
}
}

impl fmt::Debug for Segment {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Segment {{ path: {:?}, entry_count: {} }}", &self.path, self.index.len())
}
}

/// Copies data from `src` to `dst`
///
/// Panics if the length of `dst` is less than the length of `src`.
Expand All @@ -289,47 +326,13 @@ fn padding(len: usize) -> usize {
4usize.wrapping_sub(len) & 7
}

/// Parses a segment, returning an index containing the offset and length of
/// each entry, as well as the latest CRC value.
///
/// If the CRC of any entry does not match, then parsing stops and the entries
/// up until that point are returned.
fn rebuild_index(segment: &[u8]) -> Result<(Vec<(usize, usize)>, Crc)> {
if segment.len() < HEADER_LEN {
return Err(Error::new(ErrorKind::InvalidData, "invalid segment length"));
}

if &segment[..SEGMENT_HEADER.len()] != SEGMENT_HEADER {
return Err(Error::new(ErrorKind::InvalidData, "invalid segment header"));
}

let mut index = Vec::new();
let mut crc = LittleEndian::read_u32(&segment[SEGMENT_HEADER.len()..]);
let mut offset = HEADER_LEN;

while segment.len() >= offset + HEADER_LEN + CRC_LEN {
let len = LittleEndian::read_u64(&segment[offset..]) as usize;
let padding = padding(len);
let padded_len = len + padding;
if offset + HEADER_LEN + padded_len + CRC_LEN > segment.len() { break; }

let entry_crc = crc32::update(crc,
&crc32::CASTAGNOLI_TABLE,
&segment[offset..offset + HEADER_LEN + padded_len]);
if entry_crc != LittleEndian::read_u32(&segment[offset + HEADER_LEN + padded_len..]) { break; }

crc = entry_crc;
index.push((offset + HEADER_LEN, len));
offset += HEADER_LEN + padded_len + CRC_LEN;
}
Ok((index, crc))
}

#[cfg(test)]
mod test {
extern crate tempdir;
extern crate env_logger;

use std::io::ErrorKind;

use super::{HEADER_LEN, Segment, padding};

#[test]
Expand All @@ -353,34 +356,36 @@ mod test {
assert_eq!(5, padding(15));
}

fn test_segment(len: usize) -> (Segment, tempdir::TempDir) {
let _ = env_logger::init();
fn test_segment(len: usize) -> Segment {
let dir = tempdir::TempDir::new("segment").unwrap();
let mut path = dir.path().to_path_buf();
path.push("test-segment");
(Segment::create(path, len).unwrap(), dir)
Segment::create(path, len).unwrap()
}

#[test]
fn test_create() {
fn test_create_dir_path() {
let _ = env_logger::init();
let (_, dir) = test_segment(4096);
let dir = tempdir::TempDir::new("segment").unwrap();
assert!(Segment::open(dir.path()).is_err());
}

/// Opening multiple segments on the same file should fail.
#[test]
fn test_exclusive_lock() {
fn test_exclusive() {
let _ = env_logger::init();
let segment = test_segment(4096).0;
assert_eq!(HEADER_LEN, segment.size());
assert_eq!(4096, segment.capacity());
assert_eq!(0, segment.len());
let dir = tempdir::TempDir::new("segment").unwrap();
let mut path = dir.path().to_path_buf();
path.push("test-exclusive");
let segment = Segment::create(&path, 4096).unwrap();
assert_eq!(ErrorKind::WouldBlock, Segment::open(&path).unwrap_err().kind());
assert_eq!(ErrorKind::WouldBlock, Segment::create(&path, 4096).unwrap_err().kind());
}

#[test]
fn test_entries() {
let _ = env_logger::init();
let mut segment = test_segment(4096).0;
let mut segment = test_segment(4096);
let entries: &[&[u8]] = &[b"",
b"0",
b"01",
Expand Down Expand Up @@ -465,4 +470,14 @@ mod test {

assert_eq!(0, segment.len());
}

/// Tests that opening a non-existent segment file will fail.
#[test]
fn test_open_nonexistent() {
let _ = env_logger::init();
let dir = tempdir::TempDir::new("segment").unwrap();
let mut path = dir.path().to_path_buf();
path.push("test-open-nonexistent");
assert_eq!(ErrorKind::NotFound, Segment::open(&path).unwrap_err().kind());
}
}

0 comments on commit c8eb214

Please sign in to comment.