From 3a7efc0156b87f821727e1c7182e76775cbe74a7 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 19 Feb 2025 10:28:56 +0800 Subject: [PATCH 1/9] feat: add fill_null methods to DataFrame for handling null values --- datafusion/core/src/dataframe/mod.rs | 72 +++++++++++++++++++++++++- datafusion/core/tests/dataframe/mod.rs | 41 +++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 6f540fa02c75..cb29a9684354 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -51,13 +51,15 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ exec_err, not_impl_err, plan_err, Column, DFSchema, DataFusionError, ParamValues, - SchemaError, UnnestOptions, + ScalarValue, SchemaError, UnnestOptions, }; use datafusion_expr::dml::InsertOp; +use datafusion_expr::expr::{Alias, ScalarFunction}; use datafusion_expr::{case, is_null, lit, SortExpr}; use datafusion_expr::{ utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, }; +use datafusion_functions::core::coalesce; use datafusion_functions_aggregate::expr_fn::{ avg, count, max, median, min, stddev, sum, }; @@ -1926,6 +1928,74 @@ impl DataFrame { plan, }) } + + /// Fill null values in specified columns with a given value + /// If no columns are specified, applies to all columns + /// Only fills if the value can be cast to the column's type + /// + /// # Arguments + /// * `value` - Value to fill nulls with + /// * `columns` - Optional list of column names to fill. If None, fills all columns + pub fn fill_null( + &self, + value: ScalarValue, + columns: Option>, + ) -> Result { + let cols = match columns { + Some(names) => self.find_columns(&names)?, + None => self + .logical_plan() + .schema() + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect(), + }; + + // Create projections for each column + let projections = self + .logical_plan() + .schema() + .fields() + .iter() + .map(|field| { + if cols.contains(field) { + // Try to cast fill value to column type. If the cast fails, fallback to the original column. + match value.clone().cast_to(field.data_type()) { + Ok(fill_value) => Expr::Alias(Alias { + expr: Box::new(Expr::ScalarFunction(ScalarFunction { + func: coalesce(), + args: vec![col(field.name()), lit(fill_value)], + })), + relation: None, + name: field.name().to_string(), + }), + Err(_) => col(field.name()), + } + } else { + col(field.name()) + } + }) + .collect::>(); + + self.clone().select(projections) + } + + // Helper to find columns from names + fn find_columns(&self, names: &[String]) -> Result> { + let schema = self.logical_plan().schema(); + names + .iter() + .map(|name| { + schema + .field_with_name(None, name) + .map(|f| f.clone()) + .map_err(|_| { + DataFusionError::Plan(format!("Column '{}' not found", name)) + }) + }) + .collect() + } } #[derive(Debug)] diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index d545157607c7..51a9fd3d7e26 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -5342,3 +5342,44 @@ async fn test_insert_into_checking() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_fill_null() -> Result<()> { + // Create a simple table with nulls. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8, true), + ])); + let a_values = Int32Array::from(vec![Some(1), None, Some(3)]); + let b_values = StringArray::from(vec![Some("x"), None, Some("z")]); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(a_values), Arc::new(b_values)], + )?; + + let ctx = SessionContext::new(); + let table = MemTable::try_new(schema.clone(), vec![vec![batch]])?; + ctx.register_table("t_null", Arc::new(table))?; + let df = ctx.table("t_null").await?; + + // Use fill_null to replace nulls on each column. + let df_filled = df + .fill_null(ScalarValue::Int32(Some(0)), Some(vec!["a".to_string()]))? + .fill_null( + ScalarValue::Utf8(Some("default".to_string())), + Some(vec!["b".to_string()]), + )?; + + let results = df_filled.collect().await?; + let expected = vec![ + "+---+---------+", + "| a | b |", + "+---+---------+", + "| 1 | x |", + "| 0 | default |", + "| 3 | z |", + "+---+---------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} From c334f5d07290635b7ba3e716d0874fdc95352e96 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 19 Feb 2025 16:04:12 +0800 Subject: [PATCH 2/9] test: refactor fill_null tests and create helper function for null table --- datafusion/core/tests/dataframe/mod.rs | 49 ++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 51a9fd3d7e26..3bc67af13105 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -5343,9 +5343,7 @@ async fn test_insert_into_checking() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_fill_null() -> Result<()> { - // Create a simple table with nulls. +async fn create_null_table() -> Result { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Utf8, true), @@ -5361,6 +5359,12 @@ async fn test_fill_null() -> Result<()> { let table = MemTable::try_new(schema.clone(), vec![vec![batch]])?; ctx.register_table("t_null", Arc::new(table))?; let df = ctx.table("t_null").await?; + Ok(df) +} + +#[tokio::test] +async fn test_fill_null() -> Result<()> { + let df = create_null_table().await?; // Use fill_null to replace nulls on each column. let df_filled = df @@ -5383,3 +5387,42 @@ async fn test_fill_null() -> Result<()> { assert_batches_sorted_eq!(expected, &results); Ok(()) } + +#[tokio::test] +async fn test_fill_null_all_columns() -> Result<()> { + let df = create_null_table().await?; + + // Use fill_null to replace nulls on all columns. + // Only column "b" will be replaced since ScalarValue::Utf8(Some("default".to_string())) + // can be cast to Utf8. + let df_filled = df.fill_null(ScalarValue::Utf8(Some("default".to_string())), None)?; + + let results = df_filled.clone().collect().await?; + + let expected = vec![ + "+---+---------+", + "| a | b |", + "+---+---------+", + "| 1 | x |", + "| | default |", + "| 3 | z |", + "+---+---------+", + ]; + + assert_batches_sorted_eq!(expected, &results); + + let df_filled = df_filled.fill_null(ScalarValue::Int32(Some(0)), None)?; + + let results = df_filled.collect().await?; + let expected = vec![ + "+---+---------+", + "| a | b |", + "+---+---------+", + "| 1 | x |", + "| 0 | default |", + "| 3 | z |", + "+---+---------+", + ]; + assert_batches_sorted_eq!(expected, &results); + Ok(()) +} From 1575a1f7b2e025f003a9ebc347d68ce710fe50a3 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Wed, 19 Feb 2025 16:04:24 +0800 Subject: [PATCH 3/9] style: reorder imports in mod.rs for better organization --- datafusion/core/src/dataframe/mod.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index cb29a9684354..41688f92e504 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -53,11 +53,13 @@ use datafusion_common::{ exec_err, not_impl_err, plan_err, Column, DFSchema, DataFusionError, ParamValues, ScalarValue, SchemaError, UnnestOptions, }; -use datafusion_expr::dml::InsertOp; -use datafusion_expr::expr::{Alias, ScalarFunction}; -use datafusion_expr::{case, is_null, lit, SortExpr}; use datafusion_expr::{ - utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, + case, + dml::InsertOp, + expr::{Alias, ScalarFunction}, + is_null, lit, + utils::COUNT_STAR_EXPANSION, + SortExpr, TableProviderFilterPushDown, UNNAMED_TABLE, }; use datafusion_functions::core::coalesce; use datafusion_functions_aggregate::expr_fn::{ From 7a1b99c4a6d05a4c2fa75ace7b316343d7296a73 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 20 Feb 2025 10:05:59 +0800 Subject: [PATCH 4/9] clippy lint --- datafusion/core/src/dataframe/mod.rs | 9 +++------ datafusion/core/tests/dataframe/mod.rs | 6 +++--- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 41688f92e504..5cfd98196a2d 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1989,12 +1989,9 @@ impl DataFrame { names .iter() .map(|name| { - schema - .field_with_name(None, name) - .map(|f| f.clone()) - .map_err(|_| { - DataFusionError::Plan(format!("Column '{}' not found", name)) - }) + schema.field_with_name(None, name).cloned().map_err(|_| { + DataFusionError::Plan(format!("Column '{}' not found", name)) + }) }) .collect() } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 3bc67af13105..6e5106e9e712 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -5375,7 +5375,7 @@ async fn test_fill_null() -> Result<()> { )?; let results = df_filled.collect().await?; - let expected = vec![ + let expected = [ "+---+---------+", "| a | b |", "+---+---------+", @@ -5399,7 +5399,7 @@ async fn test_fill_null_all_columns() -> Result<()> { let results = df_filled.clone().collect().await?; - let expected = vec![ + let expected = [ "+---+---------+", "| a | b |", "+---+---------+", @@ -5414,7 +5414,7 @@ async fn test_fill_null_all_columns() -> Result<()> { let df_filled = df_filled.fill_null(ScalarValue::Int32(Some(0)), None)?; let results = df_filled.collect().await?; - let expected = vec![ + let expected = [ "+---+---------+", "| a | b |", "+---+---------+", From d517fba5212391980dca6c270a0f40d920218b7b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Thu, 20 Feb 2025 10:35:15 +0800 Subject: [PATCH 5/9] test: add comment to clarify test --- datafusion/core/tests/dataframe/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 6e5106e9e712..e884fdacecfb 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -5344,6 +5344,14 @@ async fn test_insert_into_checking() -> Result<()> { } async fn create_null_table() -> Result { + // create a DataFrame with null values + // "+---+----+", + // "| a | b |", + // "+---+---+", + // "| 1 | x |", + // "| | |", + // "| 3 | z |", + // "+---+---+", let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), Field::new("b", DataType::Utf8, true), @@ -5411,6 +5419,7 @@ async fn test_fill_null_all_columns() -> Result<()> { assert_batches_sorted_eq!(expected, &results); + // Fill column "a" null values with a value that cannot be cast to Int32. let df_filled = df_filled.fill_null(ScalarValue::Int32(Some(0)), None)?; let results = df_filled.collect().await?; From 884a2ff4eac8d7166679726926b7f9afedb8b04c Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 24 Feb 2025 14:45:46 +0800 Subject: [PATCH 6/9] refactor: columns Vec --- datafusion/core/src/dataframe/mod.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 5cfd98196a2d..c19177dc01bc 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -50,8 +50,8 @@ use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ - exec_err, not_impl_err, plan_err, Column, DFSchema, DataFusionError, ParamValues, - ScalarValue, SchemaError, UnnestOptions, + exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, + DataFusionError, ParamValues, ScalarValue, SchemaError, UnnestOptions, }; use datafusion_expr::{ case, @@ -1932,26 +1932,26 @@ impl DataFrame { } /// Fill null values in specified columns with a given value - /// If no columns are specified, applies to all columns + /// If no columns are specified (empty vector), applies to all columns /// Only fills if the value can be cast to the column's type /// /// # Arguments /// * `value` - Value to fill nulls with - /// * `columns` - Optional list of column names to fill. If None, fills all columns + /// * `columns` - List of column names to fill. If empty, fills all columns pub fn fill_null( &self, value: ScalarValue, - columns: Option>, + columns: Vec, ) -> Result { - let cols = match columns { - Some(names) => self.find_columns(&names)?, - None => self - .logical_plan() + let cols = if columns.is_empty() { + self.logical_plan() .schema() .fields() .iter() .map(|f| f.as_ref().clone()) - .collect(), + .collect() + } else { + self.find_columns(&columns)? }; // Create projections for each column @@ -1989,9 +1989,10 @@ impl DataFrame { names .iter() .map(|name| { - schema.field_with_name(None, name).cloned().map_err(|_| { - DataFusionError::Plan(format!("Column '{}' not found", name)) - }) + schema + .field_with_name(None, name) + .cloned() + .map_err(|_| plan_datafusion_err!("Column '{}' not found", name)) }) .collect() } From b9bbbd31f7ffb4f38d0aace274a47f2daedae26a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 24 Feb 2025 15:00:57 +0800 Subject: [PATCH 7/9] docs: enhance fill_null documentation with example usage --- datafusion/core/src/dataframe/mod.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index c19177dc01bc..7646c19c4e56 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1937,7 +1937,17 @@ impl DataFrame { /// /// # Arguments /// * `value` - Value to fill nulls with - /// * `columns` - List of column names to fill. If empty, fills all columns + /// * `columns` - List of column names to fill. If empty, fills all columns. + /// + /// # Example + /// ``` + /// // Example usage of fill_null: + /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; + /// // Fill nulls in only columns "a" and "c": + /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?; + /// // Fill nulls across all columns: + /// let df = df.fill_null(ScalarValue::from(0), vec![])?; + /// ``` pub fn fill_null( &self, value: ScalarValue, From b28756f0140037f2236d75da216a245f9dbaff88 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 24 Feb 2025 15:01:44 +0800 Subject: [PATCH 8/9] test: columns Vec --- datafusion/core/tests/dataframe/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index e884fdacecfb..ae7e46a1ef89 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -5376,10 +5376,10 @@ async fn test_fill_null() -> Result<()> { // Use fill_null to replace nulls on each column. let df_filled = df - .fill_null(ScalarValue::Int32(Some(0)), Some(vec!["a".to_string()]))? + .fill_null(ScalarValue::Int32(Some(0)), vec!["a".to_string()])? .fill_null( ScalarValue::Utf8(Some("default".to_string())), - Some(vec!["b".to_string()]), + vec!["b".to_string()], )?; let results = df_filled.collect().await?; @@ -5403,7 +5403,8 @@ async fn test_fill_null_all_columns() -> Result<()> { // Use fill_null to replace nulls on all columns. // Only column "b" will be replaced since ScalarValue::Utf8(Some("default".to_string())) // can be cast to Utf8. - let df_filled = df.fill_null(ScalarValue::Utf8(Some("default".to_string())), None)?; + let df_filled = + df.fill_null(ScalarValue::Utf8(Some("default".to_string())), vec![])?; let results = df_filled.clone().collect().await?; @@ -5420,7 +5421,7 @@ async fn test_fill_null_all_columns() -> Result<()> { assert_batches_sorted_eq!(expected, &results); // Fill column "a" null values with a value that cannot be cast to Int32. - let df_filled = df_filled.fill_null(ScalarValue::Int32(Some(0)), None)?; + let df_filled = df_filled.fill_null(ScalarValue::Int32(Some(0)), vec![])?; let results = df_filled.collect().await?; let expected = [ From 36fd40889c656ca8cf97563242d8d5b1b73d7b1a Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 24 Feb 2025 15:06:28 +0800 Subject: [PATCH 9/9] docs: update fill_null documentation with detailed usage examples --- datafusion/core/src/dataframe/mod.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 7646c19c4e56..2883f4586c5b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1941,12 +1941,19 @@ impl DataFrame { /// /// # Example /// ``` - /// // Example usage of fill_null: + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use datafusion_common::ScalarValue; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; /// // Fill nulls in only columns "a" and "c": /// let df = df.fill_null(ScalarValue::from(0), vec!["a".to_owned(), "c".to_owned()])?; /// // Fill nulls across all columns: /// let df = df.fill_null(ScalarValue::from(0), vec![])?; + /// # Ok(()) + /// # } /// ``` pub fn fill_null( &self,