Skip to content

Commit

Permalink
chore: dependency cleanup (#1150)
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm authored Dec 14, 2024
1 parent b3cf8d1 commit 80c2c56
Show file tree
Hide file tree
Showing 33 changed files with 320 additions and 402 deletions.
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
resolver = "2"

[workspace.dependencies]
anyhow = "1"
arrow = { version = "53", features = ["ipc_compression"] }
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
clap = { version = "4.5", features = ["derive", "cargo"] }
Expand All @@ -40,9 +39,9 @@ tonic-build = { version = "0.12", default-features = false, features = [
"transport",
"prost"
] }
tracing = "0.1.36"
tracing = "0.1"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
ctor = { version = "0.2" }
mimalloc = { version = "0.1" }

Expand All @@ -58,7 +57,6 @@ dashmap = { version = "6.1" }
async-trait = { version = "0.1.4" }
serde = { version = "1.0" }
tokio-stream = { version = "0.1" }
parse_arg = { version = "0.1" }
url = { version = "2.5" }

# cargo build --profile release-lto
Expand Down
1 change: 0 additions & 1 deletion ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ keywords = ["ballista", "cli"]
license = "Apache-2.0"
homepage = "https://github.com/apache/arrow-ballista"
repository = "https://github.com/apache/arrow-ballista"
rust-version = "1.72"
readme = "README.md"

[dependencies]
Expand Down
8 changes: 3 additions & 5 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,26 @@ repository = "https://github.com/apache/arrow-ballista"
readme = "README.md"
authors = ["Apache DataFusion <[email protected]>"]
edition = "2021"
rust-version = "1.72"

[dependencies]
async-trait = { workspace = true }
ballista-core = { path = "../core", version = "0.12.0" }
ballista-executor = { path = "../executor", version = "0.12.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.12.0", optional = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
tempfile = { workspace = true }

tokio = { workspace = true }
url = { workspace = true }

[dev-dependencies]
ballista-executor = { path = "../executor", version = "0.12.0" }
ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
ctor = { workspace = true }
datafusion-proto = { workspace = true }
env_logger = { workspace = true }
rstest = { version = "0.23" }
tempfile = { workspace = true }
tonic = { workspace = true }

[features]
Expand Down
9 changes: 4 additions & 5 deletions ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,24 @@ exclude = ["*.proto"]
rustc-args = ["--cfg", "docsrs"]

[features]
build-binary = ["configure_me", "clap"]
docsrs = []
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = ["datafusion/force_hash_collisions"]


[dependencies]
arrow-flight = { workspace = true }
async-trait = { workspace = true }
chrono = { version = "0.4", default-features = false }
clap = { workspace = true }
clap = { workspace = true, optional = true }
configure_me = { workspace = true, optional = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
futures = { workspace = true }

itertools = "0.13"
log = { workspace = true }
md-5 = { version = "^0.10.0" }
parse_arg = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
Expand All @@ -66,5 +65,5 @@ url = { workspace = true }
tempfile = { workspace = true }

[build-dependencies]
rustc_version = "0.4.0"
rustc_version = "0.4.1"
tonic-build = { workspace = true }
25 changes: 14 additions & 11 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

//! Ballista configuration
use clap::ValueEnum;
use core::fmt;
use std::collections::HashMap;
use std::result;

Expand Down Expand Up @@ -252,30 +250,33 @@ impl datafusion::config::ConfigExtension for BallistaConfig {

// an enum used to configure the scheduler policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum TaskSchedulingPolicy {
#[default]
PullStaged,
PushStaged,
}

#[cfg(feature = "build-binary")]
impl std::str::FromStr for TaskSchedulingPolicy {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
ValueEnum::from_str(s, true)
clap::ValueEnum::from_str(s, true)
}
}

impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
#[cfg(feature = "build-binary")]
impl configure_me::parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
write!(writer, "The scheduler policy for the scheduler")
}
}

// an enum used to configure the log rolling policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum LogRotationPolicy {
Minutely,
Hourly,
Expand All @@ -284,16 +285,18 @@ pub enum LogRotationPolicy {
Never,
}

#[cfg(feature = "build-binary")]
impl std::str::FromStr for LogRotationPolicy {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
ValueEnum::from_str(s, true)
clap::ValueEnum::from_str(s, true)
}
}

impl parse_arg::ParseArgFromStr for LogRotationPolicy {
fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
#[cfg(feature = "build-binary")]
impl configure_me::parse_arg::ParseArgFromStr for LogRotationPolicy {
fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
write!(writer, "The log rotation policy")
}
}
Expand Down
148 changes: 148 additions & 0 deletions ballista/core/src/diagram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::Result;
use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};

use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
let write_file = File::create(filename)?;
let mut w = BufWriter::new(&write_file);
writeln!(w, "digraph G {{")?;

// draw stages and entities
for stage in stages {
writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
let mut id = AtomicUsize::new(0);
build_exec_plan_diagram(
&mut w,
stage.children()[0].as_ref(),
stage.stage_id(),
&mut id,
true,
)?;
writeln!(w, "\t}}")?;
}

// draw relationships
for stage in stages {
let mut id = AtomicUsize::new(0);
build_exec_plan_diagram(
&mut w,
stage.children()[0].as_ref(),
stage.stage_id(),
&mut id,
false,
)?;
}

write!(w, "}}")?;
Ok(())
}

fn build_exec_plan_diagram(
w: &mut BufWriter<&File>,
plan: &dyn ExecutionPlan,
stage_id: usize,
id: &mut AtomicUsize,
draw_entity: bool,
) -> Result<usize> {
let operator_str = if plan.as_any().downcast_ref::<AggregateExec>().is_some() {
"AggregateExec"
} else if plan.as_any().downcast_ref::<SortExec>().is_some() {
"SortExec"
} else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
"ProjectionExec"
} else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
"HashJoinExec"
} else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
"ParquetExec"
} else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
"CsvExec"
} else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
"FilterExec"
} else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
"ShuffleWriterExec"
} else if plan
.as_any()
.downcast_ref::<UnresolvedShuffleExec>()
.is_some()
{
"UnresolvedShuffleExec"
} else if plan
.as_any()
.downcast_ref::<CoalesceBatchesExec>()
.is_some()
{
"CoalesceBatchesExec"
} else if plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.is_some()
{
"CoalescePartitionsExec"
} else {
println!("Unknown: {plan:?}");
"Unknown"
};

let node_id = id.load(Ordering::SeqCst);
id.store(node_id + 1, Ordering::SeqCst);

if draw_entity {
writeln!(
w,
"\t\tstage_{stage_id}_exec_{node_id} [shape=box, label=\"{operator_str}\"];"
)?;
}
for child in plan.children() {
if let Some(shuffle) = child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
if !draw_entity {
writeln!(
w,
"\tstage_{}_exec_1 -> stage_{}_exec_{};",
shuffle.stage_id, stage_id, node_id
)?;
}
} else {
// relationships within same entity
let child_id =
build_exec_plan_diagram(w, child.as_ref(), stage_id, id, draw_entity)?;
if draw_entity {
writeln!(
w,
"\t\tstage_{stage_id}_exec_{child_id} -> stage_{stage_id}_exec_{node_id};"
)?;
}
}
}
Ok(node_id)
}
Loading

0 comments on commit 80c2c56

Please sign in to comment.