Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Use sqlite wal for replication #777

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sqld/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ async-stream = "0.3.5"
libsql = { git = "https://github.com/tursodatabase/libsql", rev = "8847ca05c", optional = true }
metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
walkdir = "2.4.0"

[dev-dependencies]
proptest = "1.0.0"
Expand Down
7 changes: 6 additions & 1 deletion sqld/proto/replication_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ message Frames {
repeated Frame frames = 1;
}

// Response to a batched log-entries request: a chunk of replication frames
// together with the handshake state (presumably so a replica can fetch a
// batch without a separate Hello round-trip — confirm against the server
// implementation).
message BatchEntriesResponse {
  Frames frames = 1;
  HelloResponse hs_response = 2;
}

// Replication log service exposed by the primary to its replicas.
service ReplicationLog {
  // Handshake exchanging replication session metadata.
  rpc Hello(HelloRequest) returns (HelloResponse) {}
  // Streams frames one at a time from the given offset.
  rpc LogEntries(LogOffset) returns (stream Frame) {}
  // Returns a batch of frames (plus handshake state) from the given offset.
  rpc BatchLogEntries(LogOffset) returns (BatchEntriesResponse) {}
  // Streams the frames of a snapshot covering the given offset.
  rpc Snapshot(LogOffset) returns (stream Frame) {}
}
5 changes: 5 additions & 0 deletions sqld/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ impl NamespaceName {
/// Returns the namespace name as its raw bytes.
pub fn as_slice(&self) -> &[u8] {
    &self.0
}

/// Returns a stable `u128` identifier for the namespace, derived from the
/// hash of its name bytes.
///
/// Uses the FNV-1a 128-bit hash so the id is deterministic across
/// processes and platforms and can therefore be persisted (e.g. in
/// snapshot file headers).
///
/// NOTE(review): FNV is not cryptographic; confirm its collision
/// resistance is acceptable for however this id is used.
pub fn id(&self) -> u128 {
    // FNV-1a 128-bit parameters (offset basis and prime).
    const FNV_OFFSET_BASIS: u128 = 0x6c62272e07bb014262b821756295c58d;
    const FNV_PRIME: u128 = 0x0000000001000000000000000000013b;

    let mut hash = FNV_OFFSET_BASIS;
    for &byte in self.as_slice() {
        hash ^= byte as u128;
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    hash
}
}

impl fmt::Display for NamespaceName {
Expand Down
1 change: 1 addition & 0 deletions sqld/src/replication/primary/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod frame_stream;
pub mod logger;
mod storage;
196 changes: 196 additions & 0 deletions sqld/src/replication/primary/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use std::cmp::Ordering;
use std::collections::HashSet;
use std::fs::create_dir_all;
use std::io::{Seek, Write, SeekFrom};
use std::mem::size_of;
use std::path::{PathBuf, Path};
use std::pin::Pin;
use std::task::{Poll, Context};

use bytemuck::{Zeroable, Pod, bytes_of};
use futures::StreamExt;
use futures_core::Stream;
use walkdir::{WalkDir, DirEntry};

use crate::LIBSQL_PAGE_SIZE;
use crate::replication::frame::{FrameHeader, FrameMut};
use crate::replication::snapshot::{SnapshotFileHeader, SnapshotFile};
use crate::replication::{FrameNo, frame::Frame};
use crate::namespace::NamespaceName;

/// Deduplicates the frames of a WAL so that only the newest version of each
/// page is emitted into a snapshot.
pub struct Compactor<W> {
    // Source WAL being compacted.
    wal: W,
    // Page numbers already emitted; because the WAL is walked back-to-front,
    // the first occurrence of a page is its most recent version.
    seen: HashSet<u32>,
}

impl<W: Wal> Compactor<W> {
    /// calls f on the deduplicated frames of the WAL, in reverse frame order.
    /// returns the (start_frame_no, end_frame_no, page_count, size_after)
    ///
    /// Only the most recent version of each page reaches `f`: the WAL is
    /// walked from its last frame backwards and pages already `seen` are
    /// skipped.
    fn frames_with<F>(mut self, mut f: F) -> (FrameNo, FrameNo, u64, u32)
    where
        F: FnMut(&LibsqlFrame, FrameNo),
    {
        let wal_frame_count = self.wal.frame_count();
        let mut snapshot_frame_count = 0;
        let mut size_after = 0;

        while let Some(frame) = self.wal.next_frame_back() {
            // NOTE(review): computed BEFORE inserting into `seen`, so the
            // first frame gets `wal_frame_count` (not count - 1) and skipped
            // duplicates do not advance the counter. Also this is a
            // WAL-relative index, not offset by `start_frame_no` — confirm
            // this is the intended numbering for snapshot frame headers.
            let frame_no = wal_frame_count - self.seen.len();
            if !self.seen.contains(&frame.header.page_number) {
                self.seen.insert(frame.header.page_number);
                f(frame, frame_no as FrameNo);
                // The first frame walked (the WAL's last frame) carries the
                // final database size in pages.
                if snapshot_frame_count == 0 {
                    size_after = frame.header.size_after;
                }
                snapshot_frame_count += 1;
            }
        }

        let start_frame_no = self.wal.start_frame_no();
        // NOTE(review): `end` here is start + count, i.e. one past the last
        // frame if frame numbers begin at `start_frame_no` — verify against
        // how the `{start}-{end}` snapshot ranges are later matched.
        let end_frame_no = start_frame_no + self.wal.frame_count() as u64;

        (start_frame_no, end_frame_no, snapshot_frame_count, size_after)
    }
}

/// Read-only access to a libsql WAL, iterated from its last frame backwards.
pub trait Wal {
    /// Returns the start frame_no for the current WAL
    fn start_frame_no(&self) -> FrameNo;
    /// Returns the next frame from the WAL, starting from the end of the WAL.
    // Lifetime elided: the explicit `<'a>` was redundant (clippy
    // `needless_lifetimes`); the elided form is the identical signature.
    fn next_frame_back(&mut self) -> Option<&LibsqlFrame>;
    /// returns the number of frames in the WAL
    fn frame_count(&self) -> usize;
}

/// Backend responsible for storing compacted snapshots and serving frames
/// back out of them.
pub trait Storage {
    /// Stream of frames returned by [`Storage::locate`].
    type LocateFrameStream: Stream<Item = Frame>;

    /// Compacts `reader`'s WAL into a snapshot for `namespace` and stores it.
    fn checkpoint_wal<W>(
        &self,
        namespace: NamespaceName,
        reader: Compactor<W>,
    )
    where W: Wal + Send;

    /// Returns a stream of `namespace`'s frames starting at `frame_no`.
    fn locate(&self, namespace: NamespaceName, frame_no: FrameNo) -> Self::LocateFrameStream;
}

/// [`Storage`] implementation backed by the local filesystem: snapshots are
/// stored under `<path>/<namespace>/`.
pub struct FsStorage {
    // Root directory under which per-namespace snapshot directories live.
    path: PathBuf,
}

impl Storage for FsStorage {
    type LocateFrameStream = FsFrameStreamer;

    /// Compacts the WAL into a snapshot file named
    /// `{start_frame_no}-{end_frame_no}` under `<path>/<namespace>/`.
    ///
    /// The snapshot is assembled in a temp file and atomically persisted
    /// into place, so readers never observe a partially written snapshot.
    fn checkpoint_wal<W>(&self, namespace: NamespaceName, compactor: Compactor<W>)
    where
        W: Wal + Send,
    {
        // Frames are written first, leaving room for the header: its fields
        // (frame count, boundaries, size_after) are only known once the
        // compactor has run.
        let mut file = tempfile::NamedTempFile::new().unwrap();
        file.seek(SeekFrom::Start(size_of::<SnapshotFileHeader>() as _))
            .unwrap();
        let (start_frame_no, end_frame_no, frame_count, size_after) =
            compactor.frames_with(|frame, frame_no| {
                let frame_header = FrameHeader {
                    frame_no,
                    // NOTE(review): checksum left at 0 — confirm whether
                    // snapshot frames are expected to carry a valid checksum.
                    checksum: 0,
                    page_no: frame.header.page_number,
                    // NOTE(review): per-frame size_after is zeroed here; the
                    // final database size is recorded only in the snapshot
                    // header below — confirm readers rely on the header.
                    size_after: 0,
                };

                file.write_all(bytes_of(&frame_header)).unwrap();
                file.write_all(&frame.data).unwrap();
            });

        let snapshot_header = SnapshotFileHeader {
            namespace_id: namespace.id(),
            start_frame_no,
            end_frame_no,
            frame_count,
            size_after,
            _pad: 0,
        };

        // Back-patch the header now that the totals are known.
        file.seek(SeekFrom::Start(0)).unwrap();
        file.write_all(bytes_of(&snapshot_header)).unwrap();

        let ns_snapshot_path = self.path.join(namespace.as_str());
        create_dir_all(&ns_snapshot_path).unwrap();

        // Atomic rename of the finished snapshot into its final location.
        let snapshot_name = format!("{start_frame_no}-{end_frame_no}");
        let snapshot_path = ns_snapshot_path.join(snapshot_name);
        file.persist(snapshot_path).unwrap();
    }

    /// Returns a lazy stream over `namespace`'s snapshot files, starting at
    /// `frame_no`. No I/O happens until the stream is polled.
    // `frame_no` was declared `mut` but never mutated; the binding is plain.
    fn locate(&self, namespace: NamespaceName, frame_no: FrameNo) -> Self::LocateFrameStream {
        let path = self.path.join(namespace.as_str());
        FsFrameStreamer {
            frame_no,
            path,
            current_snapshot: None,
        }
    }
}

/// Streams a namespace's frames out of on-disk snapshot files, starting at
/// `frame_no` and opening snapshot files lazily as needed.
struct FsFrameStreamer {
    // Next frame number being sought.
    frame_no: FrameNo,
    // Directory containing this namespace's snapshot files.
    path: PathBuf,
    // Snapshot currently being streamed, if one has been located/opened.
    current_snapshot: Option<Pin<Box<dyn Stream<Item = crate::Result<FrameMut>>>>>,
}

impl Stream for FsFrameStreamer {
    type Item = Frame;

    /// Yields frames starting at `self.frame_no`, hopping from one snapshot
    /// file to the next until no snapshot covers the requested frame.
    // The original body ended in a bare `match current.poll_next_unpin(cx)`
    // with no arms (did not compile); this completes the state machine.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Lazily locate and open the snapshot covering `frame_no`.
            if self.current_snapshot.is_none() {
                let Some(snapshot) = locate_snapshot_file(&self.path, self.frame_no) else {
                    return Poll::Ready(None);
                };
                let stream = Box::pin(snapshot.into_stream_from(self.frame_no));
                self.current_snapshot.replace(stream);
            }

            let current = self.current_snapshot.as_mut().unwrap();
            match current.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(frame))) => {
                    let frame: Frame = frame.into();
                    // Frames within a snapshot arrive in reverse frame order,
                    // so track the highest frame seen: once this snapshot is
                    // exhausted the search resumes just past its last frame.
                    self.frame_no = self.frame_no.max(frame.header().frame_no + 1);
                    return Poll::Ready(Some(frame));
                }
                // Item is `Frame`, so there is no channel to surface the
                // error; terminate the stream.
                // TODO(review): consider `Item = crate::Result<Frame>`.
                Poll::Ready(Some(Err(_))) => return Poll::Ready(None),
                // Current snapshot exhausted: drop it and locate the next
                // snapshot file on the following loop iteration.
                Poll::Ready(None) => {
                    self.current_snapshot = None;
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

/// Finds the snapshot file under `dir` whose `{start}-{end}` frame range
/// contains `frame_no`, if any.
fn locate_snapshot_file(dir: &Path, frame_no: FrameNo) -> Option<SnapshotFile> {
    // min_depth(1) skips `dir` itself: WalkDir yields the root as its first
    // entry, and the namespace directory's name would never parse as a
    // frame range (the original `parse().unwrap()` panicked on it).
    for entry in WalkDir::new(dir).min_depth(1) {
        let entry = entry.unwrap();
        if !entry.file_type().is_file() {
            continue;
        }
        // Snapshot files are named `{start_frame_no}-{end_frame_no}` by
        // `FsStorage::checkpoint_wal`; tolerate (skip) anything else.
        let Some(name) = entry.file_name().to_str() else {
            continue;
        };
        let mut split = name.splitn(2, '-');
        let Some(start_fno) = split.next().and_then(|s| s.parse::<FrameNo>().ok()) else {
            continue;
        };
        let Some(end_fno) = split.next().and_then(|s| s.parse::<FrameNo>().ok()) else {
            continue;
        };
        if (start_fno..=end_fno).contains(&frame_no) {
            // FIXME: there is a chance that the snapshot we're trying to open was deleted, we
            // should try to relocate the next snapshot
            return Some(SnapshotFile::open(entry.path()).unwrap());
        }
    }

    None
}

/// A single WAL frame as laid out on disk: a frame header immediately
/// followed by one page of data. `repr(C)` + `Pod` allow reading it
/// straight out of the WAL bytes without copying field by field.
#[derive(Debug, Copy, Clone, Zeroable, Pod, PartialEq, Eq)]
#[repr(C)]
pub struct LibsqlFrame {
    header: LibsqlFrameHeader,
    data: [u8; LIBSQL_PAGE_SIZE as usize],
}

/// WAL frame header, mirroring the SQLite/libsql WAL frame header layout.
/// NOTE(review): the on-disk WAL stores these fields big-endian and no
/// byte-swapping is visible in this module — confirm endianness is handled
/// by whatever produces `LibsqlFrame` values.
#[derive(Debug, Copy, Clone, Zeroable, Pod, PartialEq, Eq)]
#[repr(C)]
pub struct LibsqlFrameHeader {
    // Database page number this frame contains.
    page_number: u32,
    // For commit frames, the database size in pages after the commit;
    // 0 for non-commit frames (see the SQLite WAL format).
    size_after: u32,
    salt1: u32,
    salt2: u32,
    checksum1: u32,
    checksum2: u32,
}

#[cfg(test)]
mod test {
    // TODO: no tests yet for the storage module.
    use super::*;
}
42 changes: 38 additions & 4 deletions sqld/src/replication/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
use std::io::SeekFrom;
use std::io::Write;
use std::mem::size_of;
use std::os::unix::prelude::FileExt;
Expand All @@ -14,9 +15,13 @@ use anyhow::Context;
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
use bytes::BytesMut;
use crossbeam::channel::bounded;
use futures::StreamExt;
use futures_core::Stream;
use once_cell::sync::Lazy;
use regex::Regex;
use tempfile::NamedTempFile;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use uuid::Uuid;

use crate::namespace::NamespaceName;
Expand All @@ -35,7 +40,7 @@ const MAX_SNAPSHOT_NUMBER: usize = 32;
#[repr(C)]
pub struct SnapshotFileHeader {
/// id of the database
pub log_id: u128,
pub namespace_id: u128,
/// first frame in the snapshot
pub start_frame_no: u64,
/// end frame in the snapshot
Expand Down Expand Up @@ -174,6 +179,35 @@ impl SnapshotFile {
})
}

/// Consumes the snapshot file and yields its frames one at a time, in the
/// order they are laid out on disk (right after the snapshot header).
pub fn into_stream(self) -> impl Stream<Item = crate::Result<FrameMut>> {
    async_stream::try_stream! {
        let mut file = tokio::fs::File::from_std(self.file);
        for frame_idx in 0..self.header.frame_count {
            // Frames are fixed-size records packed back-to-back after the
            // header, so the offset of frame `i` is header + i * frame size.
            let read_offset = size_of::<SnapshotFileHeader>() as u64
                + frame_idx * LogFile::FRAME_SIZE as u64;
            file.seek(SeekFrom::Start(read_offset)).await?;
            let mut buf = BytesMut::zeroed(LogFile::FRAME_SIZE);
            file.read_exact(&mut buf).await?;
            let frame = FrameMut::try_from(&*buf)?;
            yield frame;
        }
    }
}

/// Like [`Self::into_stream`], but stops at the first frame whose
/// `frame_no` falls below `from`; errors are passed through untouched.
pub fn into_stream_from(self, from: FrameNo) -> impl Stream<Item = crate::Result<FrameMut>> {
    self.into_stream().take_while(move |res| {
        let keep = match res {
            Ok(frame) => frame.header().frame_no >= from,
            // Keep errors so the consumer sees them before the cut-off.
            Err(_) => true,
        };
        std::future::ready(keep)
    })
}

/// Returns this snapshot's on-disk header.
pub fn header(&self) -> &SnapshotFileHeader {
    &self.header
}
Expand Down Expand Up @@ -409,7 +443,7 @@ impl SnapshotBuilder {
Ok(Self {
seen_pages: HashSet::new(),
header: SnapshotFileHeader {
log_id: log_id.as_u128(),
namespace_id: log_id.as_u128(),
start_frame_no: u64::MAX,
end_frame_no: u64::MIN,
frame_count: 0,
Expand Down Expand Up @@ -468,7 +502,7 @@ impl SnapshotBuilder {
file.as_file().write_all_at(bytes_of(&self.header), 0)?;
let snapshot_name = format!(
"{}-{}-{}.snap",
Uuid::from_u128(self.header.log_id),
Uuid::from_u128(self.header.namespace_id),
self.header.start_frame_no,
self.header.end_frame_no,
);
Expand Down Expand Up @@ -544,7 +578,7 @@ mod test {
assert_eq!(header.start_frame_no, 0);
assert_eq!(header.end_frame_no, 49);
assert_eq!(header.frame_count, 25);
assert_eq!(header.log_id, log_id.as_u128());
assert_eq!(header.namespace_id, log_id.as_u128());
assert_eq!(header.size_after, 25);

let mut seen_frames = HashSet::new();
Expand Down