Skip to content

Commit af71cc6

Browse files
committed
f Implement locking for FilesystemStore
1 parent 3380625 commit af71cc6

File tree

1 file changed

+45
-7
lines changed

1 file changed

+45
-7
lines changed

src/io/fs_store.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ extern crate winapi;
33

44
use super::{KVStore, TransactionalWrite};
55

6+
use std::collections::HashMap;
67
use std::fs;
78
use std::io::{BufReader, BufWriter, Read, Write};
89
use std::path::{Path, PathBuf};
910
use std::str::FromStr;
11+
use std::sync::{Arc, Mutex, RwLock};
1012

1113
#[cfg(not(target_os = "windows"))]
1214
use std::os::unix::io::AsRawFd;
@@ -38,11 +40,13 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::W
3840

3941
pub struct FilesystemStore {
4042
dest_dir: PathBuf,
43+
locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
4144
}
4245

4346
impl FilesystemStore {
4447
pub fn new(dest_dir: PathBuf) -> Self {
45-
Self { dest_dir }
48+
let locks = Mutex::new(HashMap::new());
49+
Self { dest_dir, locks }
4650
}
4751
}
4852

@@ -51,20 +55,34 @@ impl KVStore for FilesystemStore {
5155
type Writer = FilesystemWriter;
5256

5357
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
58+
let mut outer_lock = self.locks.lock().unwrap();
59+
let lock_key = (namespace.to_string(), key.to_string());
60+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
61+
5462
let mut dest_file = self.dest_dir.clone();
5563
dest_file.push(namespace);
5664
dest_file.push(key);
57-
FilesystemReader::new(dest_file)
65+
FilesystemReader::new(dest_file, inner_lock_ref)
5866
}
5967

6068
fn write(&self, namespace: &str, key: &str) -> std::io::Result<Self::Writer> {
69+
let mut outer_lock = self.locks.lock().unwrap();
70+
let lock_key = (namespace.to_string(), key.to_string());
71+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
72+
6173
let mut dest_file = self.dest_dir.clone();
6274
dest_file.push(namespace);
6375
dest_file.push(key);
64-
FilesystemWriter::new(dest_file)
76+
FilesystemWriter::new(dest_file, inner_lock_ref)
6577
}
6678

6779
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
80+
let mut outer_lock = self.locks.lock().unwrap();
81+
let lock_key = (namespace.to_string(), key.to_string());
82+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
83+
84+
let _guard = inner_lock_ref.write().unwrap();
85+
6886
let mut dest_file = self.dest_dir.clone();
6987
dest_file.push(namespace);
7088
dest_file.push(key);
@@ -96,6 +114,21 @@ impl KVStore for FilesystemStore {
96114
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
97115
}
98116

117+
if Arc::strong_count(&inner_lock_ref) == 2 {
118+
// It's safe to remove the lock entry if we're the only one left holding a strong
119+
// reference. Checking this is necessary to ensure we continue to distribute references to the
120+
// same lock as long as some Writers/Readers are around. However, we still want to
121+
// clean up the table when possible.
122+
//
123+
// Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
124+
// around, but is preferable to doing nothing *or* something overly complex such as
125+
// implementing yet another RAII structure just for this pupose.
126+
outer_lock.remove(&lock_key);
127+
}
128+
129+
// Garbage collect all lock entries that are not referenced anymore.
130+
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
131+
99132
Ok(true)
100133
}
101134

@@ -134,18 +167,20 @@ impl KVStore for FilesystemStore {
134167

135168
pub struct FilesystemReader {
136169
inner: BufReader<fs::File>,
170+
lock_ref: Arc<RwLock<()>>,
137171
}
138172

139173
impl FilesystemReader {
140-
pub fn new(dest_file: PathBuf) -> std::io::Result<Self> {
174+
fn new(dest_file: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
141175
let f = fs::File::open(dest_file.clone())?;
142176
let inner = BufReader::new(f);
143-
Ok(Self { inner })
177+
Ok(Self { inner, lock_ref })
144178
}
145179
}
146180

147181
impl Read for FilesystemReader {
148182
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
183+
let _guard = self.lock_ref.read().unwrap();
149184
self.inner.read(buf)
150185
}
151186
}
@@ -155,10 +190,11 @@ pub struct FilesystemWriter {
155190
parent_directory: PathBuf,
156191
tmp_file: PathBuf,
157192
tmp_writer: BufWriter<fs::File>,
193+
lock_ref: Arc<RwLock<()>>,
158194
}
159195

160196
impl FilesystemWriter {
161-
pub fn new(dest_file: PathBuf) -> std::io::Result<Self> {
197+
fn new(dest_file: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
162198
let msg = format!("Could not retrieve parent directory of {}.", dest_file.display());
163199
let parent_directory = dest_file
164200
.parent()
@@ -179,7 +215,7 @@ impl FilesystemWriter {
179215

180216
let tmp_writer = BufWriter::new(fs::File::create(&tmp_file)?);
181217

182-
Ok(Self { dest_file, parent_directory, tmp_file, tmp_writer })
218+
Ok(Self { dest_file, parent_directory, tmp_file, tmp_writer, lock_ref })
183219
}
184220
}
185221

@@ -198,6 +234,8 @@ impl Write for FilesystemWriter {
198234
impl TransactionalWrite for FilesystemWriter {
199235
fn commit(&mut self) -> std::io::Result<()> {
200236
self.flush()?;
237+
238+
let _guard = self.lock_ref.write().unwrap();
201239
// Fsync the parent directory on Unix.
202240
#[cfg(not(target_os = "windows"))]
203241
{

0 commit comments

Comments
 (0)