@@ -25,11 +25,11 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     error::DataFusionError,
-    logical_expr::{col, SortExpr},
+    logical_expr::col,
 };
-use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
+use futures_util::{stream::FuturesUnordered, Future, TryStreamExt};
 use itertools::Itertools;
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{path::Path, ObjectMeta, ObjectStore};
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
@@ -60,25 +60,25 @@ impl ListingTableBuilder {
         client: Arc<dyn ObjectStore>,
         time_filters: &[PartialTimeFilter],
     ) -> Result<Self, DataFusionError> {
+        // Extract the minimum start time from the time filters.
         let start_time = time_filters
             .iter()
-            .filter_map(|x| match x {
-                PartialTimeFilter::Low(Bound::Excluded(x)) => Some(x),
-                PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
+            .filter_map(|filter| match filter {
+                PartialTimeFilter::Low(Bound::Excluded(x))
+                | PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
                 _ => None,
             })
-            .min()
-            .cloned();
+            .min();
 
+        // Extract the maximum end time from the time filters.
         let end_time = time_filters
             .iter()
-            .filter_map(|x| match x {
-                PartialTimeFilter::High(Bound::Excluded(x)) => Some(x),
-                PartialTimeFilter::High(Bound::Included(x)) => Some(x),
+            .filter_map(|filter| match filter {
+                PartialTimeFilter::High(Bound::Excluded(x))
+                | PartialTimeFilter::High(Bound::Included(x)) => Some(x),
                 _ => None,
             })
-            .max()
-            .cloned();
+            .max();
 
         let Some((start_time, end_time)) = start_time.zip(end_time) else {
             return Err(DataFusionError::NotImplemented(
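
A minimal sketch of the merged match-arm pattern introduced above, using a hypothetical Filter enum over i64 timestamps in place of PartialTimeFilter and NaiveDateTime; folding the Included/Excluded arms into one `|` arm and comparing references is what lets the trailing `.cloned()` be dropped.

use std::ops::Bound;

// Hypothetical stand-in for PartialTimeFilter.
enum Filter {
    Low(Bound<i64>),
    High(Bound<i64>),
}

// Both bound variants yield the same inner value, so a single `|` arm covers them;
// `.min()` over references defers any cloning until the value is actually needed.
fn min_low(filters: &[Filter]) -> Option<&i64> {
    filters
        .iter()
        .filter_map(|filter| match filter {
            Filter::Low(Bound::Excluded(x)) | Filter::Low(Bound::Included(x)) => Some(x),
            _ => None,
        })
        .min()
}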
@@ -87,62 +87,49 @@ impl ListingTableBuilder {
             ));
         };
 
+        // Generate prefixes for the given time range
         let prefixes = TimePeriod::new(
             start_time.and_utc(),
             end_time.and_utc(),
             OBJECT_STORE_DATA_GRANULARITY,
         )
         .generate_prefixes();
 
-        let prefixes = prefixes
-            .into_iter()
-            .map(|entry| {
-                let path =
-                    relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, entry));
-                storage.absolute_url(path.as_relative_path()).to_string()
-            })
-            .collect_vec();
-
-        let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
+        // Categorizes prefixes into "minute" and general resolve lists.
+        let mut minute_resolve = HashMap::<String, Vec<String>>::new();
         let mut all_resolve = Vec::new();
-
         for prefix in prefixes {
-            let components = prefix.split_terminator('/');
-            if components.last().is_some_and(|x| x.starts_with("minute")) {
-                let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
+            let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix));
+            storage.absolute_url(path.as_relative_path()).to_string();
+            if let Some(pos) = prefix.rfind("minute") {
+                let hour_prefix = &prefix[..pos];
                 minute_resolve
                     .entry(hour_prefix.to_owned())
-                    .and_modify(|list| list.push(prefix))
-                    .or_default();
+                    .or_default()
+                    .push(prefix);
             } else {
-                all_resolve.push(prefix)
+                all_resolve.push(prefix);
             }
         }
 
-        type ResolveFuture = Pin<
-            Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send + 'static>,
-        >;
-        // Pin<Box<dyn Future<Output = Result<BoxStream<'_, Result<ObjectMeta>>>> + Send + 'async_trait>>
-        // BoxStream<'_, Result<ObjectMeta>>
+        /// Resolve all prefixes asynchronously and collect the object metadata.
+        type ResolveFuture =
+            Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send>>;
         let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
-
-        for (listing_prefix, prefix) in minute_resolve {
+        for (listing_prefix, prefixes) in minute_resolve {
             let client = Arc::clone(&client);
             tasks.push(Box::pin(async move {
-                let mut list = client
-                    .list(Some(&object_store::path::Path::from(listing_prefix)))
-                    .try_collect::<Vec<_>>()
-                    .await?;
+                let path = Path::from(listing_prefix);
+                let mut objects = client.list(Some(&path)).try_collect::<Vec<_>>().await?;
 
-                list.retain(|object| {
-                    prefix.iter().any(|prefix| {
-                        object
-                            .location
+                objects.retain(|obj| {
+                    prefixes.iter().any(|prefix| {
+                        obj.location
                             .prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
                     })
                 });
 
-                Ok(list)
+                Ok(objects)
             }));
         }
 
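
For reference, a self-contained sketch of the boxed-future pattern behind the ResolveFuture alias above; it substitutes a dummy async body and std::io::Error for the ObjectStore listing call and object_store::Error, so the names here are illustrative assumptions only.

use std::pin::Pin;

use futures_util::{stream::FuturesUnordered, Future, TryStreamExt};

// Same shape as the ResolveFuture alias in the diff, minus the object_store types.
type ResolveFuture = Pin<Box<dyn Future<Output = Result<Vec<String>, std::io::Error>> + Send>>;

async fn resolve_all(prefixes: Vec<String>) -> Result<Vec<Vec<String>>, std::io::Error> {
    let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
    for prefix in prefixes {
        // The real code lists `prefix` against the object store client here.
        tasks.push(Box::pin(async move { Ok(vec![prefix]) }));
    }
    // Polls all futures concurrently and fails fast on the first error.
    tasks.try_collect().await
}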
@@ -157,25 +144,23 @@ impl ListingTableBuilder {
             }));
         }
 
-        let res: Vec<Vec<String>> = tasks
-            .and_then(|res| {
-                future::ok(
-                    res.into_iter()
-                        .map(|res| res.location.to_string())
-                        .collect_vec(),
-                )
-            })
-            .try_collect()
+        let listing = tasks
+            .try_collect::<Vec<Vec<ObjectMeta>>>()
             .await
-            .map_err(|err| DataFusionError::External(Box::new(err)))?;
-
-        let mut res = res.into_iter().flatten().collect_vec();
-        res.sort();
-        res.reverse();
+            .map_err(|err| DataFusionError::External(Box::new(err)))?
+            .into_iter()
+            .flat_map(|res| {
+                res.into_iter()
+                    .map(|obj| obj.location.to_string())
+                    .collect::<Vec<String>>()
+            })
+            .sorted()
+            .rev()
+            .collect_vec();
 
         Ok(Self {
             stream: self.stream,
-            listing: res,
+            listing,
         })
     }
 
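
The flattening-and-ordering chain above can be exercised in isolation; this sketch substitutes plain strings for the ObjectMeta locations, which is an assumption for illustration only.

use itertools::Itertools;

// Flatten the per-prefix listings, then reverse-sort, mirroring the
// `.flat_map(..).sorted().rev()` chain in the diff.
fn newest_first(listings: Vec<Vec<String>>) -> Vec<String> {
    listings
        .into_iter()
        .flatten()
        .sorted()
        .rev()
        .collect_vec()
}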
@@ -188,25 +173,21 @@ impl ListingTableBuilder {
         if self.listing.is_empty() {
             return Ok(None);
         }
-        let file_sort_order: Vec<Vec<SortExpr>>;
-        let file_format = ParquetFormat::default().with_enable_pruning(true);
-        if let Some(time_partition) = time_partition {
-            file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
-        } else {
-            file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
-        }
 
+        let file_sort_order = vec![vec![time_partition
+            .map_or_else(|| col(DEFAULT_TIMESTAMP_KEY), col)
+            .sort(true, false)]];
+        let file_format = ParquetFormat::default().with_enable_pruning(true);
         let listing_options = ListingOptions::new(Arc::new(file_format))
             .with_file_extension(".parquet")
             .with_file_sort_order(file_sort_order)
             .with_collect_stat(true)
             .with_target_partitions(1);
-
         let config = ListingTableConfig::new_with_multi_paths(map(self.listing))
             .with_listing_options(listing_options)
             .with_schema(schema);
+        let listing_table = ListingTable::try_new(config)?;
 
-        let listing_table = Arc::new(ListingTable::try_new(config)?);
-        Ok(Some(listing_table))
+        Ok(Some(Arc::new(listing_table)))
     }
 }