Skip to content

refactor(rust): Reorganize polars_io::csv module #15831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! DataFrame module.
#[cfg(feature = "zip_with")]
use std::borrow::Cow;
use std::{mem, ops};

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ mod tests {

#[test]
fn csv_to_local_objectstore_cloudwriter() {
use crate::csv::CsvWriter;
use crate::csv::write::CsvWriter;
use crate::prelude::SerWriter;

let mut df = example_dataframe();
Expand All @@ -144,7 +144,7 @@ mod tests {
#[cfg_attr(target_os = "windows", ignore)]
#[test]
fn cloudwriter_from_cloudlocation_test() {
use crate::csv::CsvWriter;
use crate::csv::write::CsvWriter;
use crate::prelude::SerWriter;

let mut df = example_dataframe();
Expand Down
75 changes: 3 additions & 72 deletions crates/polars-io/src/csv/mod.rs
Original file line number Diff line number Diff line change
@@ -1,73 +1,4 @@
//! # (De)serializing CSV files
//!
//! ## Maximal performance
//! Currently [CsvReader::new](CsvReader::new) has an extra copy. If you want optimal performance in CSV parsing/
//! reading, it is advised to use [CsvReader::from_path](CsvReader::from_path).
//!
//! ## Write a DataFrame to a csv file.
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! fn example(df: &mut DataFrame) -> PolarsResult<()> {
//! let mut file = File::create("example.csv").expect("could not create file");
//!
//! CsvWriter::new(&mut file)
//! .include_header(true)
//! .with_separator(b',')
//! .finish(df)
//! }
//! ```
//!
//! ## Read a csv file to a DataFrame
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! fn example() -> PolarsResult<DataFrame> {
//! // always prefer `from_path` as that is fastest.
//! CsvReader::from_path("iris_csv")?
//! .has_header(true)
//! .finish()
//! }
//! ```
//!
pub(crate) mod buffer;
pub(crate) mod parser;
pub mod read_impl;
//! Functionality for reading and writing CSV files.

mod read;
pub(super) mod splitfields;
pub mod utils;
mod write;
pub(super) mod write_impl;

use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

pub use parser::{count_rows, CsvParserOptions};
use polars_core::prelude::*;
#[cfg(feature = "temporal")]
use polars_time::prelude::*;
#[cfg(feature = "temporal")]
use rayon::prelude::*;
pub use read::{CommentPrefix, CsvEncoding, CsvReader, NullValues};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
pub use write::{BatchedWriter, CsvWriter, CsvWriterOptions, QuoteStyle};
pub use write_impl::SerializeOptions;

use crate::csv::read_impl::CoreReader;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::utils::{get_reader_bytes, resolve_homedir};
use crate::{RowIndex, SerReader, SerWriter};
pub mod read;
pub mod write;
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use polars_time::prelude::string::infer::{
};
use polars_utils::vec::PushUnchecked;

use crate::csv::parser::{is_whitespace, skip_whitespace};
use crate::csv::utils::escape_field;
use crate::csv::CsvEncoding;
use super::options::CsvEncoding;
use super::parser::{is_whitespace, skip_whitespace};
use super::utils::escape_field;

pub(crate) trait PrimitiveParser: PolarsNumericType {
fn parse(bytes: &[u8]) -> Option<Self::Native>;
Expand Down
34 changes: 34 additions & 0 deletions crates/polars-io/src/csv/read/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Functionality for reading CSV files.
//!
//! Note: currently, `CsvReader::new` has an extra copy. If you want optimal performance,
//! it is advised to use [`CsvReader::from_path`] instead.
//!
//! # Examples
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! fn example() -> PolarsResult<DataFrame> {
//! // Prefer `from_path` over `new` as it is faster.
//! CsvReader::from_path("example.csv")?
//! .has_header(true)
//! .finish()
//! }
//! ```

mod buffer;
mod options;
mod parser;
mod read_impl;
mod reader;
mod splitfields;
mod utils;

pub use options::{CommentPrefix, CsvEncoding, CsvParserOptions, NullValues};
pub use parser::count_rows;
pub use read_impl::batched_mmap::{BatchedCsvReaderMmap, OwnedBatchedCsvReaderMmap};
pub use read_impl::batched_read::{BatchedCsvReaderRead, OwnedBatchedCsvReader};
pub use reader::CsvReader;
pub use utils::{infer_file_schema, is_compressed};
156 changes: 156 additions & 0 deletions crates/polars-io/src/csv/read/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
use polars_core::schema::{IndexOfSchema, Schema, SchemaRef};
use polars_error::PolarsResult;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

/// Options that configure the CSV parser.
///
/// The default noted on each field is the value produced by the
/// [`Default`] implementation of this struct.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvParserOptions {
    /// Default: `true`.
    pub has_header: bool,
    /// Default: `b','`.
    pub separator: u8,
    /// Default: `None`.
    pub comment_prefix: Option<CommentPrefix>,
    /// Default: `Some(b'"')`.
    pub quote_char: Option<u8>,
    /// Default: `b'\n'`.
    pub eol_char: u8,
    /// Default: `CsvEncoding::default()`.
    pub encoding: CsvEncoding,
    /// Default: `0`.
    pub skip_rows: usize,
    /// Default: `0`.
    pub skip_rows_after_header: usize,
    /// Default: `None`.
    pub schema: Option<SchemaRef>,
    /// Default: `None`.
    pub schema_overwrite: Option<SchemaRef>,
    /// Default: `Some(100)`.
    pub infer_schema_length: Option<usize>,
    /// Default: `false`.
    pub try_parse_dates: bool,
    /// Default: `None`.
    pub null_values: Option<NullValues>,
    /// Default: `false`.
    pub ignore_errors: bool,
    /// Default: `true`.
    pub raise_if_empty: bool,
    /// Default: `false`.
    pub truncate_ragged_lines: bool,
    /// Default: `false`.
    pub decimal_comma: bool,
    /// Default: `None`.
    pub n_threads: Option<usize>,
    /// Default: `false`.
    pub low_memory: bool,
}

impl Default for CsvParserOptions {
    /// Returns the baseline parser configuration.
    ///
    /// Field initialisers are grouped by the kind of default they carry; the
    /// order inside a struct literal has no effect on the resulting value.
    fn default() -> Self {
        Self {
            // Byte-level settings and encoding.
            separator: b',',
            quote_char: Some(b'"'),
            eol_char: b'\n',
            encoding: CsvEncoding::default(),

            // Flags that start enabled.
            has_header: true,
            raise_if_empty: true,

            // Flags that start disabled.
            try_parse_dates: false,
            ignore_errors: false,
            truncate_ragged_lines: false,
            decimal_comma: false,
            low_memory: false,

            // Row-skip counters start at zero.
            skip_rows: 0,
            skip_rows_after_header: 0,

            // Optional settings that start unset.
            comment_prefix: None,
            schema: None,
            schema_overwrite: None,
            null_values: None,
            n_threads: None,

            // The only optional setting with a non-`None` default.
            infer_schema_length: Some(100),
        }
    }
}

/// Encodings selectable through `CsvParserOptions::encoding`.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CsvEncoding {
    /// UTF-8 encoding. This is the default variant.
    #[default]
    Utf8,
    /// UTF-8 encoding in which unknown bytes are replaced with `�` (U+FFFD).
    LossyUtf8,
}

/// Marker that identifies the start of a comment line in a CSV file.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CommentPrefix {
    /// A single byte character that indicates the start of a comment line.
    Single(u8),
    /// A string that indicates the start of a comment line.
    /// This allows for multiple characters to be used as a comment identifier.
    Multi(String),
}

impl CommentPrefix {
    /// Maximum length, in bytes, accepted by [`CommentPrefix::new_multi`].
    const MAX_MULTI_LEN: usize = 5;

    /// Creates a [`CommentPrefix::Single`] from the byte `c`.
    pub fn new_single(c: u8) -> Self {
        CommentPrefix::Single(c)
    }

    /// Creates a [`CommentPrefix::Multi`] from the string `s`.
    ///
    /// Returns `None` if `s` is longer than 5 **bytes**. The limit is checked
    /// with [`String::len`], which counts UTF-8 bytes rather than characters,
    /// so a prefix of fewer than six non-ASCII characters can still be
    /// rejected. An empty string is accepted.
    pub fn new_multi(s: String) -> Option<Self> {
        // `s.len()` is evaluated before `s` is moved into the variant.
        (s.len() <= Self::MAX_MULTI_LEN).then_some(CommentPrefix::Multi(s))
    }
}

/// The string values that are treated as null, either for every column or
/// per named column.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum NullValues {
    /// One value, used for all columns.
    AllColumnsSingle(String),
    /// Several values, each of them used for all columns.
    AllColumns(Vec<String>),
    /// `(column name, null value)` tuples that map a column to its null value.
    Named(Vec<(String, String)>),
}

impl NullValues {
    /// Converts these null values into their [`NullValuesCompiled`] form.
    ///
    /// The two all-column variants are carried over unchanged. For
    /// [`NullValues::Named`], a vector with one entry per column of `schema`
    /// is created, initially filled with empty strings, and every
    /// `(name, value)` pair stores `value` at the index of `name` in `schema`.
    /// If a name occurs more than once, the last pair wins.
    ///
    /// # Errors
    ///
    /// Propagates the error from `schema.try_index_of` when a name in
    /// [`NullValues::Named`] cannot be resolved to an index.
    pub(super) fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
        let compiled = match self {
            Self::AllColumnsSingle(value) => NullValuesCompiled::AllColumnsSingle(value),
            Self::AllColumns(values) => NullValuesCompiled::AllColumns(values),
            Self::Named(pairs) => {
                // Columns without an explicit entry keep the empty string.
                let mut per_column = vec![String::new(); schema.len()];
                for (column, value) in pairs {
                    let idx = schema.try_index_of(&column)?;
                    per_column[idx] = value;
                }
                NullValuesCompiled::Columns(per_column)
            },
        };
        Ok(compiled)
    }
}

/// Form of [`NullValues`] produced by `NullValues::compile`, in which named
/// null values have been resolved to column positions.
pub(super) enum NullValuesCompiled {
    /// One value, used for all columns.
    AllColumnsSingle(String),
    /// Several values, each of them null for all columns.
    AllColumns(Vec<String>),
    /// One null value per column, computed from `NullValues::Named`.
    Columns(Vec<String>),
}

impl NullValuesCompiled {
pub(super) fn apply_projection(&mut self, projections: &[usize]) {
if let Self::Columns(nv) = self {
let nv = projections
.iter()
.map(|i| std::mem::take(&mut nv[*i]))
.collect::<Vec<_>>();

*self = NullValuesCompiled::Columns(nv);
}
}

/// # Safety
///
/// The caller must ensure that `index` is in bounds
pub(super) unsafe fn is_null(&self, field: &[u8], index: usize) -> bool {
use NullValuesCompiled::*;
match self {
AllColumnsSingle(v) => v.as_bytes() == field,
AllColumns(v) => v.iter().any(|v| v.as_bytes() == field),
Columns(v) => {
debug_assert!(index < v.len());
v.get_unchecked(index).as_bytes() == field
},
}
}
}
Loading
Loading