
Arrow2 merge avro #17


Merged: 40 commits, Jan 13, 2022

Commits
6f4ad77
Planner code cleanup (#1450)
alamb Dec 15, 2021
1448d97
Fix bug in projection: "column types must match schema types, expecte…
alamb Dec 15, 2021
0052667
Support identifiers with `.` in them (#1449)
alamb Dec 15, 2021
6478a33
Fixes for working with functions in dataframes, additional documentat…
tobyhede Dec 15, 2021
9d31866
support sum/avg agg for decimal, change sum(float32) --> float64 (#1408)
liukun4515 Dec 17, 2021
8193e03
Minimize features (#1399)
carols10cents Dec 18, 2021
0b8bffd
Update roadmap with features completed (#1464)
alamb Dec 18, 2021
35d65fc
fix calculate in many_to_many_hash_partition test. (#1463)
Ted-Jiang Dec 18, 2021
07b2985
Avoid send empty batches for Hash partitioning. (#1459)
Ted-Jiang Dec 19, 2021
b5082e0
minor support mod operation for expr (#1467)
liukun4515 Dec 20, 2021
5ef42eb
Left join could use bitmap for left join instead of Vec<bool> (#1291)
boazberman Dec 21, 2021
5668be7
Add Timezone to Scalar::Time* types, and better timezone awareness …
maxburke Dec 21, 2021
ecfc7d8
Pass local address host so we do not get mismatch between IPv4 and IP…
thinkharderdev Dec 22, 2021
4012713
Fix SortExec discards field metadata on the output schema (#1477)
alamb Dec 23, 2021
68db579
Minor: Rename `predicate_builder` --> `pruning_predicate` for consist…
alamb Dec 23, 2021
233ed7d
Fix duplicated 'cargo run --example parquet_sql' (#1482)
sergey-melnychuk Dec 24, 2021
a551505
add dependbot (#1489)
xudong963 Dec 28, 2021
8d20f14
Workaround build failure: Pin quote to 1.0.10 (#1499)
alamb Dec 29, 2021
91ee5a4
Refactor testing modules (#1491)
hntd187 Dec 29, 2021
7374b18
add indexed fields support to python api (#1502)
nl5887 Dec 31, 2021
72410f6
add rfc for datafusion (#1490)
xudong963 Dec 31, 2021
07f5b3d
Add example on how to query multiple parquet files (#1497)
nitisht Dec 31, 2021
7607ace
Fix ORDER BY on aggregate (#1506)
viirya Jan 1, 2022
bac97fa
remove python (#1518)
jimexist Jan 4, 2022
2fae23f
Fix single_distinct_to_groupby for arbitrary expressions (#1519)
james727 Jan 5, 2022
ecb09d9
Remove one copy of datatype serialization code (#1524)
alamb Jan 6, 2022
847e78a
Fix bugs with nullability during rewrites: Combine `simplify` and `Si…
alamb Jan 8, 2022
8949bc3
Correct typos in README (#1528)
brnnnfx Jan 9, 2022
d6d90e9
Add load test command in tpch.rs. (#1530)
Ted-Jiang Jan 9, 2022
90de12a
Add stddev operator (#1525)
realno Jan 10, 2022
2008b1d
Update docs to note support for VARIANCE and STDDEV (#1543)
alamb Jan 10, 2022
44db376
Merge remote-tracking branch 'origin/master' into arrow2_merge
Igosuki Jan 11, 2022
ca9b485
merge latest datafusion
Igosuki Jan 11, 2022
b9125bc
start migrating avro to arrow2
Igosuki Jan 11, 2022
99fdac3
lints
Igosuki Jan 11, 2022
1b916aa
merge latest datafusion
Igosuki Jan 12, 2022
d611d4d
Fix hash utils
Igosuki Jan 12, 2022
171332f
missing import in hash_utils test with no_collision
Igosuki Jan 12, 2022
4344454
address clippies in root workspace
Igosuki Jan 12, 2022
257a7c5
fix tests #1
Igosuki Jan 12, 2022
11 changes: 11 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,11 @@
version: 2
updates:
- package-ecosystem: cargo
directory: "/"
schedule:
interval: weekly
day: sunday
time: "7:00"
open-pull-requests-limit: 10
target-branch: master
labels: [auto-dependencies]
131 changes: 0 additions & 131 deletions .github/workflows/python_build.yml

This file was deleted.

62 changes: 0 additions & 62 deletions .github/workflows/python_test.yaml

This file was deleted.

4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
@@ -116,7 +116,8 @@ jobs:
cargo test --no-default-features
cargo run --example csv_sql
cargo run --example parquet_sql
-# cargo run --example avro_sql --features=datafusion/avro
+#nopass
+cargo run --example avro_sql --features=datafusion/avro
env:
CARGO_HOME: "/github/home/.cargo"
CARGO_TARGET_DIR: "/github/home/target"
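
For context, the step re-enabled above runs DataFusion's `avro_sql` example with the `datafusion/avro` feature. A minimal sketch of what such an example looks like, assuming the upstream DataFusion API of this period; the Avro path and table name are illustrative, not taken from this PR:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Create an execution context (the DataFusion 6.x-era entry point).
    let mut ctx = ExecutionContext::new();

    // Register an Avro file as a named table so SQL can query it.
    ctx.register_avro(
        "alltypes_plain",
        "testing/data/avro/alltypes_plain.avro",
        AvroReadOptions::default(),
    )
    .await?;

    // Run a query against the registered table and print the result.
    let df = ctx
        .sql("SELECT int_col, double_col FROM alltypes_plain LIMIT 5")
        .await?;
    df.show().await?;
    Ok(())
}
```
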
@@ -127,6 +128,7 @@
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cd ballista/rust
# snmalloc requires cmake so build without default features
+#nopass
cargo test --no-default-features --features sled
env:
CARGO_HOME: "/github/home/.cargo"
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -35,6 +35,6 @@ lto = true
codegen-units = 1

[patch.crates-io]
-#arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", rev = "f2c7503bc171a4c75c0af9905823c8795bd17f9b" }
-arrow2 = { git = "https://github.com/blaze-init/arrow2.git", branch = "shuffle_ipc" }
-parquet2 = { git = "https://github.com/blaze-init/parquet2.git", branch = "meta_new" }
+arrow2 = { git = "https://github.com/jorgecarleitao/arrow2.git", rev = "ef7937dfe56033c2cc491482c67587b52cd91554" }
+#arrow2 = { git = "https://github.com/blaze-init/arrow2.git", branch = "shuffle_ipc" }
+#parquet2 = { git = "https://github.com/blaze-init/parquet2.git", branch = "meta_new" }
8 changes: 4 additions & 4 deletions README.md
@@ -254,7 +254,7 @@ DataFusion is designed to be extensible at all points. To that end, you can prov

## Rust Version Compatibility

-This crate is tested with the latest stable version of Rust. We do not currrently test against other, older versions of the Rust compiler.
+This crate is tested with the latest stable version of Rust. We do not currently test against other, older versions of the Rust compiler.

# Supported SQL

@@ -264,9 +264,9 @@ This library currently supports many SQL constructs, including
- `SELECT ... FROM ...` together with any expression
- `ALIAS` to name an expression
- `CAST` to change types, including e.g. `Timestamp(Nanosecond, None)`
-- most mathematical unary and binary expressions such as `+`, `/`, `sqrt`, `tan`, `>=`.
+- Many mathematical unary and binary expressions such as `+`, `/`, `sqrt`, `tan`, `>=`.
- `WHERE` to filter
-- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, `COUNT`, `SUM`, `AVG`
+- `GROUP BY` together with one of the following aggregations: `MIN`, `MAX`, `COUNT`, `SUM`, `AVG`, `VAR`, `STDDEV` (sample and population)
- `ORDER BY` together with an expression and optional `ASC` or `DESC` and also optional `NULLS FIRST` or `NULLS LAST`
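
A hedged illustration of several of the constructs listed above through the SQL interface, assuming the DataFusion API of this era; the `metrics` table and its `tag`/`v` columns are made up:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Illustrative only: assumes a table named "metrics" with columns
// `tag` and `v` has already been registered on the context.
async fn aggregate_example(ctx: &mut ExecutionContext) -> Result<()> {
    let df = ctx
        .sql(
            "SELECT tag, COUNT(v), AVG(v), VAR(v), STDDEV(v) \
             FROM metrics \
             GROUP BY tag \
             ORDER BY tag ASC NULLS LAST",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```
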

## Supported Functions
@@ -366,7 +366,7 @@ Please see [Roadmap](docs/source/specification/roadmap.md) for information of wh
There is no formal document describing DataFusion's architecture yet, but the following presentations offer a good overview of its different components and how they interact together.

- (March 2021): The DataFusion architecture is described in _Query Engine Design and the Rust-Based DataFusion in Apache Arrow_: [recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) (DataFusion content starts [~ 15 minutes in](https://www.youtube.com/watch?v=K6eCAVEk4kU&t=875s)) and [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934)
-- (Feburary 2021): How DataFusion is used within the Ballista Project is described in \*Ballista: Distributed Compute with Rust and Apache Arrow: [recording](https://www.youtube.com/watch?v=ZZHQaOap9pQ)
+- (February 2021): How DataFusion is used within the Ballista Project is described in \*Ballista: Distributed Compute with Rust and Apache Arrow: [recording](https://www.youtube.com/watch?v=ZZHQaOap9pQ)

# Developer's guide

4 changes: 2 additions & 2 deletions ballista/rust/core/Cargo.toml
@@ -30,7 +30,7 @@ build = "build.rs"
simd = ["datafusion/simd"]

[dependencies]
-ahash = "0.7"
+ahash = { version = "0.7", default-features = false }
async-trait = "0.1.36"
futures = "0.3"
hashbrown = "0.11"
@@ -41,7 +41,7 @@ sqlparser = "0.13"
tokio = "1.0"
tonic = "0.6"
uuid = { version = "0.8", features = ["v4"] }
-chrono = "0.4"
+chrono = { version = "0.4", default-features = false }

arrow-format = { version = "0.3", features = ["flight-data", "flight-service"] }
arrow = { package = "arrow2", version="0.8", features = ["io_ipc", "io_flight"] }
4 changes: 4 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -169,6 +169,10 @@ enum AggregateFunction {
COUNT = 4;
APPROX_DISTINCT = 5;
ARRAY_AGG = 6;
+VARIANCE=7;
+VARIANCE_POP=8;
+STDDEV=9;
+STDDEV_POP=10;
}

message AggregateExprNode {
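The four new variants mirror DataFusion's sample and population variance/stddev aggregates. The sketch below is a hypothetical, self-contained illustration of the 1:1 mapping the plan serde code needs for them; the real conversion lives in ballista's generated protobuf types and serde modules, which are not part of this diff:

```rust
// Stand-in types for illustration only; the real ones are the
// prost-generated protobuf enum and DataFusion's logical
// AggregateFunction enum.
#[derive(Clone, Copy, Debug)]
enum ProtoAggregateFunction {
    Variance = 7,
    VariancePop = 8,
    Stddev = 9,
    StddevPop = 10,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum AggregateFunction {
    Variance,
    VariancePop,
    Stddev,
    StddevPop,
}

// Each proto variant maps 1:1 onto a logical aggregate; the reverse
// direction is the same match with the sides swapped.
fn from_proto(f: ProtoAggregateFunction) -> AggregateFunction {
    match f {
        ProtoAggregateFunction::Variance => AggregateFunction::Variance,
        ProtoAggregateFunction::VariancePop => AggregateFunction::VariancePop,
        ProtoAggregateFunction::Stddev => AggregateFunction::Stddev,
        ProtoAggregateFunction::StddevPop => AggregateFunction::StddevPop,
    }
}
```
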
19 changes: 15 additions & 4 deletions ballista/rust/core/src/client.rs
@@ -17,6 +17,8 @@

//! Client API for sending requests to executors.

+use arrow::io::flight::deserialize_schemas;
+use arrow::io::ipc::IpcSchema;
use std::sync::{Arc, Mutex};
use std::{collections::HashMap, pin::Pin};
use std::{
@@ -121,10 +123,12 @@ impl BallistaClient {
{
Some(flight_data) => {
// convert FlightData to a stream
-let schema = Arc::new(Schema::try_from(&flight_data)?);
+let (schema, ipc_schema) =
+    deserialize_schemas(flight_data.data_body.as_slice()).unwrap();
+let schema = Arc::new(schema);

// all the remaining stream messages should be dictionary and record batches
-Ok(Box::pin(FlightDataStream::new(stream, schema)))
+Ok(Box::pin(FlightDataStream::new(stream, schema, ipc_schema)))
}
None => Err(ballista_error(
"Did not receive schema batch from flight server",
@@ -136,13 +140,19 @@ impl BallistaClient {
struct FlightDataStream {
stream: Mutex<Streaming<FlightData>>,
schema: SchemaRef,
+ipc_schema: IpcSchema,
}

impl FlightDataStream {
-pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
+pub fn new(
+    stream: Streaming<FlightData>,
+    schema: SchemaRef,
+    ipc_schema: IpcSchema,
+) -> Self {
Self {
stream: Mutex::new(stream),
schema,
+ipc_schema,
}
}
}
@@ -161,10 +171,11 @@ impl Stream for FlightDataStream {
.map_err(|e| ArrowError::from_external_error(Box::new(e)))
.and_then(|flight_data_chunk| {
let hm = HashMap::new();

arrow::io::flight::deserialize_batch(
&flight_data_chunk,
self.schema.clone(),
-true,
+&self.ipc_schema,
&hm,
)
});
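
The change above threads arrow2's `IpcSchema` through the stream alongside the Arrow schema, because `deserialize_batch` now takes it in place of the old boolean flag. A hedged sketch of the resulting decode path, assuming the arrow2 0.8 flight API that the calls in this diff use; the helper function and message names are illustrative:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::error::ArrowError;
use arrow::io::flight::{deserialize_batch, deserialize_schemas};
use arrow_format::flight::data::FlightData;

// Decode the schema message first, then use the recovered IpcSchema to
// decode a following record-batch message.
fn decode(schema_msg: &FlightData, batch_msg: &FlightData) -> Result<(), ArrowError> {
    let (schema, ipc_schema) = deserialize_schemas(schema_msg.data_body.as_slice())?;
    let schema = Arc::new(schema);

    // Dictionary batches are not handled in this sketch.
    let dictionaries = HashMap::new();
    let batch = deserialize_batch(batch_msg, schema, &ipc_schema, &dictionaries)?;
    println!("decoded {} rows", batch.num_rows());
    Ok(())
}
```
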
9 changes: 7 additions & 2 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -458,12 +458,17 @@ impl ShuffleWriter {
num_rows: 0,
num_bytes: 0,
path: path.to_owned(),
-writer: FileWriter::try_new(buffer_writer, schema, WriteOptions::default())?,
+writer: FileWriter::try_new(
+    buffer_writer,
+    schema,
+    None,
+    WriteOptions::default(),
+)?,
})
}

fn write(&mut self, batch: &RecordBatch) -> Result<()> {
-self.writer.write(batch)?;
+self.writer.write(batch, None)?;
self.num_batches += 1;
self.num_rows += batch.num_rows() as u64;
let num_bytes: usize = batch
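
For reference, the arrow2 IPC `FileWriter` that this file now targets takes an extra optional IPC-fields argument on both construction and per-batch writes, and passing `None` is expected to derive defaults from the schema. A minimal hedged sketch matching the calls above (assumed arrow2 0.8 API; the schema and file path are illustrative):

```rust
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::{ArrowError, Result};
use arrow::io::ipc::write::{FileWriter, WriteOptions};

fn write_empty_ipc_file(path: &str) -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    let file = std::fs::File::create(path)
        .map_err(|e| ArrowError::from_external_error(Box::new(e)))?;

    // The third argument is the optional ipc_fields; None derives defaults
    // from the schema.
    let mut writer = FileWriter::try_new(file, &schema, None, WriteOptions::default())?;

    // Per-batch writes take the same optional ipc_fields:
    // writer.write(&batch, None)?;
    writer.finish()?;
    Ok(())
}
```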