Skip to content

Commit

Permalink
Add schema and file_sort_order to read/register_parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
judahrand committed Sep 10, 2023
1 parent 6f4e94c commit d48c610
Showing 1 changed file with 28 additions and 3 deletions.
31 changes: 28 additions & 3 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::catalog::{PyCatalog, PyTable};
use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
use crate::errors::{py_datafusion_err, DataFusionError};
use crate::expr::PyExpr;
use crate::physical_plan::PyExecutionPlan;
use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
Expand Down Expand Up @@ -445,20 +446,34 @@ impl PySessionContext {
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (name, path, table_partition_cols=vec![],
parquet_pruning=true,
file_extension=".parquet"))]
file_extension=".parquet",
skip_metadata=true,
schema=None,
file_sort_order=None))]
fn register_parquet(
&mut self,
name: &str,
path: &str,
table_partition_cols: Vec<(String, String)>,
parquet_pruning: bool,
file_extension: &str,
skip_metadata: bool,
schema: Option<PyArrowType<Schema>>,
file_sort_order: Option<Vec<Vec<PyExpr>>>,
py: Python,
) -> PyResult<()> {
let mut options = ParquetReadOptions::default()
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
.parquet_pruning(parquet_pruning);
.parquet_pruning(parquet_pruning)
.skip_metadata(skip_metadata);
options.file_extension = file_extension;
options.schema = schema.as_ref().map(|x| &x.0);
options.file_sort_order = file_sort_order
.unwrap_or(vec![])
.into_iter()
.map(|e| e.into_iter().map(|f| f.into()).collect())
.collect();

let result = self.ctx.register_parquet(name, path, options);
wait_for_future(py, result).map_err(DataFusionError::from)?;
Ok(())
Expand Down Expand Up @@ -722,21 +737,31 @@ impl PySessionContext {
table_partition_cols=vec![],
parquet_pruning=true,
file_extension=".parquet",
skip_metadata=true))]
skip_metadata=true,
schema=None,
file_sort_order=None))]
fn read_parquet(
&self,
path: &str,
table_partition_cols: Vec<(String, String)>,
parquet_pruning: bool,
file_extension: &str,
skip_metadata: bool,
schema: Option<PyArrowType<Schema>>,
file_sort_order: Option<Vec<Vec<PyExpr>>>,
py: Python,
) -> PyResult<PyDataFrame> {
let mut options = ParquetReadOptions::default()
.table_partition_cols(convert_table_partition_cols(table_partition_cols)?)
.parquet_pruning(parquet_pruning)
.skip_metadata(skip_metadata);
options.file_extension = file_extension;
options.schema = schema.as_ref().map(|x| &x.0);
options.file_sort_order = file_sort_order
.unwrap_or(vec![])
.into_iter()
.map(|e| e.into_iter().map(|f| f.into()).collect())
.collect();

let result = self.ctx.read_parquet(path, options);
let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
Expand Down

0 comments on commit d48c610

Please sign in to comment.