Skip to content

Commit

Permalink
feat(rust): post commit hook (v2), create checkpoint hook (#2391)
Browse files Browse the repository at this point in the history
# Description
Introduces a post commit, which can do additional actions before
returning the FinalizedCommit.

Current commit hook will creates a checkpoint if it meets the condition
of the interval.

Also bumping the default interval to 100 commits. 10 commits can be a
bit aggressive

# Related Issue(s)
- closes #913
  • Loading branch information
ion-elgreco authored Apr 7, 2024
1 parent fef111c commit 5eade5e
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 12 deletions.
133 changes: 125 additions & 8 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,33 @@
//! │
//! ▼
//! ┌───────────────────────────────┐
//! │ Post Commit │
//! │ │
//! │ Commit that was materialized │
//! │ to storage with post commit │
//! │ hooks to be executed │
//! └──────────────┬────────────────┘
//! │
//! ▼
//! ┌───────────────────────────────┐
//! │ Finalized Commit │
//! │ │
//! │ Commit that was materialized │
//! │ to storage │
//! │ │
//! └───────────────────────────────┘
//! └───────────────────────────────┘
//!</pre>

use std::collections::HashMap;

use chrono::Utc;
use conflict_checker::ConflictChecker;
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectStore};
use serde_json::Value;
use std::collections::HashMap;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};
use crate::checkpoints::create_checkpoint_for;
use crate::errors::DeltaTableError;
use crate::kernel::{
Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, WriterFeatures,
Expand Down Expand Up @@ -293,19 +302,27 @@ impl CommitData {
}
}

#[derive(Clone, Debug, Copy)]
/// Properties for post commit hook.
pub struct PostCommitHookProperties {
create_checkpoint: bool,
}

#[derive(Clone, Debug)]
/// End user facing interface to be used by operations on the table.
/// Enable controling commit behaviour and modifying metadata that is written during a commit.
pub struct CommitProperties {
pub(crate) app_metadata: HashMap<String, Value>,
max_retries: usize,
create_checkpoint: bool,
}

impl Default for CommitProperties {
fn default() -> Self {
Self {
app_metadata: Default::default(),
max_retries: DEFAULT_RETRIES,
create_checkpoint: true,
}
}
}
Expand All @@ -319,13 +336,23 @@ impl CommitProperties {
self.app_metadata = HashMap::from_iter(metadata);
self
}

/// Specify if it should create a checkpoint when the commit interval condition is met
pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
self.create_checkpoint = create_checkpoint;
self
}
}

impl From<CommitProperties> for CommitBuilder {
fn from(value: CommitProperties) -> Self {
CommitBuilder {
max_retries: value.max_retries,
app_metadata: value.app_metadata,
post_commit_hook: PostCommitHookProperties {
create_checkpoint: value.create_checkpoint,
}
.into(),
..Default::default()
}
}
Expand All @@ -336,6 +363,7 @@ pub struct CommitBuilder {
actions: Vec<Action>,
app_metadata: HashMap<String, Value>,
max_retries: usize,
post_commit_hook: Option<PostCommitHookProperties>,
}

impl Default for CommitBuilder {
Expand All @@ -344,6 +372,7 @@ impl Default for CommitBuilder {
actions: Vec::new(),
app_metadata: HashMap::new(),
max_retries: DEFAULT_RETRIES,
post_commit_hook: None,
}
}
}
Expand All @@ -367,6 +396,12 @@ impl<'a> CommitBuilder {
self
}

/// Specify all the post commit hook properties
pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
self.post_commit_hook = post_commit_hook.into();
self
}

/// Prepare a Commit operation using the configured builder
pub fn build(
self,
Expand All @@ -380,6 +415,7 @@ impl<'a> CommitBuilder {
table_data,
max_retries: self.max_retries,
data,
post_commit_hook: self.post_commit_hook,
})
}
}
Expand All @@ -390,6 +426,7 @@ pub struct PreCommit<'a> {
table_data: Option<&'a dyn TableReference>,
data: CommitData,
max_retries: usize,
post_commit_hook: Option<PostCommitHookProperties>,
}

impl<'a> std::future::IntoFuture for PreCommit<'a> {
Expand All @@ -399,7 +436,7 @@ impl<'a> std::future::IntoFuture for PreCommit<'a> {
fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move { this.into_prepared_commit_future().await?.await })
Box::pin(async move { this.into_prepared_commit_future().await?.await?.await })
}
}

Expand Down Expand Up @@ -429,6 +466,7 @@ impl<'a> PreCommit<'a> {
table_data: this.table_data,
max_retries: this.max_retries,
data: this.data,
post_commit: this.post_commit_hook,
})
})
}
Expand All @@ -441,6 +479,7 @@ pub struct PreparedCommit<'a> {
data: CommitData,
table_data: Option<&'a dyn TableReference>,
max_retries: usize,
post_commit: Option<PostCommitHookProperties>,
}

impl<'a> PreparedCommit<'a> {
Expand All @@ -451,7 +490,7 @@ impl<'a> PreparedCommit<'a> {
}

impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
type Output = DeltaResult<FinalizedCommit>;
type Output = DeltaResult<PostCommit<'a>>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
Expand All @@ -462,9 +501,12 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {

if this.table_data.is_none() {
this.log_store.write_commit_entry(0, tmp_commit).await?;
return Ok(FinalizedCommit {
return Ok(PostCommit {
version: 0,
data: this.data,
create_checkpoint: false,
log_store: this.log_store,
table_data: this.table_data,
});
}

Expand All @@ -483,10 +525,16 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
let version = read_snapshot.version() + attempt_number as i64;
match this.log_store.write_commit_entry(version, tmp_commit).await {
Ok(()) => {
return Ok(FinalizedCommit {
return Ok(PostCommit {
version,
data: this.data,
})
create_checkpoint: this
.post_commit
.map(|v| v.create_checkpoint)
.unwrap_or_default(),
log_store: this.log_store,
table_data: this.table_data,
});
}
Err(TransactionError::VersionAlreadyExists(version)) => {
let summary = WinningCommitSummary::try_new(
Expand Down Expand Up @@ -534,6 +582,54 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}
}

/// Represents items for the post commit hook
pub struct PostCommit<'a> {
/// The winning version number of the commit
pub version: i64,
/// The data that was comitted to the log store
pub data: CommitData,
create_checkpoint: bool,
log_store: LogStoreRef,
table_data: Option<&'a dyn TableReference>,
}

impl<'a> PostCommit<'a> {
/// Runs the post commit activities
async fn run_post_commit_hook(
&self,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if self.create_checkpoint {
self.create_checkpoint(&self.table_data, &self.log_store, version, commit_data)
.await?
}
Ok(())
}
async fn create_checkpoint(
&self,
table: &Option<&'a dyn TableReference>,
log_store: &LogStoreRef,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if let Some(table) = table {
let checkpoint_interval = table.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
// We have to advance the snapshot otherwise we can't create a checkpoint
let mut snapshot = table.eager_snapshot().unwrap().clone();
snapshot.advance(vec![commit_data])?;
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
create_checkpoint_for(version, &state, log_store.as_ref()).await?
}
}
Ok(())
}
}

/// A commit that successfully completed
pub struct FinalizedCommit {
/// The winning version number of the commit
Expand All @@ -554,6 +650,27 @@ impl FinalizedCommit {
}
}

impl<'a> std::future::IntoFuture for PostCommit<'a> {
type Output = DeltaResult<FinalizedCommit>;
type IntoFuture = BoxFuture<'a, Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
match this.run_post_commit_hook(this.version, &this.data).await {
Ok(_) => {
return Ok(FinalizedCommit {
version: this.version,
data: this.data,
})
}
Err(err) => return Err(err),
};
})
}
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
use crate::{open_table_with_version, DeltaTable};

type SchemaPath = Vec<String>;

/// Error returned when there is an error during creating a checkpoint.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/table/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl<'a> TableConfig<'a> {
DeltaConfigKey::CheckpointInterval,
checkpoint_interval,
i32,
10
100
),
);

Expand Down Expand Up @@ -591,7 +591,7 @@ mod tests {
fn get_long_from_metadata_test() {
let md = dummy_metadata();
let config = TableConfig(&md.configuration);
assert_eq!(config.checkpoint_interval(), 10,)
assert_eq!(config.checkpoint_interval(), 100,)
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{DeltaResult, DeltaTableError};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableState {
app_transaction_version: HashMap<String, i64>,
pub(crate) app_transaction_version: HashMap<String, i64>,
pub(crate) snapshot: EagerSnapshot,
}

Expand Down
Loading

0 comments on commit 5eade5e

Please sign in to comment.