Skip to content

Commit

Permalink
fix(iroh-blobs): unconditionally delete blobs (#2692)
Browse files Browse the repository at this point in the history
- always delete hashes, even if they are protected when calling
`Store::delete`
- move gc loop implementation into the stores

## Breaking Changes

- removed `Store::gc_sweep`
- removed `Store::gc_mark`
- removed `Store::gc_start`
- added `Store::gc_run` which starts the full gc schedule

Closes #2657
  • Loading branch information
dignifiedquire authored Sep 3, 2024
1 parent a029d89 commit 567577d
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 135 deletions.
134 changes: 126 additions & 8 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
//! errors when communicating with the actor.
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
io,
path::{Path, PathBuf},
sync::{Arc, RwLock},
Expand All @@ -78,6 +79,7 @@ use bao_tree::io::{
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};

use genawaiter::rc::{Co, Gen};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError};
Expand All @@ -102,6 +104,7 @@ use crate::{
tables::BaoFilePart,
util::{overwrite_and_sync, read_and_remove},
},
GcMarkEvent, GcSweepEvent,
},
util::{
compute_outboard,
Expand All @@ -122,7 +125,7 @@ use self::test_support::EntryData;
use super::{
bao_file::{BaoFileConfig, BaoFileHandle, BaoFileHandleWeak, CreateCb},
temp_name, BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, EntryStatus, ExportMode,
ExportProgressCb, ImportMode, ImportProgress, Map, TempCounterMap,
ExportProgressCb, ImportMode, ImportProgress, Map, ReadableStore, TempCounterMap,
};

/// Location of the data.
Expand Down Expand Up @@ -627,6 +630,11 @@ pub(crate) enum ActorMessage {
hashes: Vec<Hash>,
tx: oneshot::Sender<ActorResult<()>>,
},
/// Modification method: delete the data for a number of hashes, only if not protected
GcDelete {
hashes: Vec<Hash>,
tx: oneshot::Sender<ActorResult<()>>,
},
/// Sync the entire database to disk.
///
/// This just makes sure that there is no write transaction open.
Expand Down Expand Up @@ -670,7 +678,8 @@ impl ActorMessage {
| Self::SetTag { .. }
| Self::CreateTag { .. }
| Self::SetFullEntryState { .. }
| Self::Delete { .. } => MessageCategory::ReadWrite,
| Self::Delete { .. }
| Self::GcDelete { .. } => MessageCategory::ReadWrite,
Self::UpdateInlineOptions { .. }
| Self::Sync { .. }
| Self::Shutdown { .. }
Expand Down Expand Up @@ -887,6 +896,12 @@ impl StoreInner {
Ok(rx.await??)
}

/// Sends a `GcDelete` request for `hashes` to the actor and waits for its reply.
///
/// Fails if the message cannot be sent, if the reply channel is dropped, or if
/// the actor answers with an error.
async fn gc_delete(&self, hashes: Vec<Hash>) -> OuterResult<()> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = ActorMessage::GcDelete {
        hashes,
        tx: reply_tx,
    };
    self.tx.send(request).await?;
    // First `?` covers the dropped reply channel, the second the actor's own result.
    let actor_result = reply_rx.await?;
    Ok(actor_result?)
}

async fn gc_start(&self) -> OuterResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ActorMessage::GcStart { tx }).await?;
Expand Down Expand Up @@ -1374,9 +1389,76 @@ impl super::Store for Store {
Ok(self.0.delete(hashes).await?)
}

async fn gc_start(&self) -> io::Result<()> {
self.0.gc_start().await?;
Ok(())
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
G: Fn() -> Gut,
Gut: Future<Output = BTreeSet<Hash>> + Send,
{
tracing::info!("Starting GC task with interval {:?}", config.period);
let mut live = BTreeSet::new();
'outer: loop {
if let Err(cause) = self.0.gc_start().await {
tracing::debug!(
"unable to notify the db of GC start: {cause}. Shutting down GC loop."
);
break;
}
// do delay before the two phases of GC
tokio::time::sleep(config.period).await;
tracing::debug!("Starting GC");
live.clear();

let p = protected_cb().await;
live.extend(p);

tracing::debug!("Starting GC mark phase");
let live_ref = &mut live;
let mut stream = Gen::new(|co| async move {
if let Err(e) = super::gc_mark_task(self, live_ref, &co).await {
co.yield_(GcMarkEvent::Error(e)).await;
}
});
while let Some(item) = stream.next().await {
match item {
GcMarkEvent::CustomDebug(text) => {
tracing::debug!("{}", text);
}
GcMarkEvent::CustomWarning(text, _) => {
tracing::warn!("{}", text);
}
GcMarkEvent::Error(err) => {
tracing::error!("Fatal error during GC mark {}", err);
continue 'outer;
}
}
}
drop(stream);

tracing::debug!("Starting GC sweep phase");
let live_ref = &live;
let mut stream = Gen::new(|co| async move {
if let Err(e) = gc_sweep_task(self, live_ref, &co).await {
co.yield_(GcSweepEvent::Error(e)).await;
}
});
while let Some(item) = stream.next().await {
match item {
GcSweepEvent::CustomDebug(text) => {
tracing::debug!("{}", text);
}
GcSweepEvent::CustomWarning(text, _) => {
tracing::warn!("{}", text);
}
GcSweepEvent::Error(err) => {
tracing::error!("Fatal error during GC mark {}", err);
continue 'outer;
}
}
}
if let Some(ref cb) = config.done_callback {
cb();
}
}
}

fn temp_tag(&self, value: HashAndFormat) -> TempTag {
Expand All @@ -1392,6 +1474,36 @@ impl super::Store for Store {
}
}

/// Sweep phase of GC: requests deletion of every blob whose hash is not in `live`.
///
/// Walks the complete blobs followed by the partial blobs of `store`, collects
/// the hashes missing from `live`, and submits them to the store in batches via
/// `gc_delete`. On success a single debug event carrying the number of hashes
/// submitted for deletion is yielded on `co`.
///
/// # Errors
///
/// Returns the first error produced while listing blobs or by a delete
/// request. No summary event is yielded in that case.
pub(super) async fn gc_sweep_task<'a>(
    store: &'a Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    /// Maximum number of hashes sent in one delete request.
    const BATCH_SIZE: usize = 100;

    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    // Counts hashes submitted for deletion, not confirmed deletions.
    let mut count = 0;
    let mut batch = Vec::with_capacity(BATCH_SIZE);
    for hash in blobs {
        let hash = hash?;
        if live.contains(&hash) {
            continue;
        }
        batch.push(hash);
        count += 1;
        if batch.len() >= BATCH_SIZE {
            // Hand the full batch over by value instead of cloning and clearing it.
            let full = std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE));
            store.0.gc_delete(full).await?;
        }
    }
    if !batch.is_empty() {
        store.0.gc_delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!(
        "deleted {} blobs",
        count
    )))
    .await;
    Ok(())
}

impl Actor {
fn new(
path: &Path,
Expand Down Expand Up @@ -2033,16 +2145,18 @@ impl ActorState {
Ok(())
}

fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>) -> ActorResult<()> {
fn delete(&mut self, tables: &mut Tables, hashes: Vec<Hash>, force: bool) -> ActorResult<()> {
for hash in hashes {
if self.temp.as_ref().read().unwrap().contains(&hash) {
continue;
}
if self.protected.contains(&hash) {
if !force && self.protected.contains(&hash) {
tracing::debug!("protected hash, continuing {}", &hash.to_hex()[..8]);
continue;
}

tracing::debug!("deleting {}", &hash.to_hex()[..8]);

self.handles.remove(&hash);
if let Some(entry) = tables.blobs.remove(hash)? {
match entry.value() {
Expand Down Expand Up @@ -2253,7 +2367,11 @@ impl ActorState {
tx.send(res).ok();
}
ActorMessage::Delete { hashes, tx } => {
let res = self.delete(tables, hashes);
let res = self.delete(tables, hashes, true);
tx.send(res).ok();
}
ActorMessage::GcDelete { hashes, tx } => {
let res = self.delete(tables, hashes, false);
tx.send(res).ok();
}
ActorMessage::OnComplete { handle } => {
Expand Down
11 changes: 8 additions & 3 deletions iroh-blobs/src/store/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use futures_lite::{Stream, StreamExt};
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_io::AsyncSliceReader;
use std::{
collections::BTreeMap,
collections::{BTreeMap, BTreeSet},
future::Future,
io,
path::PathBuf,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
Expand Down Expand Up @@ -228,8 +229,12 @@ impl super::Store for Store {
self.inner.temp_tag(tag)
}

async fn gc_start(&self) -> io::Result<()> {
Ok(())
/// Runs the GC loop for this store by delegating to `super::gc_run_loop`.
///
/// The start-notification hook handed to the shared loop does nothing and
/// always reports success.
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
    G: Fn() -> Gut,
    Gut: Future<Output = BTreeSet<Hash>> + Send,
{
    // No-op hook passed as the third argument of the shared loop.
    let noop_start = move || async { Ok(()) };
    super::gc_run_loop(self, config, noop_start, protected_cb).await
}

async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
Expand Down
11 changes: 8 additions & 3 deletions iroh-blobs/src/store/readonly_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//!
//! Main entry point is [Store].
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
future::Future,
io,
path::PathBuf,
sync::Arc,
Expand Down Expand Up @@ -324,8 +325,12 @@ impl super::Store for Store {
TempTag::new(inner, None)
}

async fn gc_start(&self) -> io::Result<()> {
Ok(())
/// Runs the GC loop for this store by delegating to `super::gc_run_loop`.
///
/// The start-notification hook handed to the shared loop does nothing and
/// always reports success.
async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
where
    G: Fn() -> Gut,
    Gut: Future<Output = BTreeSet<Hash>> + Send,
{
    // No-op hook passed as the third argument of the shared loop.
    let noop_start = move || async { Ok(()) };
    super::gc_run_loop(self, config, noop_start, protected_cb).await
}

async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
Expand Down
Loading

0 comments on commit 567577d

Please sign in to comment.