Skip to content

Commit 1b4c0a4

Browse files
authored
fix: repartitioned reads of CSV with custom line terminator (#13677)
1 parent 8404cd0 commit 1b4c0a4

File tree

4 files changed

+38
-15
lines changed

4 files changed

+38
-15
lines changed

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,11 +612,13 @@ impl FileOpener for CsvOpener {
612612
}
613613

614614
let store = Arc::clone(&self.config.object_store);
615+
let terminator = self.config.terminator;
615616

616617
Ok(Box::pin(async move {
617618
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
618619

619-
let calculated_range = calculate_range(&file_meta, &store).await?;
620+
let calculated_range =
621+
calculate_range(&file_meta, &store, terminator).await?;
620622

621623
let range = match calculated_range {
622624
RangeCalculation::Range(None) => None,

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ impl FileOpener for JsonOpener {
273273
let file_compression_type = self.file_compression_type.to_owned();
274274

275275
Ok(Box::pin(async move {
276-
let calculated_range = calculate_range(&file_meta, &store).await?;
276+
let calculated_range = calculate_range(&file_meta, &store, None).await?;
277277

278278
let range = match calculated_range {
279279
RangeCalculation::Range(None) => None,

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,23 +426,25 @@ enum RangeCalculation {
426426
async fn calculate_range(
427427
file_meta: &FileMeta,
428428
store: &Arc<dyn ObjectStore>,
429+
terminator: Option<u8>,
429430
) -> Result<RangeCalculation> {
430431
let location = file_meta.location();
431432
let file_size = file_meta.object_meta.size;
433+
let newline = terminator.unwrap_or(b'\n');
432434

433435
match file_meta.range {
434436
None => Ok(RangeCalculation::Range(None)),
435437
Some(FileRange { start, end }) => {
436438
let (start, end) = (start as usize, end as usize);
437439

438440
let start_delta = if start != 0 {
439-
find_first_newline(store, location, start - 1, file_size).await?
441+
find_first_newline(store, location, start - 1, file_size, newline).await?
440442
} else {
441443
0
442444
};
443445

444446
let end_delta = if end != file_size {
445-
find_first_newline(store, location, end - 1, file_size).await?
447+
find_first_newline(store, location, end - 1, file_size, newline).await?
446448
} else {
447449
0
448450
};
@@ -462,7 +464,7 @@ async fn calculate_range(
462464
/// within an object, such as a file, in an object store.
463465
///
464466
/// This function scans the contents of the object starting from the specified `start` position
465-
/// up to the `end` position, looking for the first occurrence of a newline (`'\n'`) character.
467+
/// up to the `end` position, looking for the first occurrence of a newline character.
466468
/// It returns the position of the first newline relative to the start of the range.
467469
///
468470
/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
@@ -474,6 +476,7 @@ async fn find_first_newline(
474476
location: &Path,
475477
start: usize,
476478
end: usize,
479+
newline: u8,
477480
) -> Result<usize> {
478481
let options = GetOptions {
479482
range: Some(GetRange::Bounded(start..end)),
@@ -486,7 +489,7 @@ async fn find_first_newline(
486489
let mut index = 0;
487490

488491
while let Some(chunk) = result_stream.next().await.transpose()? {
489-
if let Some(position) = chunk.iter().position(|&byte| byte == b'\n') {
492+
if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
490493
return Ok(index + position);
491494
}
492495

datafusion/sqllogictest/test_files/csv_files.slt

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -350,15 +350,33 @@ col2 TEXT
350350
LOCATION '../core/tests/data/cr_terminator.csv'
351351
OPTIONS ('format.terminator' E'\r', 'format.has_header' 'true');
352352

353-
# TODO: It should be passed but got the error: External error: query failed: DataFusion error: Object Store error: Generic LocalFileSystem error: Requested range was invalid
354-
# See the issue: https://github.com/apache/datafusion/issues/12328
355-
# query TT
356-
# select * from stored_table_with_cr_terminator;
357-
# ----
358-
# id0 value0
359-
# id1 value1
360-
# id2 value2
361-
# id3 value3
353+
# Check single-thread reading of CSV with custom line terminator
354+
statement ok
355+
SET datafusion.optimizer.repartition_file_min_size = 10485760;
356+
357+
query TT
358+
select * from stored_table_with_cr_terminator;
359+
----
360+
id0 value0
361+
id1 value1
362+
id2 value2
363+
id3 value3
364+
365+
# Check repartitioned reading of CSV with custom line terminator
366+
statement ok
367+
SET datafusion.optimizer.repartition_file_min_size = 1;
368+
369+
query TT
370+
select * from stored_table_with_cr_terminator order by col1;
371+
----
372+
id0 value0
373+
id1 value1
374+
id2 value2
375+
id3 value3
376+
377+
# Reset repartition_file_min_size to default value
378+
statement ok
379+
SET datafusion.optimizer.repartition_file_min_size = 10485760;
362380

363381
statement ok
364382
drop table stored_table_with_cr_terminator;

0 commit comments

Comments
 (0)