forked from delta-io/delta-rs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadd_column.rs
113 lines (94 loc) · 3.93 KB
/
add_column.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//! Add a new column to a table
use delta_kernel::schema::StructType;
use futures::future::BoxFuture;
use itertools::Itertools;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use crate::kernel::StructField;
use crate::logstore::LogStoreRef;
use crate::operations::cast::merge_schema::merge_delta_struct;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
/// Builder for an operation that adds new columns and/or nested fields to a Delta table.
///
/// Supply the fields with `with_fields` (and optionally `with_commit_properties`), then
/// `.await` the builder to commit the schema change.
pub struct AddColumnBuilder {
    /// Snapshot of the table state that the new fields are merged into
    snapshot: DeltaTableState,
    /// Fields to add or merge into the table schema; `None` until fields are supplied
    fields: Option<Vec<StructField>>,
    /// Log store the schema-change commit is written through
    log_store: LogStoreRef,
    /// Extra information attached to the resulting commit
    commit_properties: CommitProperties,
}

/// Registers the builder as a table `Operation` (with `()` as the trait's type parameter).
impl super::Operation<()> for AddColumnBuilder {}
impl AddColumnBuilder {
    /// Create a new builder for adding columns to the table described by `snapshot`,
    /// committing through `log_store`.
    ///
    /// No fields are set initially and the commit properties start at their defaults;
    /// configure them with [`Self::with_fields`] and [`Self::with_commit_properties`].
    pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self {
        Self {
            snapshot,
            log_store,
            fields: None,
            commit_properties: CommitProperties::default(),
        }
    }

    /// Specify the fields to be added to (or merged into) the table schema.
    ///
    /// Accepts any iterable of [`StructField`]s; the iterator is consumed once, so it does
    /// not need to be `Clone`. Calling this more than once replaces the previously
    /// specified fields.
    pub fn with_fields(mut self, fields: impl IntoIterator<Item = StructField>) -> Self {
        self.fields = Some(fields.into_iter().collect());
        self
    }

    /// Additional metadata to be added to commit info.
    ///
    /// Calling this more than once replaces the previously set properties.
    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
        self.commit_properties = commit_properties;
        self
    }
}
impl std::future::IntoFuture for AddColumnBuilder {
    type Output = DeltaResult<DeltaTable>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Merges the requested fields into the table schema and commits the result.
    ///
    /// The commit always contains a metadata action carrying the merged schema. When any
    /// of the new fields uses `timestamp_ntz`, a protocol action enabling that feature is
    /// committed as well. If the table was not already on reader/writer protocol (3, 7),
    /// its existing table properties are also converted into features.
    ///
    /// # Errors
    ///
    /// * [`DeltaTableError::Generic`] ("No fields provided") if `with_fields` was never
    ///   called or was given an empty list.
    /// * Any error from merging the schemas, serializing the new schema, or committing.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            // An empty list would only produce a metadata commit with an unchanged schema,
            // so treat it the same as not supplying any fields at all.
            let fields = match self.fields {
                Some(fields) if !fields.is_empty() => fields,
                _ => return Err(DeltaTableError::Generic("No fields provided".to_string())),
            };

            let mut metadata = self.snapshot.metadata().clone();

            let fields_right = StructType::new(fields.clone());
            let table_schema = self.snapshot.schema();
            let new_table_schema = merge_delta_struct(table_schema, &fields_right)?;

            // TODO(ion): Think of a way how we can simplify this checking through the API or centralize some checks.
            let maybe_new_protocol = if PROTOCOL.contains_timestampntz(fields.iter()) {
                let current = self.snapshot.protocol();
                let updated = current.clone().enable_timestamp_ntz();
                if current.min_reader_version == 3 && current.min_writer_version == 7 {
                    Some(updated)
                } else {
                    // Enabling timestamp_ntz advances the protocol to (3, 7), so the existing
                    // table properties are converted into features.
                    Some(updated.move_table_properties_into_features(&metadata.configuration))
                }
            } else {
                None
            };

            let operation = DeltaOperation::AddColumn {
                fields: fields.into_iter().collect_vec(),
            };

            metadata.schema_string = serde_json::to_string(&new_table_schema)?;

            let mut actions = vec![metadata.into()];
            if let Some(new_protocol) = maybe_new_protocol {
                actions.push(new_protocol.into());
            }

            let commit = CommitBuilder::from(self.commit_properties)
                .with_actions(actions)
                .build(Some(&self.snapshot), self.log_store.clone(), operation)
                .await?;

            Ok(DeltaTable::new_with_state(self.log_store, commit.snapshot()))
        })
    }
}