Skip to content

Commit 0ccfd64

Browse files
committed
feat: wip background compaction
1 parent f1decd7 commit 0ccfd64

File tree

4 files changed

+58
-14
lines changed

4 files changed

+58
-14
lines changed

src/bin/sqrl-server.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,16 @@ async fn main() -> anyhow::Result<()> {
8484
if clients.len() > 3 {
8585
warn!("Replicating to many followers can greatly impact write performance");
8686
}
87-
ReplicatedServer::new(clients.into(), app.log_file, app.addr)?
87+
ReplicatedServer::new(clients.into(), app.log_file, app.addr)
88+
.await?
89+
.run()
90+
.await
91+
}
92+
replication::Mode::Follower => {
93+
StandaloneServer::new(app.log_file, app.addr)
94+
.await?
8895
.run()
8996
.await
9097
}
91-
replication::Mode::Follower => StandaloneServer::new(app.log_file, app.addr)?.run().await,
9298
}
9399
}

src/replication/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub struct ReplicatedServer {
2727
}
2828

2929
impl ReplicatedServer {
30-
pub fn new<P>(
30+
pub async fn new<P>(
3131
clients: Mutex<Vec<RemoteNodeClient>>,
3232
path: P,
3333
addr: SocketAddr,
@@ -37,7 +37,7 @@ impl ReplicatedServer {
3737
{
3838
Ok(Self {
3939
addr,
40-
server: Arc::new(StandaloneServer::new(path, addr)?),
40+
server: Arc::new(StandaloneServer::new(path, addr).await?),
4141
remote_replicas: Arc::new(clients),
4242
})
4343
}

src/server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ pub struct StandaloneServer {
1616
}
1717

1818
impl StandaloneServer {
19-
pub fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
19+
pub async fn new<P>(path: P, addr: SocketAddr) -> anyhow::Result<Self>
2020
where
2121
P: Into<std::path::PathBuf>,
2222
{
23-
let store = Arc::new(KvStore::open(path)?);
23+
let store = Arc::new(KvStore::open(path).await?);
2424
Ok(Self { store, addr })
2525
}
2626

src/store.rs

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ use std::path::PathBuf;
1212
use std::str::FromStr;
1313
use std::sync::atomic::AtomicUsize;
1414
use std::sync::{Arc, RwLock};
15+
use std::time::Duration;
1516
use std::usize;
17+
use tokio::sync::mpsc::error::TryRecvError;
18+
use tokio::sync::mpsc::{Receiver, Sender};
1619
use tracing::level_filters::LevelFilter;
1720
use tracing::{self, debug, info, warn};
1821
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
@@ -62,6 +65,12 @@ pub struct KvStore {
6265
/// The maximum size of a log file in bytes.
6366
max_log_file_size: u64,
6467

68+
/// Channel on which compaction notifications are sent.
69+
///
70+
/// This allows the [`KvStore`] to signal to the background task which receives
71+
/// compaction jobs to begin its work.
72+
compaction_tx: Sender<()>,
73+
6574
_tracing: Option<Arc<tracing::subscriber::DefaultGuard>>,
6675
}
6776

@@ -96,10 +105,11 @@ struct KeydirEntry {
96105
timestamp: i64,
97106
}
98107

99-
#[derive(Debug, Clone, Serialize, Deserialize)]
108+
#[derive(Debug)]
100109
struct StoreConfig {
101110
log_location: PathBuf,
102111
max_log_file_size: u64,
112+
compaction_tx: Sender<()>,
103113
}
104114

105115
impl KvsEngine for KvStore {
@@ -230,28 +240,31 @@ impl KvStore {
230240
/// Create a new KvStore.
231241
///
232242
/// The store is created in memory and is not persisted to disk.
233-
fn new(config: StoreConfig) -> KvStore {
243+
fn new(config: &StoreConfig) -> KvStore {
234244
KvStore {
235245
writer: Arc::new(RwLock::new(StoreWriter::default())),
236-
log_location: config.log_location,
246+
log_location: config.log_location.clone(),
237247
keydir: Arc::new(DashMap::new()),
238248
max_log_file_size: config.max_log_file_size,
249+
compaction_tx: config.compaction_tx.clone(),
239250
_tracing: None,
240251
}
241252
}
242253

243254
/// Open a KvStore at the given path.
244-
pub fn open<P>(path: P) -> Result<KvStore>
255+
pub async fn open<P>(path: P) -> Result<KvStore>
245256
where
246257
P: Into<PathBuf>,
247258
{
248259
let path = path.into();
260+
let (tx, mut rx) = tokio::sync::mpsc::channel(5);
249261
let store_config = StoreConfig {
250262
log_location: path.clone(),
251263
max_log_file_size: MAX_LOG_FILE_SIZE.with(|f| *f),
264+
compaction_tx: tx,
252265
};
253266

254-
let mut store = KvStore::new(store_config.clone());
267+
let mut store = KvStore::new(&store_config);
255268
let log_level = std::env::var("KVS_LOG").unwrap_or("info".to_string());
256269
store.setup_logging(log_level)?;
257270
info!("Initialising store");
@@ -260,6 +273,24 @@ impl KvStore {
260273

261274
debug!("Creating initial log file");
262275
store.set_active_log_handle()?;
276+
tokio::spawn(async move {
277+
let interval_ms: u64 = std::env::var("KVS_COMPACTION_INTERVAL_MS")
278+
.unwrap_or_else(|_| "200".to_owned())
279+
.parse()
280+
.expect("Unable to parse default compaction interval");
281+
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
282+
info!(interval_ms, "Background compaction polling");
283+
loop {
284+
interval.tick().await;
285+
match rx.try_recv() {
286+
Ok(_trigger) => {
287+
info!("Compaction triggered");
288+
}
289+
Err(TryRecvError::Empty) => debug!("No compaction required"),
290+
Err(TryRecvError::Disconnected) => panic!("Compaction channel unavailable"),
291+
}
292+
}
293+
});
263294
Ok(store)
264295
}
265296

@@ -486,10 +517,17 @@ impl KvStore {
486517

487518
pub(crate) fn setup_logging(&mut self, level: String) -> anyhow::Result<()> {
488519
let level = LevelFilter::from_str(&level)?;
489-
let layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
520+
let layer = tracing_subscriber::fmt::layer()
521+
.compact()
522+
.with_writer(std::io::stderr)
523+
.with_thread_ids(true)
524+
.with_line_number(true)
525+
.with_file(true)
526+
.with_target(false);
490527
let subscriber = tracing_subscriber::registry().with(level).with(layer);
491-
let tracing_guard = tracing::subscriber::set_default(subscriber);
492-
self._tracing = Some(Arc::new(tracing_guard));
528+
tracing::subscriber::set_global_default(subscriber)?;
529+
//let tracing_guard = tracing::subscriber::set_default(subscriber);
530+
//self._tracing = Some(Arc::new(tracing_guard));
493531
Ok(())
494532
}
495533
}

0 commit comments

Comments
 (0)