Skip to content

Commit

Permalink
SSTables use binary search (#10)
Browse files Browse the repository at this point in the history
* update: sstable scan doesn't require a mut sstable.

* add: min number of threads for parallel search.

* add: fn to search value concurrently.

resolve #3

* update: sstable.write

- sstable.write needs a ref to HashMap.
- This allows the store to pass in the HashMap.
- The Sstable does the sorting and writing.
- This is also easier to parallelize from the store.

* update: search

We remove `scan` from the sstable because it doesn't exploit the sorted order of the keys.
Instead, we provide a binary `search` API.

* update: foreshadow key-value size independence.

Currently, key and value sizes can both be as large as a u64 allows, so each length prefix takes 8 bytes.
This is wasteful and forces users to spend more space than needed on keys.
We will offer a way to configure the maximum size of keys and values separately.

* update: create word sizes for keys and values separately

* test: uses hashmap to write sstables.

* update: parallel_search doesn't expect thread config.

* update: sstable.write abstracts hashmap management.

* update: use search over deprecated scan.

* refactor: flush_memtable returns an error.

* test: fix expect statements.

* style: lint.

* style: lint.
  • Loading branch information
ltbringer authored Mar 21, 2023
1 parent 2f39a17 commit f93c4d8
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 111 deletions.
4 changes: 3 additions & 1 deletion src/sstable/constants.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/// Width, in bytes, of one on-disk `u64` word (length prefixes and index entries).
///
/// Declared as `const` rather than `static` so it can be used in constant
/// contexts such as array lengths (`[u8; WORD]`).
pub const WORD: usize = 8;
/// Width, in bytes, of the length prefix that precedes every key.
pub const KEY_WORD: usize = WORD;
/// Width, in bytes, of the length prefix that precedes every value.
pub const VALUE_WORD: usize = WORD;
/// Marker stored as the value of a deleted key: an empty byte slice.
pub const TOMBSTONE: &[u8] = &[];
pub static RKV: &str = ".rkv";
pub static RKV: &str = "rkv";
139 changes: 83 additions & 56 deletions src/sstable/sst.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::sstable::constants::{TOMBSTONE, WORD};
use byteorder::{LittleEndian, WriteBytesExt};
use crate::sstable::constants::{KEY_WORD, TOMBSTONE, VALUE_WORD, WORD};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use log::error;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fs::{remove_file, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::io::{Read, Result, Seek, SeekFrom, Write};
use std::path::PathBuf;

#[derive(Clone)]
Expand All @@ -27,7 +28,7 @@ impl SSTable {
* `Key length` is trying to specify. The same explains the following
* `Val length`.
*/
pub fn new(filename: PathBuf, read: bool, write: bool, create: bool) -> io::Result<SSTable> {
pub fn new(filename: PathBuf, read: bool, write: bool, create: bool) -> Result<SSTable> {
Ok(SSTable {
filename,
read,
Expand All @@ -44,58 +45,78 @@ impl SSTable {
}
}

fn open(&self) -> io::Result<File> {
fn open(&self, filename: &PathBuf) -> Result<File> {
OpenOptions::new()
.read(self.read)
.write(self.write)
.create(self.create)
.open(&self.filename)
.open(filename)
}

/**
* Write a key-value pair to an SSTable.
*
* - Both key length and value length are exactly 8 bytes long because
* we are using u64 for both.
* - Both key length and value length are 8 bytes long because
* we are using u64 for keys and values.
* - Writing the key (and value) length helps us at the time of reading.
* or else we would resort to delimiters and handle cases when the
* delimiter character is also an input.
*/
pub fn write(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
let mut file = self.open()?;
file.seek(SeekFrom::End(0))?;
let key_len = key.len() as u64;
let value_len = value.len() as u64;
let mut buf = vec![];
buf.write_u64::<LittleEndian>(key_len)?;
buf.write_all(key)?;
buf.write_u64::<LittleEndian>(value_len)?;
buf.write_all(value)?;
file.write_all(&buf)?;
pub fn write(&mut self, hashmap: &HashMap<Vec<u8>, Vec<u8>>) -> Result<()> {
let index_filename = self.filename.with_extension("index");
let mut data_file = self.open(&self.filename)?;
let mut index_file = self.open(&index_filename)?;
data_file.seek(SeekFrom::End(0))?;
index_file.seek(SeekFrom::End(0))?;

let mut sorted_hashmap: Vec<(&Vec<u8>, &Vec<u8>)> = hashmap.iter().collect();
sorted_hashmap.sort_by(|a, b| {
let a_key = a.0;
let other_key = b.0;
a_key.cmp(other_key)
});

for (key, value) in sorted_hashmap {
let key_len = key.len() as u64;
let value_len = value.len() as u64;
let mut buf = vec![];
let seek_pos = data_file.seek(SeekFrom::Current(0))?;
index_file.write_u64::<LittleEndian>(seek_pos)?;
buf.write_u64::<LittleEndian>(key_len)?;
buf.write_all(key)?;
buf.write_u64::<LittleEndian>(value_len)?;
buf.write_all(value)?;
data_file.write_all(&buf)?;
}

Ok(())
}

fn get_kv_len_u64(&self, buf: &[u8], i: usize) -> usize {
u64::from_le_bytes(buf[i..i + 8].try_into().unwrap()) as usize
/// Decodes the little-endian `u64` key-length prefix starting at byte `i` of `buf`.
///
/// # Panics
///
/// Panics if `buf` has fewer than `KEY_WORD` bytes available from `i`, or if
/// `KEY_WORD` is not 8 (the slice would not convert into a `u64`-sized array).
fn get_key_u64(&self, buf: &[u8], i: usize) -> usize {
    let prefix: [u8; 8] = buf[i..i + KEY_WORD].try_into().unwrap();
    u64::from_le_bytes(prefix) as usize
}

pub fn as_hashmap(&mut self) -> io::Result<HashMap<Vec<u8>, Vec<u8>>> {
let mut file = self.open()?;
/// Decodes the little-endian `u64` value-length prefix starting at byte `i` of `buf`.
///
/// # Panics
///
/// Panics if `buf` has fewer than `VALUE_WORD` bytes available from `i`, or if
/// `VALUE_WORD` is not 8 (the slice would not convert into a `u64`-sized array).
fn get_value_u64(&self, buf: &[u8], i: usize) -> usize {
    let prefix: [u8; 8] = buf[i..i + VALUE_WORD].try_into().unwrap();
    u64::from_le_bytes(prefix) as usize
}

pub fn as_hashmap(&mut self) -> Result<HashMap<Vec<u8>, Vec<u8>>> {
let mut file = self.open(&self.filename)?;
file.seek(SeekFrom::Start(0))?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let mut i: usize = 0;
let mut hashmap: HashMap<Vec<u8>, Vec<u8>> = HashMap::new();

while i < buf.len() {
let key_len = self.get_kv_len_u64(&buf, i);
i += WORD;
let key_len = self.get_key_u64(&buf, i);
i += KEY_WORD;

let key_ = &buf[i..i + key_len];
i += key_len;

let value_len = self.get_kv_len_u64(&buf, i);
i += WORD;
let value_len = self.get_value_u64(&buf, i);
i += VALUE_WORD;

let value_ = &buf[i..i + value_len];
i += value_len;
Expand All @@ -107,35 +128,41 @@ impl SSTable {
Ok(hashmap)
}

/**
* Read the value of a key from an SSTable.
* If this file was opened for writing,
* that would change the seek position to EOF,
* Hence we explicitly change the position.
*/
pub fn scan(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
let mut file = self.open()?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let mut i: usize = 0;

while i < buf.len() {
let key_len = self.get_kv_len_u64(&buf, i);
i += WORD;

let key_ = &buf[i..i + key_len];
i += key_len;

let value_len = self.get_kv_len_u64(&buf, i);
i += WORD;

let value_ = &buf[i..i + value_len];
i += value_len;

let is_tombstone = value_ == TOMBSTONE;

if key_ == key && !is_tombstone {
return Ok(Some(value_.to_vec()));
pub fn search(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let mut data_file = self.open(&self.filename)?;
let mut index_file = self.open(&self.filename.with_extension("index"))?;
let mut start = index_file.seek(SeekFrom::Start(0))?;
let mut end = index_file.seek(SeekFrom::End(0))? / WORD as u64;

while start < end {
let index_mid = start + (end - start) / 2;
index_file.seek(SeekFrom::Start(index_mid * WORD as u64))?;
let data_mid = index_file.read_u64::<LittleEndian>()?;
data_file.seek(SeekFrom::Start(data_mid))?;
let key_len = data_file.read_u64::<LittleEndian>()?;
let mut key_buf = vec![0; key_len as usize];
data_file.read_exact(key_buf.as_mut_slice())?;
let current_key = key_buf.as_slice();

match key.cmp(current_key) {
Ordering::Less => {
end = index_mid;
}
Ordering::Equal => {
let value_len = data_file.read_u64::<LittleEndian>()?;
let mut value_buf = vec![0; value_len as usize];
data_file.read_exact(value_buf.as_mut_slice())?;
let value = value_buf.as_slice();

if value != TOMBSTONE {
return Ok(Some(value.to_vec()));
} else {
return Ok(None);
}
}
Ordering::Greater => {
start = index_mid + 1;
}
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/sstable/sstable_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#[cfg(test)]
mod test {
use crate::sstable::sst::SSTable;
use std::panic::{self, AssertUnwindSafe};
use std::{
collections::HashMap,
panic::{self, AssertUnwindSafe},
};
use tempfile::TempDir;

#[test]
Expand All @@ -18,11 +21,13 @@ mod test {
};
let key = b"test_key";
let value = b"test_value";
match sstable.write(key, value) {
let mut store = HashMap::new();
store.insert(key.to_vec(), value.to_vec());
match sstable.write(&store) {
Ok(_) => (),
Err(_) => panic!("Failed write to sstable."),
};
let value_read = match sstable.scan(key) {
let value_read = match sstable.search(key) {
Ok(Some(v)) => v,
Err(e) => panic!("{}", e),
_ => panic!("Failed to read value."),
Expand Down
Loading

0 comments on commit f93c4d8

Please sign in to comment.