1
1
//! Command for creating a new delta table
2
2
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
3
3
4
- use std:: collections:: { HashMap , HashSet } ;
4
+ use std:: collections:: HashMap ;
5
5
use std:: sync:: Arc ;
6
6
7
7
use futures:: future:: BoxFuture ;
8
+ use maplit:: hashset;
8
9
use serde_json:: Value ;
9
10
10
11
use super :: transaction:: { CommitBuilder , TableReference , PROTOCOL } ;
@@ -13,6 +14,9 @@ use crate::kernel::{
13
14
Action , DataType , Metadata , Protocol , ReaderFeatures , StructField , StructType , WriterFeatures ,
14
15
} ;
15
16
use crate :: logstore:: { LogStore , LogStoreRef } ;
17
+ use crate :: operations:: set_tbl_properties:: {
18
+ apply_properties_to_protocol, convert_properties_to_features,
19
+ } ;
16
20
use crate :: protocol:: { DeltaOperation , SaveMode } ;
17
21
use crate :: table:: builder:: ensure_table_uri;
18
22
use crate :: table:: config:: DeltaConfigKey ;
@@ -237,41 +241,28 @@ impl CreateBuilder {
237
241
)
238
242
} ;
239
243
244
+ let configuration = self . configuration ;
240
245
let contains_timestampntz = PROTOCOL . contains_timestampntz ( & self . columns ) ;
246
+
241
247
// TODO configure more permissive versions based on configuration. Also how should this ideally be handled?
242
248
// We set the lowest protocol we can, and if subsequent writes use newer features we update metadata?
243
249
244
- let ( min_reader_version, min_writer_version, writer_features, reader_features) =
245
- if contains_timestampntz {
246
- let mut converted_writer_features = self
247
- . configuration
248
- . keys ( )
249
- . map ( |key| key. clone ( ) . into ( ) )
250
- . filter ( |v| !matches ! ( v, WriterFeatures :: Other ( _) ) )
251
- . collect :: < HashSet < WriterFeatures > > ( ) ;
252
-
253
- let mut converted_reader_features = self
254
- . configuration
255
- . keys ( )
256
- . map ( |key| key. clone ( ) . into ( ) )
257
- . filter ( |v| !matches ! ( v, ReaderFeatures :: Other ( _) ) )
258
- . collect :: < HashSet < ReaderFeatures > > ( ) ;
259
- converted_writer_features. insert ( WriterFeatures :: TimestampWithoutTimezone ) ;
260
- converted_reader_features. insert ( ReaderFeatures :: TimestampWithoutTimezone ) ;
261
- (
262
- 3 ,
263
- 7 ,
264
- Some ( converted_writer_features) ,
265
- Some ( converted_reader_features) ,
266
- )
267
- } else {
268
- (
269
- PROTOCOL . default_reader_version ( ) ,
270
- PROTOCOL . default_writer_version ( ) ,
271
- None ,
272
- None ,
273
- )
274
- } ;
250
+ let current_protocol = if contains_timestampntz {
251
+ Protocol {
252
+ min_reader_version : 3 ,
253
+ min_writer_version : 7 ,
254
+ writer_features : Some ( hashset ! { WriterFeatures :: TimestampWithoutTimezone } ) ,
255
+ reader_features : Some ( hashset ! { ReaderFeatures :: TimestampWithoutTimezone } ) ,
256
+ }
257
+ } else {
258
+ Protocol {
259
+ min_reader_version : PROTOCOL . default_reader_version ( ) ,
260
+ min_writer_version : PROTOCOL . default_writer_version ( ) ,
261
+ reader_features : None ,
262
+ writer_features : None ,
263
+ }
264
+ } ;
265
+
275
266
let protocol = self
276
267
. actions
277
268
. iter ( )
@@ -280,17 +271,23 @@ impl CreateBuilder {
280
271
Action :: Protocol ( p) => p. clone ( ) ,
281
272
_ => unreachable ! ( ) ,
282
273
} )
283
- . unwrap_or_else ( || Protocol {
284
- min_reader_version,
285
- min_writer_version,
286
- writer_features,
287
- reader_features,
288
- } ) ;
274
+ . unwrap_or_else ( || current_protocol) ;
275
+
276
+ let protocol = apply_properties_to_protocol (
277
+ & protocol,
278
+ & configuration
279
+ . iter ( )
280
+ . map ( |( k, v) | ( k. clone ( ) , v. clone ( ) . unwrap ( ) ) )
281
+ . collect :: < HashMap < String , String > > ( ) ,
282
+ true ,
283
+ ) ?;
284
+
285
+ let protocol = convert_properties_to_features ( protocol, & configuration) ;
289
286
290
287
let mut metadata = Metadata :: try_new (
291
288
StructType :: new ( self . columns ) ,
292
289
self . partition_columns . unwrap_or_default ( ) ,
293
- self . configuration ,
290
+ configuration,
294
291
) ?
295
292
. with_created_time ( chrono:: Utc :: now ( ) . timestamp_millis ( ) ) ;
296
293
if let Some ( name) = self . name {
0 commit comments