1
- use std:: path:: Path ;
1
+ use std:: fs:: File ;
2
+ use std:: path:: { Path , PathBuf } ;
2
3
use std:: sync:: { Arc , LazyLock } ;
4
+ use std:: time:: Duration ;
3
5
4
6
use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
7
+ use clap:: ValueEnum ;
5
8
use datafusion:: datasource:: file_format:: parquet:: ParquetFormat ;
6
9
use datafusion:: datasource:: listing:: {
7
10
ListingOptions , ListingTable , ListingTableConfig , ListingTableUrl ,
8
11
} ;
9
12
use datafusion:: prelude:: { ParquetReadOptions , SessionContext } ;
10
13
use futures:: { StreamExt , TryStreamExt , stream} ;
14
+ use rayon:: iter:: { IntoParallelIterator , ParallelIterator } ;
15
+ use reqwest:: IntoUrl ;
16
+ use reqwest:: blocking:: Response ;
11
17
use tokio:: fs:: { OpenOptions , create_dir_all} ;
12
- use tracing:: info;
18
+ use tracing:: { info, warn } ;
13
19
use vortex:: TryIntoArray ;
14
20
use vortex:: dtype:: DType ;
15
21
use vortex:: dtype:: arrow:: FromArrowType ;
@@ -19,7 +25,7 @@ use vortex::layout::{LayoutRegistry, LayoutRegistryExt};
19
25
use vortex:: stream:: ArrayStreamAdapter ;
20
26
use vortex_datafusion:: persistent:: VortexFormat ;
21
27
22
- use crate :: idempotent_async;
28
+ use crate :: { idempotent , idempotent_async} ;
23
29
24
30
pub static HITS_SCHEMA : LazyLock < Schema > = LazyLock :: new ( || {
25
31
use DataType :: * ;
@@ -147,20 +153,36 @@ pub async fn register_vortex_files(
147
153
schema : & Schema ,
148
154
) -> anyhow:: Result < ( ) > {
149
155
let vortex_dir = input_path. join ( "vortex" ) ;
156
+ let parquet_path = input_path. join ( "parquet" ) ;
150
157
create_dir_all ( & vortex_dir) . await ?;
151
158
152
- stream:: iter ( 0 ..100 )
153
- . map ( |idx| {
154
- let parquet_file_path = input_path
155
- . join ( "parquet" )
156
- . join ( format ! ( "hits_{idx}.parquet" ) ) ;
157
- let output_path = vortex_dir. join ( format ! ( "hits_{idx}.{VORTEX_FILE_EXTENSION}" ) ) ;
159
+ let parquet_inputs =
160
+ std:: fs:: read_dir ( parquet_path. clone ( ) ) ?. collect :: < std:: io:: Result < Vec < _ > > > ( ) ?;
161
+ info ! (
162
+ "Found {} parquet files in {}" ,
163
+ parquet_inputs. len( ) ,
164
+ parquet_path. to_str( ) . unwrap( )
165
+ ) ;
166
+
167
+ let iter = parquet_inputs
168
+ . iter ( )
169
+ . filter ( |entry| entry. path ( ) . extension ( ) . is_some_and ( |e| e == "parquet" ) ) ;
170
+
171
+ stream:: iter ( iter)
172
+ . map ( |dir_entry| {
173
+ let filename = {
174
+ let mut temp = dir_entry. path ( ) ;
175
+ temp. set_extension ( "" ) ;
176
+ temp. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) . to_string ( )
177
+ } ;
178
+ let parquet_file_path = parquet_path. join ( format ! ( "{filename}.parquet" ) ) ;
179
+ let output_path = vortex_dir. join ( format ! ( "{filename}.{VORTEX_FILE_EXTENSION}" ) ) ;
158
180
let session = session. clone ( ) ;
159
181
160
182
tokio:: spawn ( async move {
161
183
let output_path = output_path. clone ( ) ;
162
184
idempotent_async ( & output_path, move |vtx_file| async move {
163
- info ! ( "Processing file {idx} " ) ;
185
+ info ! ( "Processing file '{filename}' " ) ;
164
186
let record_batches = session
165
187
. read_parquet (
166
188
parquet_file_path. to_str ( ) . unwrap ( ) ,
@@ -259,3 +281,88 @@ pub fn clickbench_queries() -> Vec<(usize, String)> {
259
281
. enumerate ( )
260
282
. collect ( )
261
283
}
284
+
285
+ #[ derive( ValueEnum , Default , Clone , Copy , Debug , Hash , PartialEq , Eq ) ]
286
+ pub enum Flavor {
287
+ #[ default]
288
+ Partitioned ,
289
+ Single ,
290
+ }
291
+
292
+ impl std:: fmt:: Display for Flavor {
293
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
294
+ write ! ( f, "{}" , self . to_possible_value( ) . unwrap( ) . get_name( ) )
295
+ }
296
+ }
297
+
298
+ impl Flavor {
299
+ pub fn download (
300
+ & self ,
301
+ client : & reqwest:: blocking:: Client ,
302
+ basepath : impl AsRef < Path > ,
303
+ ) -> anyhow:: Result < ( ) > {
304
+ let basepath = basepath. as_ref ( ) ;
305
+ match self {
306
+ Flavor :: Single => {
307
+ let output_path = basepath. join ( "parquet" ) . join ( "hits.parquet" ) ;
308
+ idempotent ( & output_path, |output_path| {
309
+ info ! ( "Downloading single clickbench file" ) ;
310
+ let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet" ;
311
+ let mut response = retry_get ( client, url) ?;
312
+ let mut file = File :: create ( output_path) ?;
313
+ response. copy_to ( & mut file) ?;
314
+
315
+ anyhow:: Ok ( ( ) )
316
+ } ) ?;
317
+
318
+ Ok ( ( ) )
319
+ }
320
+ Flavor :: Partitioned => {
321
+ // The clickbench-provided file is missing some higher-level type info, so we reprocess it
322
+ // to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
323
+ ( 0_u32 ..100 ) . into_par_iter ( ) . for_each ( |idx| {
324
+ let output_path = basepath. join ( "parquet" ) . join ( format ! ( "hits_{idx}.parquet" ) ) ;
325
+ idempotent ( & output_path, |output_path| {
326
+ info ! ( "Downloading file {idx}" ) ;
327
+ let url = format ! ( "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet" ) ;
328
+ let mut response = retry_get ( client, url) ?;
329
+ let mut file = File :: create ( output_path) ?;
330
+ response. copy_to ( & mut file) ?;
331
+
332
+ anyhow:: Ok ( PathBuf :: from ( output_path) )
333
+ } )
334
+ . unwrap ( ) ;
335
+ } ) ;
336
+
337
+ Ok ( ( ) )
338
+ }
339
+ }
340
+ }
341
+ }
342
+
343
+ fn retry_get ( client : & reqwest:: blocking:: Client , url : impl IntoUrl ) -> anyhow:: Result < Response > {
344
+ let url = url. as_str ( ) ;
345
+ let make_req = || client. get ( url) . send ( ) ;
346
+
347
+ let mut output = None ;
348
+
349
+ for attempt in 1 ..4 {
350
+ match make_req ( ) . and_then ( |r| r. error_for_status ( ) ) {
351
+ Ok ( r) => {
352
+ output = Some ( r) ;
353
+ break ;
354
+ }
355
+ Err ( e) => {
356
+ warn ! ( "Request errored with {e}, retying for the {attempt} time" ) ;
357
+ }
358
+ }
359
+
360
+ // Very basic backoff mechanism
361
+ std:: thread:: sleep ( Duration :: from_secs ( attempt) ) ;
362
+ }
363
+
364
+ match output {
365
+ Some ( v) => Ok ( v) ,
366
+ None => anyhow:: bail!( "Exahusted retry attempts for {url}" ) ,
367
+ }
368
+ }
0 commit comments