Skip to content

Commit 30a6ed5

Browse files
Omega359 and alamb authored
Add distinct_on to dataframe api (#11012)
* Add distinct_on to dataframe api #11011
* cargo fmt
* Update datafusion/core/src/dataframe/mod.rs as per reviewer feedback

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 8aad936 commit 30a6ed5

File tree

2 files changed

+118
-0
lines changed

2 files changed

+118
-0
lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,38 @@ impl DataFrame {
522522
})
523523
}
524524

525+
/// Return a new `DataFrame` that keeps one row for each distinct combination of the
526+
/// `on_expr` values (the `DISTINCT ON` clause), choosing that row according to the optional
527+
/// `sort_expr` ordering and returning the `select_expr` expressions for it.
528+
///
529+
/// # Example
530+
/// ```
531+
/// # use datafusion::prelude::*;
532+
/// # use datafusion::error::Result;
533+
/// # #[tokio::main]
534+
/// # async fn main() -> Result<()> {
535+
/// let ctx = SessionContext::new();
536+
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
537+
/// // Return a single row (a, b) for each distinct value of a
538+
/// .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
539+
/// # Ok(())
540+
/// # }
541+
/// ```
542+
pub fn distinct_on(
543+
self,
544+
on_expr: Vec<Expr>,
545+
select_expr: Vec<Expr>,
546+
sort_expr: Option<Vec<Expr>>,
547+
) -> Result<DataFrame> {
548+
let plan = LogicalPlanBuilder::from(self.plan)
549+
.distinct_on(on_expr, select_expr, sort_expr)?
550+
.build()?;
551+
Ok(DataFrame {
552+
session_state: self.session_state,
553+
plan,
554+
})
555+
}
556+
525557
/// Return a new `DataFrame` that has statistics for a DataFrame.
526558
///
527559
/// Only summarizes numeric datatypes at the moment and returns nulls for
@@ -2359,6 +2391,91 @@ mod tests {
23592391
Ok(())
23602392
}
23612393

2394+
#[tokio::test]
2395+
async fn test_distinct_on() -> Result<()> {
2396+
let t = test_table().await?;
2397+
let plan = t
2398+
.distinct_on(vec![col("c1")], vec![col("aggregate_test_100.c1")], None)
2399+
.unwrap();
2400+
2401+
let sql_plan =
2402+
create_plan("select distinct on (c1) c1 from aggregate_test_100").await?;
2403+
2404+
assert_same_plan(&plan.plan.clone(), &sql_plan);
2405+
2406+
let df_results = plan.clone().collect().await?;
2407+
2408+
#[rustfmt::skip]
2409+
assert_batches_sorted_eq!(
2410+
["+----+",
2411+
"| c1 |",
2412+
"+----+",
2413+
"| a |",
2414+
"| b |",
2415+
"| c |",
2416+
"| d |",
2417+
"| e |",
2418+
"+----+"],
2419+
&df_results
2420+
);
2421+
2422+
Ok(())
2423+
}
2424+
2425+
#[tokio::test]
2426+
async fn test_distinct_on_sort_by() -> Result<()> {
2427+
let t = test_table().await?;
2428+
let plan = t
2429+
.select(vec![col("c1")])
2430+
.unwrap()
2431+
.distinct_on(
2432+
vec![col("c1")],
2433+
vec![col("c1")],
2434+
Some(vec![col("c1").sort(true, true)]),
2435+
)
2436+
.unwrap()
2437+
.sort(vec![col("c1").sort(true, true)])
2438+
.unwrap();
2439+
2440+
let df_results = plan.clone().collect().await?;
2441+
2442+
#[rustfmt::skip]
2443+
assert_batches_sorted_eq!(
2444+
["+----+",
2445+
"| c1 |",
2446+
"+----+",
2447+
"| a |",
2448+
"| b |",
2449+
"| c |",
2450+
"| d |",
2451+
"| e |",
2452+
"+----+"],
2453+
&df_results
2454+
);
2455+
2456+
Ok(())
2457+
}
2458+
2459+
#[tokio::test]
2460+
async fn test_distinct_on_sort_by_unprojected() -> Result<()> {
2461+
let t = test_table().await?;
2462+
let err = t
2463+
.select(vec![col("c1")])
2464+
.unwrap()
2465+
.distinct_on(
2466+
vec![col("c1")],
2467+
vec![col("c1")],
2468+
Some(vec![col("c1").sort(true, true)]),
2469+
)
2470+
.unwrap()
2471+
// try to sort on some value not present in input to distinct
2472+
.sort(vec![col("c2").sort(true, true)])
2473+
.unwrap_err();
2474+
assert_eq!(err.strip_backtrace(), "Error during planning: For SELECT DISTINCT, ORDER BY expressions c2 must appear in select list");
2475+
2476+
Ok(())
2477+
}
2478+
23622479
#[tokio::test]
23632480
async fn join() -> Result<()> {
23642481
let left = test_table().await?.select_columns(&["c1", "c2"])?;

docs/source/user-guide/dataframe.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ execution. The plan is evaluated (executed) when an action method is invoked, su
6464
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ |
6565
| aggregate | Perform an aggregate query with optional grouping expressions. |
6666
| distinct | Filter out duplicate rows. |
67+
| distinct_on | Filter out duplicate rows based on provided expressions. |
6768
| drop_columns | Create a projection with all but the provided column names. |
6869
| except | Calculate the set difference of two DataFrames (rows of the first that do not appear in the second). The two DataFrames must have exactly the same schema. |
6970
| filter | Filter a DataFrame to only include rows that match the specified filter expression. |

0 commit comments

Comments
 (0)