@@ -22,6 +22,7 @@ use std::collections::{HashMap, HashSet};
22
22
use std:: future:: Future ;
23
23
use std:: mem:: discriminant;
24
24
use std:: ops:: RangeFrom ;
25
+ use std:: sync:: Arc ;
25
26
26
27
use arrow_array:: StringArray ;
27
28
use futures:: TryStreamExt ;
@@ -45,7 +46,7 @@ const META_ROOT_PATH: &str = "metadata";
45
46
/// Table transaction.
///
/// Holds a borrowed `base_table` for the lifetime `'a`, an owned
/// `current_table`, and the `updates` / `requirements` lists.
46
47
pub struct Transaction < ' a > {
47
48
// The table reference passed to `new`.
base_table : & ' a Table ,
48
// The owned `TableMetadata` field is replaced by an owned `Table`; metadata
// is read through `current_table . metadata ( )` from here on.
- current_metadata : TableMetadata ,
49
+ current_table : Table ,
49
50
// Both lists are initialised empty in `new`.
updates : Vec < TableUpdate > ,
50
51
requirements : Vec < TableRequirement > ,
51
52
}
@@ -55,19 +56,20 @@ impl<'a> Transaction<'a> {
55
56
// Builds a transaction over `table`: the borrowed reference is kept as
// `base_table`, a clone of the table becomes `current_table`, and the
// `updates` and `requirements` lists start empty.
pub fn new ( table : & ' a Table ) -> Self {
56
57
Self {
57
58
base_table : table,
58
// Previously only the metadata was cloned; now the whole `Table` is cloned.
- current_metadata : table. metadata ( ) . clone ( ) ,
59
+ current_table : table. clone ( ) ,
59
60
updates : vec ! [ ] ,
60
61
requirements : vec ! [ ] ,
61
62
}
62
63
}
63
64
64
65
/// Applies `updates`, in order, to a builder created from a clone of the
/// current table metadata, then stores the built metadata on the transaction.
///
/// # Errors
///
/// Returns early (via `?`) if applying any update or building the metadata fails.
fn update_table_metadata ( & mut self , updates : & [ TableUpdate ] ) -> Result < ( ) > {
65
- let mut metadata_builder = self . current_metadata . clone ( ) . into_builder ( None ) ;
66
+ let mut metadata_builder = self . current_table . metadata ( ) . clone ( ) . into_builder ( None ) ;
66
67
for update in updates {
67
68
// Each update is cloned before `apply`; presumably `apply` takes the update by value — confirm.
metadata_builder = update. clone ( ) . apply ( metadata_builder) ?;
68
69
}
69
70
70
- self . current_metadata = metadata_builder. build ( ) ?. metadata ;
71
// NOTE(review): the value returned by `with_metadata` is discarded here. This only
// updates `current_table` if `with_metadata` mutates through `&mut self`; if it
// consumes `self` and returns a new `Table`, the result must be assigned back — confirm.
+ self . current_table
72
+ . with_metadata ( Arc :: new ( metadata_builder. build ( ) ?. metadata ) ) ;
71
73
72
74
Ok ( ( ) )
73
75
}
@@ -78,7 +80,7 @@ impl<'a> Transaction<'a> {
78
80
requirements : Vec < TableRequirement > ,
79
81
) -> Result < ( ) > {
80
82
for requirement in & requirements {
81
- requirement. check ( Some ( & self . current_metadata ) ) ?;
83
+ requirement. check ( Some ( self . current_table . metadata ( ) ) ) ?;
82
84
}
83
85
84
86
self . update_table_metadata ( & updates) ?;
@@ -106,7 +108,7 @@ impl<'a> Transaction<'a> {
106
108
107
109
/// Upgrades the table to the given format version; a version lower than the current one is rejected with an error.
108
110
pub fn upgrade_table_version ( mut self , format_version : FormatVersion ) -> Result < Self > {
109
- let current_version = self . current_metadata . format_version ( ) ;
111
+ let current_version = self . current_table . metadata ( ) . format_version ( ) ;
110
112
match current_version. cmp ( & format_version) {
111
113
Ordering :: Greater => {
112
114
return Err ( Error :: new (
@@ -145,7 +147,8 @@ impl<'a> Transaction<'a> {
145
147
} ;
146
148
let mut snapshot_id = generate_random_id ( ) ;
147
149
while self
148
- . current_metadata
150
+ . current_table
151
+ . metadata ( )
149
152
. snapshots ( )
150
153
. any ( |s| s. snapshot_id ( ) == snapshot_id)
151
154
{
@@ -239,7 +242,8 @@ impl<'a> FastAppendAction<'a> {
239
242
if !self
240
243
. snapshot_produce_action
241
244
. tx
242
- . current_metadata
245
+ . current_table
246
+ . metadata ( )
243
247
. default_spec
244
248
. is_unpartitioned ( )
245
249
{
@@ -250,9 +254,9 @@ impl<'a> FastAppendAction<'a> {
250
254
}
251
255
252
256
let data_files = ParquetWriter :: parquet_files_to_data_files (
253
- self . snapshot_produce_action . tx . base_table . file_io ( ) ,
257
+ self . snapshot_produce_action . tx . current_table . file_io ( ) ,
254
258
file_path,
255
- & self . snapshot_produce_action . tx . current_metadata ,
259
+ self . snapshot_produce_action . tx . current_table . metadata ( ) ,
256
260
)
257
261
. await ?;
258
262
@@ -274,7 +278,7 @@ impl<'a> FastAppendAction<'a> {
274
278
let mut manifest_stream = self
275
279
. snapshot_produce_action
276
280
. tx
277
- . base_table
281
+ . current_table
278
282
. inspect ( )
279
283
. manifests ( )
280
284
. scan ( )
@@ -335,14 +339,19 @@ impl SnapshotProduceOperation for FastAppendOperation {
335
339
& self ,
336
340
snapshot_produce : & SnapshotProduceAction < ' _ > ,
337
341
) -> Result < Vec < ManifestFile > > {
338
- let Some ( snapshot) = snapshot_produce. tx . current_metadata . current_snapshot ( ) else {
342
+ let Some ( snapshot) = snapshot_produce
343
+ . tx
344
+ . current_table
345
+ . metadata ( )
346
+ . current_snapshot ( )
347
+ else {
339
348
return Ok ( vec ! [ ] ) ;
340
349
} ;
341
350
342
351
let manifest_list = snapshot
343
352
. load_manifest_list (
344
- snapshot_produce. tx . base_table . file_io ( ) ,
345
- & snapshot_produce. tx . current_metadata ,
353
+ snapshot_produce. tx . current_table . file_io ( ) ,
354
+ snapshot_produce. tx . current_table . metadata ( ) ,
346
355
)
347
356
. await ?;
348
357
@@ -456,7 +465,7 @@ impl<'a> SnapshotProduceAction<'a> {
456
465
for data_file in data_files {
457
466
Self :: validate_partition_value (
458
467
data_file. partition ( ) ,
459
- self . tx . current_metadata . default_partition_type ( ) ,
468
+ self . tx . current_table . metadata ( ) . default_partition_type ( ) ,
460
469
) ?;
461
470
if data_file. content_type ( ) == DataContentType :: Data {
462
471
self . added_data_files . push ( data_file) ;
@@ -470,13 +479,16 @@ impl<'a> SnapshotProduceAction<'a> {
470
479
// Builds the path for the next manifest file and opens an `OutputFile` for it
// through the table's `file_io ( )`.
//
// The path has the shape `{location}/{META_ROOT_PATH}/{commit_uuid}-m{counter}.{format}`,
// where `location` comes from the table metadata and `format` is how
// `DataFileFormat :: Avro` renders in `format !` (presumably `avro` — confirm).
// Each call advances `manifest_counter` by one.
fn new_manifest_output ( & mut self ) -> Result < OutputFile > {
471
480
let new_manifest_path = format ! (
472
481
"{}/{}/{}-m{}.{}" ,
473
- self . tx. current_metadata . location( ) ,
482
+ self . tx. current_table . metadata ( ) . location( ) ,
474
483
META_ROOT_PATH ,
475
484
self . commit_uuid,
476
485
// `unwrap` panics if the counter yields `None`; presumably it is an unbounded range — confirm.
self . manifest_counter. next( ) . unwrap( ) ,
477
486
DataFileFormat :: Avro
478
487
) ;
479
// The `FileIO` is now taken from `current_table` instead of `base_table`.
- self . tx . base_table . file_io ( ) . new_output ( new_manifest_path)
488
+ self . tx
489
+ . current_table
490
+ . file_io ( )
491
+ . new_output ( new_manifest_path)
480
492
}
481
493
482
494
// Write manifest file for added data files and return the ManifestFile for ManifestList.
@@ -485,7 +497,7 @@ impl<'a> SnapshotProduceAction<'a> {
485
497
added_data_files : Vec < DataFile > ,
486
498
) -> Result < ManifestFile > {
487
499
let snapshot_id = self . snapshot_id ;
488
- let format_version = self . tx . current_metadata . format_version ( ) ;
500
+ let format_version = self . tx . current_table . metadata ( ) . format_version ( ) ;
489
501
let content_type = {
490
502
let mut data_num = 0 ;
491
503
let mut delete_num = 0 ;
@@ -524,14 +536,15 @@ impl<'a> SnapshotProduceAction<'a> {
524
536
self . new_manifest_output ( ) ?,
525
537
Some ( self . snapshot_id ) ,
526
538
self . key_metadata . clone ( ) ,
527
- self . tx . current_metadata . current_schema ( ) . clone ( ) ,
539
+ self . tx . current_table . metadata ( ) . current_schema ( ) . clone ( ) ,
528
540
self . tx
529
- . current_metadata
541
+ . current_table
542
+ . metadata ( )
530
543
. default_partition_spec ( )
531
544
. as_ref ( )
532
545
. clone ( ) ,
533
546
) ;
534
- if self . tx . current_metadata . format_version ( ) == FormatVersion :: V1 {
547
+ if self . tx . current_table . metadata ( ) . format_version ( ) == FormatVersion :: V1 {
535
548
builder. build_v1 ( )
536
549
} else {
537
550
match content_type {
@@ -581,7 +594,7 @@ impl<'a> SnapshotProduceAction<'a> {
581
594
fn generate_manifest_list_file_path ( & self , attempt : i64 ) -> String {
582
595
format ! (
583
596
"{}/{}/snap-{}-{}-{}.{}" ,
584
- self . tx. current_metadata . location( ) ,
597
+ self . tx. current_table . metadata ( ) . location( ) ,
585
598
META_ROOT_PATH ,
586
599
self . snapshot_id,
587
600
attempt,
@@ -599,28 +612,28 @@ impl<'a> SnapshotProduceAction<'a> {
599
612
let new_manifests = self
600
613
. manifest_file ( & snapshot_produce_operation, & process)
601
614
. await ?;
602
- let next_seq_num = self . tx . current_metadata . next_sequence_number ( ) ;
615
+ let next_seq_num = self . tx . current_table . metadata ( ) . next_sequence_number ( ) ;
603
616
604
617
let summary = self . summary ( & snapshot_produce_operation) ;
605
618
606
619
let manifest_list_path = self . generate_manifest_list_file_path ( 0 ) ;
607
620
608
- let mut manifest_list_writer = match self . tx . current_metadata . format_version ( ) {
621
+ let mut manifest_list_writer = match self . tx . current_table . metadata ( ) . format_version ( ) {
609
622
FormatVersion :: V1 => ManifestListWriter :: v1 (
610
623
self . tx
611
- . base_table
624
+ . current_table
612
625
. file_io ( )
613
626
. new_output ( manifest_list_path. clone ( ) ) ?,
614
627
self . snapshot_id ,
615
- self . tx . current_metadata . current_snapshot_id ( ) ,
628
+ self . tx . current_table . metadata ( ) . current_snapshot_id ( ) ,
616
629
) ,
617
630
FormatVersion :: V2 => ManifestListWriter :: v2 (
618
631
self . tx
619
- . base_table
632
+ . current_table
620
633
. file_io ( )
621
634
. new_output ( manifest_list_path. clone ( ) ) ?,
622
635
self . snapshot_id ,
623
- self . tx . current_metadata . current_snapshot_id ( ) ,
636
+ self . tx . current_table . metadata ( ) . current_snapshot_id ( ) ,
624
637
next_seq_num,
625
638
) ,
626
639
} ;
@@ -631,10 +644,10 @@ impl<'a> SnapshotProduceAction<'a> {
631
644
let new_snapshot = Snapshot :: builder ( )
632
645
. with_manifest_list ( manifest_list_path)
633
646
. with_snapshot_id ( self . snapshot_id )
634
- . with_parent_snapshot_id ( self . tx . current_metadata . current_snapshot_id ( ) )
647
+ . with_parent_snapshot_id ( self . tx . current_table . metadata ( ) . current_snapshot_id ( ) )
635
648
. with_sequence_number ( next_seq_num)
636
649
. with_summary ( summary)
637
- . with_schema_id ( self . tx . current_metadata . current_schema_id ( ) )
650
+ . with_schema_id ( self . tx . current_table . metadata ( ) . current_schema_id ( ) )
638
651
. with_timestamp_ms ( commit_ts)
639
652
. build ( ) ;
640
653
@@ -653,11 +666,11 @@ impl<'a> SnapshotProduceAction<'a> {
653
666
] ,
654
667
vec ! [
655
668
TableRequirement :: UuidMatch {
656
- uuid: self . tx. current_metadata . uuid( ) ,
669
+ uuid: self . tx. current_table . metadata ( ) . uuid( ) ,
657
670
} ,
658
671
TableRequirement :: RefSnapshotIdMatch {
659
672
r#ref: MAIN_BRANCH . to_string( ) ,
660
- snapshot_id: self . tx. current_metadata . current_snapshot_id( ) ,
673
+ snapshot_id: self . tx. current_table . metadata ( ) . current_snapshot_id( ) ,
661
674
} ,
662
675
] ,
663
676
) ?;
@@ -697,10 +710,20 @@ impl<'a> ReplaceSortOrderAction<'a> {
697
710
698
711
let requirements = vec ! [
699
712
TableRequirement :: CurrentSchemaIdMatch {
700
- current_schema_id: self . tx. current_metadata. current_schema( ) . schema_id( ) ,
713
+ current_schema_id: self
714
+ . tx
715
+ . current_table
716
+ . metadata( )
717
+ . current_schema( )
718
+ . schema_id( ) ,
701
719
} ,
702
720
TableRequirement :: DefaultSortOrderIdMatch {
703
- default_sort_order_id: self . tx. current_metadata. default_sort_order( ) . order_id,
721
+ default_sort_order_id: self
722
+ . tx
723
+ . current_table
724
+ . metadata( )
725
+ . default_sort_order( )
726
+ . order_id,
704
727
} ,
705
728
] ;
706
729
@@ -716,7 +739,8 @@ impl<'a> ReplaceSortOrderAction<'a> {
716
739
) -> Result < Self > {
717
740
let field_id = self
718
741
. tx
719
- . current_metadata
742
+ . current_table
743
+ . metadata ( )
720
744
. current_schema ( )
721
745
. field_id_by_name ( name)
722
746
. ok_or_else ( || {
0 commit comments