Skip to content

Commit 0cd0bbe

Browse files
authored
Consolidate example to_date.rs into dataframe.rs (#13939)
* Consolidate example to_date.rs into dateframe.rs * Assert results using assert_batches_eq * clippy
1 parent 4d07579 commit 0cd0bbe

File tree

3 files changed

+91
-78
lines changed

3 files changed

+91
-78
lines changed

datafusion-examples/examples/dataframe.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,24 @@ use std::io::Write;
2828
use std::sync::Arc;
2929
use tempfile::tempdir;
3030

31-
/// This example demonstrates using DataFusion's DataFrame API to
31+
/// This example demonstrates using DataFusion's DataFrame API
32+
///
33+
/// # Reading from different formats
3234
///
3335
/// * [read_parquet]: execute queries against parquet files
3436
/// * [read_csv]: execute queries against csv files
3537
/// * [read_memory]: execute queries against in-memory arrow data
3638
///
37-
/// This example demonstrates the various methods to write out a DataFrame to local storage.
38-
/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
39-
/// using a remote object store.
39+
/// # Writing out to local storage
40+
///
41+
/// The following examples demonstrate how to write a DataFrame to local
42+
/// storage. See `external_dependency/dataframe-to-s3.rs` for an example writing
43+
/// to a remote object store.
44+
///
4045
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
46+
///
47+
/// # Querying data
48+
/// * [query_to_date]: convert string columns to dates using the `to_date` function
4149
#[tokio::main]
4250
async fn main() -> Result<()> {
4351
// The SessionContext is the main high level API for interacting with DataFusion
@@ -46,6 +54,7 @@ async fn main() -> Result<()> {
4654
read_csv(&ctx).await?;
4755
read_memory(&ctx).await?;
4856
write_out(&ctx).await?;
57+
query_to_date().await?;
4958
Ok(())
5059
}
5160

@@ -206,3 +215,38 @@ async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionEr
206215

207216
Ok(())
208217
}
218+
219+
/// This example demonstrates how to use the to_date series
220+
/// of functions in the DataFrame API as well as via sql.
221+
async fn query_to_date() -> Result<()> {
222+
// define a schema.
223+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
224+
225+
// define data.
226+
let batch = RecordBatch::try_new(
227+
schema,
228+
vec![Arc::new(StringArray::from(vec![
229+
"2020-09-08T13:42:29Z",
230+
"2020-09-08T13:42:29.190855-05:00",
231+
"2020-08-09 12:13:29",
232+
"2020-01-02",
233+
]))],
234+
)?;
235+
236+
// declare a new context. In spark API, this corresponds to a new spark SQLsession
237+
let ctx = SessionContext::new();
238+
239+
// declare a table in memory. In spark API, this corresponds to createDataFrame(...).
240+
ctx.register_batch("t", batch)?;
241+
let df = ctx.table("t").await?;
242+
243+
// use to_date function to convert col 'a' to timestamp type using the default parsing
244+
let df = df.with_column("a", to_date(vec![col("a")]))?;
245+
246+
let df = df.select_columns(&["a"])?;
247+
248+
// print the results
249+
df.show().await?;
250+
251+
Ok(())
252+
}

datafusion-examples/examples/sql_query.rs

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@ use datafusion::datasource::listing::ListingOptions;
2323
use datafusion::datasource::MemTable;
2424
use datafusion::error::{DataFusionError, Result};
2525
use datafusion::prelude::SessionContext;
26-
use datafusion_common::exec_datafusion_err;
26+
use datafusion_common::{assert_batches_eq, exec_datafusion_err};
2727
use object_store::local::LocalFileSystem;
2828
use std::path::Path;
2929
use std::sync::Arc;
30-
use std::time::Duration;
31-
use tokio::time::timeout;
3230

3331
/// Examples of various ways to execute queries using SQL
3432
///
@@ -52,17 +50,30 @@ pub async fn query_memtable() -> Result<()> {
5250
// Register the in-memory table containing the data
5351
ctx.register_table("users", Arc::new(mem_table))?;
5452

53+
// running a SQL query results in a "DataFrame", which can be used
54+
// to execute the query and collect the results
5555
let dataframe = ctx.sql("SELECT * FROM users;").await?;
5656

57-
timeout(Duration::from_secs(10), async move {
58-
let result = dataframe.collect().await.unwrap();
59-
let record_batch = result.first().unwrap();
60-
61-
assert_eq!(1, record_batch.column(0).len());
62-
dbg!(record_batch.columns());
63-
})
64-
.await
65-
.unwrap();
57+
// Calling 'show' on the dataframe will execute the query and
58+
// print the results
59+
dataframe.clone().show().await?;
60+
61+
// calling 'collect' on the dataframe will execute the query and
62+
// buffer the results into a vector of RecordBatch. There are other
63+
// APIs on DataFrame for incrementally generating results (e.g. streaming)
64+
let result = dataframe.collect().await?;
65+
66+
// Use the assert_batches_eq macro to compare the results
67+
assert_batches_eq!(
68+
[
69+
"+----+--------------+",
70+
"| id | bank_account |",
71+
"+----+--------------+",
72+
"| 1 | 9000 |",
73+
"+----+--------------+",
74+
],
75+
&result
76+
);
6677

6778
Ok(())
6879
}
@@ -133,7 +144,16 @@ async fn query_parquet() -> Result<()> {
133144
.await?;
134145

135146
// print the results
136-
df.show().await?;
147+
let results = df.collect().await?;
148+
assert_batches_eq!(
149+
[
150+
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
151+
"| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
152+
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
153+
"| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |",
154+
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
155+
],
156+
&results);
137157

138158
// Second example where we temporarily move into the test data's parent directory and
139159
// simulate a relative path, this requires registering an ObjectStore.
@@ -173,7 +193,16 @@ async fn query_parquet() -> Result<()> {
173193
.await?;
174194

175195
// print the results
176-
df.show().await?;
196+
let results = df.collect().await?;
197+
assert_batches_eq!(
198+
[
199+
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
200+
"| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
201+
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
202+
"| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |",
203+
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
204+
],
205+
&results);
177206

178207
// Reset the current directory
179208
std::env::set_current_dir(cur_dir)?;

datafusion-examples/examples/to_date.rs

Lines changed: 0 additions & 60 deletions
This file was deleted.

0 commit comments

Comments
 (0)