Skip to content

Commit

Permalink
Allow for multiple input files per table instead of a single file (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdye64 authored Oct 21, 2023
1 parent 59140f2 commit 501acff
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
10 changes: 5 additions & 5 deletions datafusion/input/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import os
import glob
from typing import Any

from datafusion.common import DataTypeMap, SqlTable
Expand All @@ -41,14 +42,12 @@ def build_table(
format = extension.lstrip(".").lower()
num_rows = 0 # Total number of rows in the file. Used for statistics
columns = []

if format == "parquet":
import pyarrow.parquet as pq

# Read the Parquet metadata
metadata = pq.read_metadata(input_file)
num_rows = metadata.num_rows

# Iterate through the schema and build the SqlTable
for col in metadata.schema:
columns.append(
Expand All @@ -57,7 +56,6 @@ def build_table(
DataTypeMap.from_parquet_type_str(col.physical_type),
)
)

elif format == "csv":
import csv

Expand All @@ -73,7 +71,6 @@ def build_table(
print(header_row)
for _ in reader:
num_rows += 1

# TODO: Need to actually consume this row into reasonable columns
raise RuntimeError(
"TODO: Currently unable to support CSV input files."
Expand All @@ -84,4 +81,7 @@ def build_table(
Only Parquet and CSV."
)

return SqlTable(table_name, columns, num_rows, input_file)
# Input could possibly be multiple files. Create a list if so
input_files = glob.glob(input_file)

return SqlTable(table_name, columns, num_rows, input_files)
2 changes: 1 addition & 1 deletion datafusion/tests/test_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ def test_location_input():
tbl = location_input.build_table(input_file, table_name)
assert "blog" == tbl.name
assert 3 == len(tbl.columns)
assert "blogs.parquet" in tbl.filepath
assert "blogs.parquet" in tbl.filepaths[0]
16 changes: 8 additions & 8 deletions src/common/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct SqlTable {
#[pyo3(get, set)]
pub statistics: SqlStatistics,
#[pyo3(get, set)]
pub filepath: Option<String>,
pub filepaths: Option<Vec<String>>,
}

#[pymethods]
Expand All @@ -66,7 +66,7 @@ impl SqlTable {
table_name: String,
columns: Vec<(String, DataTypeMap)>,
row_count: f64,
filepath: Option<String>,
filepaths: Option<Vec<String>>,
) -> Self {
Self {
name: table_name,
Expand All @@ -76,7 +76,7 @@ impl SqlTable {
indexes: Vec::new(),
constraints: Vec::new(),
statistics: SqlStatistics::new(row_count),
filepath,
filepaths,
}
}
}
Expand Down Expand Up @@ -124,20 +124,20 @@ impl SqlSchema {
pub struct SqlTableSource {
schema: SchemaRef,
statistics: Option<SqlStatistics>,
filepath: Option<String>,
filepaths: Option<Vec<String>>,
}

impl SqlTableSource {
/// Initialize a new `EmptyTable` from a schema
pub fn new(
schema: SchemaRef,
statistics: Option<SqlStatistics>,
filepath: Option<String>,
filepaths: Option<Vec<String>>,
) -> Self {
Self {
schema,
statistics,
filepath,
filepaths,
}
}

Expand All @@ -148,8 +148,8 @@ impl SqlTableSource {

/// Access optional filepath associated with this table source
#[allow(dead_code)]
pub fn filepath(&self) -> Option<&String> {
self.filepath.as_ref()
pub fn filepaths(&self) -> Option<&Vec<String>> {
self.filepaths.as_ref()
}
}

Expand Down

0 comments on commit 501acff

Please sign in to comment.