Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support soft delete blobs #561

Merged
merged 2 commits into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/jwst-codec-utils/src/doc_operation/yrs_op/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub fn yrs_create_nest_type_from_root(doc: &yrs::Doc, target_type: CRDTNestType,
}
}

pub fn gen_nest_type_from_root(doc: &mut Doc, crdt_param: &CRDTParam) -> Option<YrsNestType> {
pub fn gen_nest_type_from_root(doc: &Doc, crdt_param: &CRDTParam) -> Option<YrsNestType> {
match crdt_param.new_nest_type {
CRDTNestType::Array => Some(yrs_create_nest_type_from_root(
doc,
Expand Down Expand Up @@ -124,7 +124,7 @@ pub fn gen_nest_type_from_root(doc: &mut Doc, crdt_param: &CRDTParam) -> Option<
}

pub fn gen_nest_type_from_nest_type(
doc: &mut Doc,
doc: &Doc,
crdt_param: CRDTParam,
nest_type: &mut YrsNestType,
) -> Option<YrsNestType> {
Expand Down
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand All @@ -13,6 +13,7 @@ pub struct Model {
pub blob: Vec<u8>,
pub length: i64,
pub created_at: DateTimeWithTimeZone,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/bucket_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/diff_log.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/docs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand Down
2 changes: 1 addition & 1 deletion libs/jwst-storage/src/entities/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

pub mod prelude;

Expand Down
3 changes: 2 additions & 1 deletion libs/jwst-storage/src/entities/optimized_blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

use sea_orm::entity::prelude::*;

Expand All @@ -15,6 +15,7 @@ pub struct Model {
pub created_at: DateTimeWithTimeZone,
#[sea_orm(primary_key, auto_increment = false)]
pub params: String,
pub deleted_at: Option<DateTimeWithTimeZone>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
11 changes: 6 additions & 5 deletions libs/jwst-storage/src/entities/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.2
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.6

pub use super::{
blobs::Entity as Blobs, bucket_blobs::Entity as BucketBlobs, diff_log::Entity as DiffLog, docs::Entity as Docs,
optimized_blobs::Entity as OptimizedBlobs,
};
pub use super::blobs::Entity as Blobs;
pub use super::bucket_blobs::Entity as BucketBlobs;
pub use super::diff_log::Entity as DiffLog;
pub use super::docs::Entity as Docs;
pub use super::optimized_blobs::Entity as OptimizedBlobs;
2 changes: 2 additions & 0 deletions libs/jwst-storage/src/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod m20230321_000001_blob_optimized_table;
mod m20230614_000001_initial_bucket_blob_table;
mod m20230626_023319_doc_guid;
mod m20230814_061223_initial_diff_log_table;
mod m20231124_082401_blob_deleted_at;
mod schemas;

pub struct Migrator;
Expand All @@ -20,6 +21,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230614_000001_initial_bucket_blob_table::Migration),
Box::new(m20230626_023319_doc_guid::Migration),
Box::new(m20230814_061223_initial_diff_log_table::Migration),
Box::new(m20231124_082401_blob_deleted_at::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

use crate::schemas::{Blobs, OptimizedBlobs};

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.add_column(ColumnDef::new(Blobs::DeletedAt).timestamp_with_time_zone().null())
.to_owned(),
)
.await?;

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.add_column(
ColumnDef::new(OptimizedBlobs::DeletedAt)
.timestamp_with_time_zone()
.null(),
)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Blobs::Table)
.drop_column(Blobs::DeletedAt)
.to_owned(),
)
.await?;

Check warning on line 42 in libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs

View check run for this annotation

Codecov / codecov/patch

libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs#L34-L42

Added lines #L34 - L42 were not covered by tests

manager
.alter_table(
Table::alter()
.table(OptimizedBlobs::Table)
.drop_column(OptimizedBlobs::DeletedAt)
.to_owned(),
)
.await
}

Check warning on line 52 in libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs

View check run for this annotation

Codecov / codecov/patch

libs/jwst-storage/src/migration/src/m20231124_082401_blob_deleted_at.rs#L44-L52

Added lines #L44 - L52 were not covered by tests
}
4 changes: 4 additions & 0 deletions libs/jwst-storage/src/migration/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum Blobs {
Length,
#[iden = "created_at"]
Timestamp,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand All @@ -36,6 +38,8 @@ pub enum OptimizedBlobs {
#[iden = "created_at"]
Timestamp,
Params,
#[iden = "deleted_at"]
DeletedAt,
}

#[derive(Iden)]
Expand Down
34 changes: 23 additions & 11 deletions libs/jwst-storage/src/storage/blobs/auto_storage/auto_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@ impl BlobAutoStorage {

async fn exists(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<bool> {
Ok(OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)?)
}

async fn insert(&self, table: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> {
if !self.exists(table, hash, params).await? {
async fn insert(&self, workspace: &str, hash: &str, params: &str, blob: &[u8]) -> JwstBlobResult<()> {
if let Some(model) = OptimizedBlobs::find_by_id((workspace.into(), hash.into(), params.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: OptimizedBlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}
} else {
OptimizedBlobs::insert(OptimizedBlobActiveModel {
workspace_id: Set(table.into()),
workspace_id: Set(workspace.into()),
hash: Set(hash.into()),
blob: Set(blob.into()),
length: Set(blob.len().try_into().unwrap()),
params: Set(params.into()),
created_at: Set(Utc::now().into()),
deleted_at: Set(None),
})
.exec(&self.pool)
.await?;
Expand All @@ -53,6 +64,7 @@ impl BlobAutoStorage {

async fn get(&self, table: &str, hash: &str, params: &str) -> JwstBlobResult<OptimizedBlobModel> {
OptimizedBlobs::find_by_id((table.into(), hash.into(), params.into()))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
Expand Down Expand Up @@ -141,16 +153,16 @@ impl BlobAutoStorage {
}
}

async fn delete(&self, table: &str, hash: &str) -> JwstBlobResult<u64> {
Ok(OptimizedBlobs::delete_many()
.filter(
OptimizedBlobColumn::WorkspaceId
.eq(table)
.and(OptimizedBlobColumn::Hash.eq(hash)),
)
async fn delete(&self, workspace: &str, hash: &str) -> JwstBlobResult<u64> {
OptimizedBlobs::update_many()
.col_expr(OptimizedBlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(OptimizedBlobColumn::WorkspaceId.eq(workspace))
.filter(OptimizedBlobColumn::Hash.eq(hash))
.filter(OptimizedBlobColumn::DeletedAt.is_null())
.exec(&self.pool)
.await
.map(|r| r.rows_affected)?)
.map(|r| r.rows_affected)
.map_err(|e| e.into())
}

async fn drop(&self, table: &str) -> Result<(), DbErr> {
Expand Down
37 changes: 33 additions & 4 deletions libs/jwst-storage/src/storage/blobs/blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
async fn all(&self, workspace: &str) -> Result<Vec<BlobModel>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.all(&self.pool)
.await
}
Expand All @@ -48,6 +49,7 @@

Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.select_only()
.column(BlobColumn::Hash)
.into_model::<BlobHash>()
Expand All @@ -60,12 +62,14 @@
async fn count(&self, workspace: &str) -> Result<u64, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
}

async fn exists(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.count(&self.pool)
.await
.map(|c| c > 0)
Expand All @@ -76,6 +80,7 @@
.select_only()
.column_as(BlobColumn::Length, "size")
.column_as(BlobColumn::CreatedAt, "created_at")
.filter(BlobColumn::DeletedAt.is_null())
.into_model::<InternalBlobMetadata>()
.one(&self.pool)
.await
Expand All @@ -86,6 +91,7 @@
pub(super) async fn get_blobs_size(&self, workspaces: &[String]) -> Result<Option<i64>, DbErr> {
Blobs::find()
.filter(BlobColumn::WorkspaceId.is_in(workspaces))
.filter(BlobColumn::DeletedAt.is_null())

Check warning on line 94 in libs/jwst-storage/src/storage/blobs/blob_storage.rs

View check run for this annotation

Codecov / codecov/patch

libs/jwst-storage/src/storage/blobs/blob_storage.rs#L94

Added line #L94 was not covered by tests
.select_only()
.column_as(BlobColumn::Length.sum().cast_as(Alias::new("bigint")), "size")
.into_tuple::<Option<i64>>()
Expand All @@ -95,13 +101,23 @@
}

async fn insert(&self, workspace: &str, hash: &str, blob: &[u8]) -> Result<(), DbErr> {
if !self.exists(workspace, hash).await? {
if let Some(model) = Blobs::find_by_id((workspace.into(), hash.into()))
.one(&self.pool)
.await?
{
if model.deleted_at.is_some() {
let mut model: BlobActiveModel = model.into();
model.deleted_at = Set(None);
model.update(&self.pool).await?;
}

Check warning on line 112 in libs/jwst-storage/src/storage/blobs/blob_storage.rs

View check run for this annotation

Codecov / codecov/patch

libs/jwst-storage/src/storage/blobs/blob_storage.rs#L108-L112

Added lines #L108 - L112 were not covered by tests
} else {
Blobs::insert(BlobActiveModel {
workspace_id: Set(workspace.into()),
hash: Set(hash.into()),
blob: Set(blob.into()),
length: Set(blob.len().try_into().unwrap()),
created_at: Set(Utc::now().into()),
deleted_at: Set(None),
})
.exec(&self.pool)
.await?;
Expand All @@ -112,14 +128,19 @@

pub(super) async fn get(&self, workspace: &str, hash: &str) -> JwstBlobResult<BlobModel> {
Blobs::find_by_id((workspace.into(), hash.into()))
.filter(BlobColumn::DeletedAt.is_null())
.one(&self.pool)
.await
.map_err(|e| e.into())
.and_then(|r| r.ok_or(JwstBlobError::BlobNotFound(hash.into())))
}

async fn delete(&self, workspace: &str, hash: &str) -> Result<bool, DbErr> {
Blobs::delete_by_id((workspace.into(), hash.into()))
Blobs::update_many()
.col_expr(BlobColumn::DeletedAt, Expr::value(Utc::now()))
.filter(BlobColumn::WorkspaceId.eq(workspace))
.filter(BlobColumn::Hash.eq(hash))
.filter(BlobColumn::DeletedAt.is_null())
.exec(&self.pool)
.await
.map(|r| r.rows_affected == 1)
Expand Down Expand Up @@ -262,7 +283,8 @@
hash: "test".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -282,7 +304,8 @@
hash: "test1".into(),
blob: vec![1, 2, 3, 4],
length: 4,
created_at: all.get(0).unwrap().created_at
created_at: all.get(0).unwrap().created_at,
deleted_at: None,
}]
);
assert_eq!(pool.count("basic").await?, 1);
Expand All @@ -295,5 +318,11 @@

pool.drop("basic").await?;

pool.insert("basic", "test1", &[1, 2, 3, 4]).await?;
pool.delete("basic", "test1").await?;

assert_eq!(pool.count("basic").await?, 0);
pool.drop("basic").await?;

Ok(())
}
Loading