Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into sqllogictest-nan
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 25, 2023
2 parents a8f842e + d059dd3 commit 73afe5f
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ jobs:
needs: [ linux-build-lib ]
runs-on: ubuntu-20.04
container:
image: amd64/rust
image: amd64/rust:bullseye # Workaround https://github.com/actions/setup-python/issues/721
steps:
- uses: actions/checkout@v3
with:
Expand Down
104 changes: 103 additions & 1 deletion docs/source/library-user-guide/adding-udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,106 @@

# Adding User Defined Functions: Scalar/Window/Aggregate

Coming Soon
User Defined Functions (UDFs) are functions that can be used in the context of DataFusion execution.

This page covers how to add UDFs to DataFusion. In particular, it covers how to add Scalar, Window, and Aggregate UDFs.

| UDF Type | Description | Example |
| --------- | ---------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------- |
| Scalar | A function that takes a row of data and returns a single value. | [simple_udf.rs](../../../datafusion-examples/examples/simple_udf.rs) |
| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | [simple_udwf.rs](../../../datafusion-examples/examples/simple_udwf.rs) |
| Aggregate | A function that takes a group of rows and returns a single value. | [simple_udaf.rs](../../../datafusion-examples/examples/simple_udaf.rs) |

First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about the differences between the different types of UDFs.

## Adding a Scalar UDF

A Scalar UDF is a function that takes a row of data and returns a single value. For example, this function takes a single i64 and returns a single i64 with 1 added to it:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use datafusion::common::Result;

use datafusion::common::cast::as_int64_array;

pub fn add_one(args: &[ArrayRef]) -> Result<ArrayRef> {
// Error handling omitted for brevity

let i64s = as_int64_array(&args[0])?;

let new_array = i64s
.iter()
.map(|array_elem| array_elem.map(|value| value + 1))
.collect::<Int64Array>();

Ok(Arc::new(new_array))
}
```

For brevity, we'll skipped some error handling, but e.g. you may want to check that `args.len()` is the expected number of arguments.

This "works" in isolation, i.e. if you have a slice of `ArrayRef`s, you can call `add_one` and it will return a new `ArrayRef` with 1 added to each value.

```rust
let input = vec![Some(1), None, Some(3)];
let input = Arc::new(Int64Array::from(input)) as ArrayRef;

let result = add_one(&[input]).unwrap();
let result = result.as_any().downcast_ref::<Int64Array>().unwrap();

assert_eq!(result, &Int64Array::from(vec![Some(2), None, Some(4)]));
```

The challenge however is that DataFusion doesn't know about this function. We need to register it with DataFusion so that it can be used in the context of a query.

### Registering a Scalar UDF

To register a Scalar UDF, you need to wrap the function implementation in a `ScalarUDF` struct and then register it with the `SessionContext`. DataFusion provides the `create_udf` and `make_scalar_function` helper functions to make this easier.

```rust
let udf = create_udf(
"add_one",
vec![DataType::Int64],
Arc::new(DataType::Int64),
Volatility::Immutable,
make_scalar_function(add_one),
);
```

A few things to note:

- The first argument is the name of the function. This is the name that will be used in SQL queries.
- The second argument is a vector of `DataType`s. This is the list of argument types that the function accepts. I.e. in this case, the function accepts a single `Int64` argument.
- The third argument is the return type of the function. I.e. in this case, the function returns an `Int64`.
- The fourth argument is the volatility of the function. In short, this is used to determine if the function's performance can be optimized in some situations. In this case, the function is `Immutable` because it always returns the same value for the same input. A random number generator would be `Volatile` because it returns a different value for the same input.
- The fifth argument is the function implementation. This is the function that we defined above.

That gives us a `ScalarUDF` that we can register with the `SessionContext`:

```rust
let mut ctx = SessionContext::new();

ctx.register_udf(udf);
```

At this point, you can use the `add_one` function in your query:

```rust
let sql = "SELECT add_one(1)";

let df = ctx.sql(&sql).await.unwrap();
```

## Adding a Window UDF

Scalar UDFs are functions that take a row of data and return a single value. Window UDFs are similar, but they also have access to the rows around them. Access to the the proximal rows is helpful, but adds some complexity to the implementation.

Body coming soon.

## Adding an Aggregate UDF

Aggregate UDFs are functions that take a group of rows and return a single value. These are akin to SQL's `SUM` or `COUNT` functions.

Body coming soon.
164 changes: 163 additions & 1 deletion docs/source/library-user-guide/working-with-exprs.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,166 @@

# Working with Exprs

Coming Soon
<!-- https://github.com/apache/arrow-datafusion/issues/7304 -->

`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation, and follows the standard "expression tree" abstraction found in most compilers and databases.

For example, the SQL expression `a + b` would be represented as an `Expr` with a `BinaryExpr` variant. A `BinaryExpr` has a left and right `Expr` and an operator.

As another example, the SQL expression `a + b * c` would be represented as an `Expr` with a `BinaryExpr` variant. The left `Expr` would be `a` and the right `Expr` would be another `BinaryExpr` with a left `Expr` of `b` and a right `Expr` of `c`. As a classic expression tree, this would look like:

```text
┌────────────────────┐
│ BinaryExpr │
│ op: + │
└────────────────────┘
▲ ▲
┌───────┘ └────────────────┐
│ │
┌────────────────────┐ ┌────────────────────┐
│ Expr::Col │ │ BinaryExpr │
│ col: a │ │ op: * │
└────────────────────┘ └────────────────────┘
▲ ▲
┌────────┘ └─────────┐
│ │
┌────────────────────┐ ┌────────────────────┐
│ Expr::Col │ │ Expr::Col │
│ col: b │ │ col: c │
└────────────────────┘ └────────────────────┘
```

As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.

There are also executable examples for working with `Expr`s:

- [rewrite_expr.rs](../../../datafusion-examples/examples/catalog.rs)
- [expr_api.rs](../../../datafusion-examples/examples/expr_api.rs)

## A Scalar UDF Example

We'll use a `ScalarUDF` expression as our example. This necessitates implementing an actual UDF, and for ease we'll use the same example from the [adding UDFs](./adding-udfs.md) guide.

So assuming you've written that function, you can use it to create an `Expr`:

```rust
let add_one_udf = create_udf(
"add_one",
vec![DataType::Int64],
Arc::new(DataType::Int64),
Volatility::Immutable,
make_scalar_function(add_one), // <-- the function we wrote
);

// make the expr `add_one(5)`
let expr = add_one_udf.call(vec![lit(5)]);

// make the expr `add_one(my_column)`
let expr = add_one_udf.call(vec![col("my_column")]);
```

If you'd like to learn more about `Expr`s, before we get into the details of creating and rewriting them, you can read the [expression user-guide](./../user-guide/expressions.md).

## Rewriting Exprs

Rewriting Expressions is the process of taking an `Expr` and transforming it into another `Expr`. This is useful for a number of reasons, including:

- Simplifying `Expr`s to make them easier to evaluate
- Optimizing `Expr`s to make them faster to evaluate
- Converting `Expr`s to other forms, e.g. converting a `BinaryExpr` to a `CastExpr`

In our example, we'll use rewriting to update our `add_one` UDF, to be rewritten as a `BinaryExpr` with a `Literal` of 1. We're effectively inlining the UDF.

### Rewriting with `transform`

To implement the inlining, we'll need to write a function that takes an `Expr` and returns a `Result<Expr>`. If the expression is _not_ to be rewritten `Transformed::No` is used to wrap the original `Expr`. If the expression _is_ to be rewritten, `Transformed::Yes` is used to wrap the new `Expr`.

```rust
fn rewrite_add_one(expr: Expr) -> Result<Expr> {
expr.transform(&|expr| {
Ok(match expr {
Expr::ScalarUDF(scalar_fun) if scalar_fun.fun.name == "add_one" => {
let input_arg = scalar_fun.args[0].clone();
let new_expression = input_arg + lit(1i64);

Transformed::Yes(new_expression)
}
_ => Transformed::No(expr),
})
})
}
```

### Creating an `OptimizerRule`

In DataFusion, an `OptimizerRule` is a trait that supports rewriting`Expr`s that appear in various parts of the `LogicalPlan`. It follows DataFusion's general mantra of trait implementations to drive behavior.

We'll call our rule `AddOneInliner` and implement the `OptimizerRule` trait. The `OptimizerRule` trait has two methods:

- `name` - returns the name of the rule
- `try_optimize` - takes a `LogicalPlan` and returns an `Option<LogicalPlan>`. If the rule is able to optimize the plan, it returns `Some(LogicalPlan)` with the optimized plan. If the rule is not able to optimize the plan, it returns `None`.

```rust
struct AddOneInliner {}

impl OptimizerRule for AddOneInliner {
fn name(&self) -> &str {
"add_one_inliner"
}

fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// Map over the expressions and rewrite them
let new_expressions = plan
.expressions()
.into_iter()
.map(|expr| rewrite_add_one(expr))
.collect::<Result<Vec<_>>>()?;

let inputs = plan.inputs().into_iter().cloned().collect::<Vec<_>>();

let plan = plan.with_new_exprs(&new_expressions, &inputs);

plan.map(Some)
}
}
```

Note the use of `rewrite_add_one` which is mapped over `plan.expressions()` to rewrite the expressions, then `plan.with_new_exprs` is used to create a new `LogicalPlan` with the rewritten expressions.

We're almost there. Let's just test our rule works properly.

## Testing the Rule

Testing the rule is fairly simple, we can create a SessionState with our rule and then create a DataFrame and run a query. The logical plan will be optimized by our rule.

```rust
use datafusion::prelude::*;

let rules = Arc::new(AddOneInliner {});
let state = ctx.state().with_optimizer_rules(vec![rules]);

let ctx = SessionContext::with_state(state);
ctx.register_udf(add_one);

let sql = "SELECT add_one(1) AS added_one";
let plan = ctx.sql(sql).await?.logical_plan();

println!("{:?}", plan);
```

This results in the following output:

```text
Projection: Int64(1) + Int64(1) AS added_one
EmptyRelation
```

I.e. the `add_one` UDF has been inlined into the projection.

## Conclusion

In this guide, we've seen how to create `Expr`s programmatically and how to rewrite them. This is useful for simplifying and optimizing `Expr`s. We've also seen how to test our rule to ensure it works properly.

0 comments on commit 73afe5f

Please sign in to comment.