Commit ba26f13

Merge branch 'main' into 13525/invariant-checking-for-implicit-LP-changes

2 parents: e52187e + 405b99c

200 files changed: +7833 / -2899 lines


.github/workflows/rust.yml (20 additions, 16 deletions)

@@ -326,22 +326,26 @@ jobs:
       env:
         POSTGRES_PORT: ${{ job.services.postgres.ports[5432] }}

-  windows:
-    name: cargo test (win64)
-    runs-on: windows-latest
-    steps:
-      - uses: actions/checkout@v4
-        with:
-          submodules: true
-      - name: Setup Rust toolchain
-        uses: ./.github/actions/setup-windows-builder
-      - name: Run tests (excluding doctests)
-        shell: bash
-        run: |
-          export PATH=$PATH:$HOME/d/protoc/bin
-          cargo test --lib --tests --bins --features avro,json,backtrace
-          cd datafusion-cli
-          cargo test --lib --tests --bins --all-features
+  # Temporarily commenting out the Windows flow, the reason is enormously slow running build
+  # Waiting for new Windows 2025 github runner
+  # Details: https://github.com/apache/datafusion/issues/13726
+  #
+  # windows:
+  #   name: cargo test (win64)
+  #   runs-on: windows-latest
+  #   steps:
+  #     - uses: actions/checkout@v4
+  #       with:
+  #         submodules: true
+  #     - name: Setup Rust toolchain
+  #       uses: ./.github/actions/setup-windows-builder
+  #     - name: Run tests (excluding doctests)
+  #       shell: bash
+  #       run: |
+  #         export PATH=$PATH:$HOME/d/protoc/bin
+  #         cargo test --lib --tests --bins --features avro,json,backtrace
+  #         cd datafusion-cli
+  #         cargo test --lib --tests --bins --all-features

   macos:
     name: cargo test (macos)
Cargo.toml (1 addition, 1 deletion)

@@ -147,7 +147,7 @@ recursive = "0.1.1"
 regex = "1.8"
 rstest = "0.23.0"
 serde_json = "1"
-sqlparser = { version = "0.52.0", features = ["visitor"] }
+sqlparser = { version = "0.53.0", features = ["visitor"] }
 tempfile = "3"
 tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
 url = "2.2"
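Since the `sqlparser` pin lives once in the workspace Cargo.toml, the 0.52.0 to 0.53.0 bump applies to every crate at once. As a quick sanity check of the upgraded dependency, a minimal sketch using the copy of sqlparser that DataFusion re-exports (the query and table name are made up):

use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;

fn main() {
    // Parse one statement with the re-exported sqlparser; the workspace-wide
    // pin guarantees this is the same 0.53.0 the engine itself uses.
    let sql = "SELECT a, b FROM t WHERE a > 10";
    let statements = Parser::parse_sql(&GenericDialect {}, sql).expect("valid SQL");
    assert_eq!(statements.len(), 1);
    println!("{statements:?}");
}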

README.md (2 additions, 1 deletion)

@@ -112,7 +112,8 @@ Default features:
 - `parquet`: support for reading the [Apache Parquet] format
 - `regex_expressions`: regular expression functions, such as `regexp_match`
 - `unicode_expressions`: Include unicode aware functions such as `character_length`
-- `unparser` : enables support to reverse LogicalPlans back into SQL
+- `unparser`: enables support to reverse LogicalPlans back into SQL
+- `recursive-protection`: uses [recursive](https://docs.rs/recursive/latest/recursive/) for stack overflow protection.

 Optional features:
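The `unparser` feature named above is the reverse direction: turning a `LogicalPlan` back into SQL text. A minimal sketch of what it enables, assuming the `datafusion` crate with the feature on and a hypothetical `data.csv` input:

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::sql::unparser::plan_to_sql;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical input file; any registered table works here.
    ctx.register_csv("example", "data.csv", CsvReadOptions::new())
        .await?;
    let plan = ctx
        .sql("SELECT * FROM example LIMIT 5")
        .await?
        .into_unoptimized_plan();
    // Reverse the logical plan back into a SQL statement and print it.
    let sql = plan_to_sql(&plan)?;
    println!("{sql}");
    Ok(())
}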

datafusion-cli/Cargo.lock (20 additions, 9 deletions)

Generated file; the diff is not rendered here.

datafusion-cli/src/exec.rs (12 additions, 2 deletions)

@@ -33,11 +33,12 @@ use crate::{
 };

 use datafusion::common::instant::Instant;
-use datafusion::common::plan_datafusion_err;
+use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
+use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;

@@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
     let df = ctx.execute_logical_plan(plan).await?;
     let physical_plan = df.create_physical_plan().await?;

-    if physical_plan.execution_mode().is_unbounded() {
+    if physical_plan.boundedness().is_unbounded() {
+        if physical_plan.pipeline_behavior() == EmissionType::Final {
+            return plan_err!(
+                "The given query can generate a valid result only once \
+                the source finishes, but the source is unbounded"
+            );
+        }
+        // As the input stream comes, we can generate results.
+        // However, memory safety is not guaranteed.
         let stream = execute_stream(physical_plan, task_ctx.clone())?;
         print_options.print_stream(stream, now).await?;
     } else {
+        // Bounded stream; collected results are printed after all input consumed.
         let schema = physical_plan.schema();
         let results = collect(physical_plan, task_ctx.clone()).await?;
         adjusted.into_inner().print_batches(schema, &results, now)?;
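This hunk is the user-visible effect of splitting the old `ExecutionMode` into two orthogonal plan properties: `Boundedness` (does the input ever end?) and `EmissionType` (when can results be emitted?). A hedged restatement of the same gate as a standalone helper (the function name is ours; the accessors come from `ExecutionPlanProperties`, as in the diff):

use std::sync::Arc;

use datafusion::common::{plan_err, Result};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Ok(true): stream results as input arrives; Ok(false): collect fully,
/// then print; Err: the query could never finish, so reject it up front.
fn should_stream(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
    if plan.boundedness().is_unbounded() {
        // A plan that emits only a final result (e.g. a full aggregation)
        // would wait forever on a source that never ends.
        if plan.pipeline_behavior() == EmissionType::Final {
            return plan_err!(
                "the query produces its result only on completion, \
                 but the source is unbounded"
            );
        }
        return Ok(true);
    }
    Ok(false)
}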

datafusion-cli/src/object_storage.rs (5 additions, 4 deletions)

@@ -472,12 +472,13 @@ mod tests {

     #[tokio::test]
     async fn s3_object_store_builder() -> Result<()> {
-        let access_key_id = "fake_access_key_id";
-        let secret_access_key = "fake_secret_access_key";
+        // "fake" is uppercase to ensure the values are not lowercased when parsed
+        let access_key_id = "FAKE_access_key_id";
+        let secret_access_key = "FAKE_secret_access_key";
         let region = "fake_us-east-2";
         let endpoint = "endpoint33";
-        let session_token = "fake_session_token";
-        let location = "s3://bucket/path/file.parquet";
+        let session_token = "FAKE_session_token";
+        let location = "s3://bucket/path/FAKE/file.parquet";

         let table_url = ListingTableUrl::parse(location)?;
         let scheme = table_url.scheme();

datafusion-examples/README.md (2 additions, 2 deletions)

@@ -57,8 +57,7 @@ cargo run --example dataframe
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
-- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data
-- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
+- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.

@@ -77,6 +76,7 @@ cargo run --example dataframe
 - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
 - [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
 - [`regexp.rs`](examples/regexp.rs): Examples of using regular expression functions
+- [`remote_catalog.rs`](examples/remote_catalog.rs): Examples of interfacing with a remote catalog (e.g. over a network)
 - [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF)
 - [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined Scalar Function (UDF)
 - [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF)

datafusion-examples/examples/custom_datasource.rs (5 additions, 3 deletions)

@@ -30,10 +30,11 @@ use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::LogicalPlanBuilder;
 use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
-    Partitioning, PlanProperties, SendableRecordBatchStream,
+    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream,
 };
 use datafusion::prelude::*;

@@ -214,7 +215,8 @@ impl CustomExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
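The same `ExecutionMode` split drives this migration: every `PlanProperties::new` call site now passes an `EmissionType` plus a `Boundedness` where it previously passed a single mode. A sketch of the new constructor shape for a simple bounded, single-partition operator like `CustomExec` (the helper name is ours):

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{Partitioning, PlanProperties};

/// What ExecutionMode::Bounded becomes under the new API: results may
/// stream out incrementally, and the underlying source is finite.
fn bounded_single_partition_props(schema: SchemaRef) -> PlanProperties {
    PlanProperties::new(
        EquivalenceProperties::new(schema),
        Partitioning::UnknownPartitioning(1),
        EmissionType::Incremental, // when results are produced
        Boundedness::Bounded,      // whether the input ever ends
    )
}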

datafusion-examples/examples/dataframe.rs (67 additions, 0 deletions)

@@ -17,8 +17,12 @@

 use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
+use datafusion_common::config::CsvOptions;
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::DataFusionError;
 use std::fs::File;
 use std::io::Write;
 use std::sync::Arc;

@@ -29,13 +33,19 @@ use tempfile::tempdir;
 /// * [read_parquet]: execute queries against parquet files
 /// * [read_csv]: execute queries against csv files
 /// * [read_memory]: execute queries against in-memory arrow data
+///
+/// This example demonstrates the various methods to write out a DataFrame to local storage.
+/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
+/// using a remote object store.
+/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
 #[tokio::main]
 async fn main() -> Result<()> {
     // The SessionContext is the main high level API for interacting with DataFusion
     let ctx = SessionContext::new();
     read_parquet(&ctx).await?;
     read_csv(&ctx).await?;
     read_memory(&ctx).await?;
+    write_out(&ctx).await?;
     Ok(())
 }

@@ -139,3 +149,60 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> {

     Ok(())
 }
+
+/// Use the DataFrame API to:
+/// 1. Write out a DataFrame to a table
+/// 2. Write out a DataFrame to a parquet file
+/// 3. Write out a DataFrame to a csv file
+/// 4. Write out a DataFrame to a json file
+async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionError> {
+    let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap();
+
+    // Ensure the column names and types match the target table
+    df = df.with_column_renamed("column1", "tablecol1").unwrap();
+
+    ctx.sql(
+        "create external table
+    test(tablecol1 varchar)
+    stored as parquet
+    location './datafusion-examples/test_table/'",
+    )
+    .await?
+    .collect()
+    .await?;
+
+    // This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c').
+    // The behavior of write_table depends on the TableProvider's implementation
+    // of the insert_into method.
+    df.clone()
+        .write_table("test", DataFrameWriteOptions::new())
+        .await?;
+
+    df.clone()
+        .write_parquet(
+            "./datafusion-examples/test_parquet/",
+            DataFrameWriteOptions::new(),
+            None,
+        )
+        .await?;
+
+    df.clone()
+        .write_csv(
+            "./datafusion-examples/test_csv/",
+            // DataFrameWriteOptions contains options which control how data is written
+            // such as compression codec
+            DataFrameWriteOptions::new(),
+            Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)),
+        )
+        .await?;
+
+    df.clone()
+        .write_json(
+            "./datafusion-examples/test_json/",
+            DataFrameWriteOptions::new(),
+            None,
+        )
+        .await?;
+
+    Ok(())
+}
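As a usage note, the artifacts `write_out` produces can be read back with the same context; a small sketch reading the parquet output (path as written by the example above):

use datafusion::error::Result;
use datafusion::prelude::*;

async fn read_back(ctx: &SessionContext) -> Result<()> {
    // Read the directory write_parquet created and show its rows.
    let df = ctx
        .read_parquet(
            "./datafusion-examples/test_parquet/",
            ParquetReadOptions::default(),
        )
        .await?;
    df.show().await?;
    Ok(())
}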
