Replace OnceLock with LazyLock, update MSRV to 1.80 #11690

Draft · 2 commits · base: main
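
The change is mechanical throughout: a `OnceLock` static plus a wrapper function that calls `get_or_init` becomes a single `LazyLock` static that carries its own initializer. `LazyLock` was stabilized in Rust 1.80, which is why the MSRV rises from 1.76 to 1.80. A minimal sketch of the before/after pattern (names are hypothetical, not from this diff):

use std::collections::HashMap;
use std::sync::{LazyLock, OnceLock};

// Before: OnceLock needs a wrapper function to run the initializer.
fn suffixes_once() -> &'static HashMap<&'static str, u64> {
    static SUFFIXES: OnceLock<HashMap<&'static str, u64>> = OnceLock::new();
    SUFFIXES.get_or_init(|| HashMap::from([("k", 1 << 10), ("m", 1 << 20)]))
}

// After: LazyLock bundles the initializer, so the static stands alone
// and initializes itself on first dereference.
static SUFFIXES_LAZY: LazyLock<HashMap<&'static str, u64>> =
    LazyLock::new(|| HashMap::from([("k", 1 << 10), ("m", 1 << 20)]));

fn main() {
    assert_eq!(suffixes_once().get("k"), Some(&1024));
    assert_eq!(SUFFIXES_LAZY.get("m"), Some(&(1 << 20)));
}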
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -54,7 +54,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.76"
rust-version = "1.80"
version = "40.0.0"

[workspace.dependencies]
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.76"
rust-version = "1.80"
readme = "README.md"

[dependencies]
20 changes: 8 additions & 12 deletions datafusion-cli/src/main.rs
@@ -19,7 +19,7 @@ use std::collections::HashMap;
use std::env;
use std::path::Path;
use std::process::ExitCode;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
@@ -295,9 +295,8 @@ impl ByteUnit {
}

fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
-fn byte_suffixes() -> &'static HashMap<&'static str, ByteUnit> {
-static BYTE_SUFFIXES: OnceLock<HashMap<&'static str, ByteUnit>> = OnceLock::new();
-BYTE_SUFFIXES.get_or_init(|| {
+static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
+LazyLock::new(|| {
let mut m = HashMap::new();
m.insert("b", ByteUnit::Byte);
m.insert("k", ByteUnit::KiB);
@@ -309,23 +308,20 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
m.insert("t", ByteUnit::TiB);
m.insert("tb", ByteUnit::TiB);
m
-})
-}
+});

-fn suffix_re() -> &'static regex::Regex {
-static SUFFIX_REGEX: OnceLock<regex::Regex> = OnceLock::new();
-SUFFIX_REGEX.get_or_init(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap())
-}
+static SUFFIX_REGEX: LazyLock<regex::Regex> =
+LazyLock::new(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());

let lower = size.to_lowercase();
-if let Some(caps) = suffix_re().captures(&lower) {
+if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
let num_str = caps.get(1).unwrap().as_str();
let num = num_str.parse::<usize>().map_err(|_| {
format!("Invalid numeric value in memory pool size '{}'", size)
})?;

let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
-let unit = byte_suffixes()
+let unit = BYTE_SUFFIXES
.get(suffix)
.ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
let memory_pool_size = usize::try_from(unit.multiplier())
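
Note why the new call sites compile essentially unchanged: `LazyLock<T>` implements `Deref<Target = T>`, so `SUFFIX_REGEX.captures(&lower)` and `BYTE_SUFFIXES.get(suffix)` trigger initialization on first use and then read like the underlying value. A self-contained sketch of the same shape, assuming the `regex` crate (names hypothetical):

use std::sync::LazyLock;

use regex::Regex;

// Compiled once, on first use; panics at that point if the pattern is invalid.
static SIZE_RE: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());

// Splits "100mb" into (100, "mb"); the suffix defaults to "b".
fn split_size(size: &str) -> Option<(i64, &str)> {
    // Method calls go through Deref, so SIZE_RE reads like a plain Regex.
    let caps = SIZE_RE.captures(size)?;
    let num = caps.get(1)?.as_str().parse().ok()?;
    let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
    Some((num, suffix))
}

fn main() {
    assert_eq!(split_size("100mb"), Some((100, "mb")));
    assert_eq!(split_size("512"), Some((512, "b")));
}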
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
rust-version = "1.76"
rust-version = "1.80"

[lints]
workspace = true
58 changes: 27 additions & 31 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -345,7 +345,7 @@ mod test {
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};

#[test]
fn test_only_scans() {
@@ -358,7 +358,7 @@

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan
-.into_overall_row_selection(row_group_metadata())
+.into_overall_row_selection(&ROW_GROUP_METADATA)
.unwrap();

// scan all row groups, no selection
@@ -377,7 +377,7 @@

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan
-.into_overall_row_selection(row_group_metadata())
+.into_overall_row_selection(&ROW_GROUP_METADATA)
.unwrap();

// skip all row groups, no selection
@@ -403,7 +403,7 @@

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan
-.into_overall_row_selection(row_group_metadata())
+.into_overall_row_selection(&ROW_GROUP_METADATA)
.unwrap();

assert_eq!(row_group_indexes, vec![0, 1]);
@@ -442,7 +442,7 @@

let row_group_indexes = access_plan.row_group_indexes();
let row_selection = access_plan
-.into_overall_row_selection(row_group_metadata())
+.into_overall_row_selection(&ROW_GROUP_METADATA)
.unwrap();

assert_eq!(row_group_indexes, vec![1, 2, 3]);
@@ -478,7 +478,7 @@

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
-.into_overall_row_selection(row_group_metadata())
+.into_overall_row_selection(&ROW_GROUP_METADATA)
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
@@ -504,39 +504,35 @@

let row_group_indexes = access_plan.row_group_indexes();
let err = access_plan
-.into_overall_row_selection(row_group_metadata())
+.into_overall_row_selection(&ROW_GROUP_METADATA)
.unwrap_err()
.to_string();
assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
}

-static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();
-
/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
/// respectively
-fn row_group_metadata() -> &'static [RowGroupMetaData] {
-ROW_GROUP_METADATA.get_or_init(|| {
-let schema_descr = get_test_schema_descr();
-let row_counts = [10, 20, 30, 40];
-
-row_counts
-.into_iter()
-.map(|num_rows| {
-let column = ColumnChunkMetaData::builder(schema_descr.column(0))
-.set_num_values(num_rows)
-.build()
-.unwrap();
-
-RowGroupMetaData::builder(schema_descr.clone())
-.set_num_rows(num_rows)
-.set_column_metadata(vec![column])
-.build()
-.unwrap()
-})
-.collect()
-})
-}
+static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
+let schema_descr = get_test_schema_descr();
+let row_counts = [10, 20, 30, 40];
+
+row_counts
+.into_iter()
+.map(|num_rows| {
+let column = ColumnChunkMetaData::builder(schema_descr.column(0))
+.set_num_values(num_rows)
+.build()
+.unwrap();
+
+RowGroupMetaData::builder(schema_descr.clone())
+.set_num_rows(num_rows)
+.set_column_metadata(vec![column])
+.build()
+.unwrap()
+})
+.collect()
+});

/// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
fn get_test_schema_descr() -> SchemaDescPtr {
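
The test changes above also lean on deref coercion: `&ROW_GROUP_METADATA` is accepted where a `&[RowGroupMetaData]` is expected, coercing from `&LazyLock<Vec<_>>` through `&Vec<_>` to the slice. A minimal sketch of that chain, echoing the 10/20/30/40 row counts (names hypothetical):

use std::sync::LazyLock;

// Four "row groups" with 10, 20, 30, 40 rows, built on first access.
static ROW_COUNTS: LazyLock<Vec<u32>> =
    LazyLock::new(|| (1..=4).map(|i| i * 10).collect());

fn total_rows(counts: &[u32]) -> u32 {
    counts.iter().sum()
}

fn main() {
    // &ROW_COUNTS coerces: &LazyLock<Vec<u32>> -> &Vec<u32> -> &[u32].
    assert_eq!(total_rows(&ROW_COUNTS), 100);
}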
72 changes: 33 additions & 39 deletions datafusion/core/tests/expr_api/mod.rs
@@ -28,7 +28,7 @@ use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
use sqlparser::ast::NullTreatment;
/// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};

mod parse_sql_expr;
mod simplification;
@@ -320,7 +320,7 @@ async fn test_aggregate_ext_null_treatment() {
/// Evaluates the specified expr as an aggregate and compares the result to the
/// expected result.
async fn evaluate_agg_test(expr: Expr, expected_lines: Vec<&str>) {
-let batch = test_batch();
+let batch = TEST_BATCH.clone();

let ctx = SessionContext::new();
let group_expr = vec![];
@@ -347,7 +347,7 @@
/// Converts the `Expr` to a `PhysicalExpr`, evaluates it against the provided
/// `RecordBatch` and compares the result to the expected result.
fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
-let batch = test_batch();
+let batch = TEST_BATCH.clone();
let df_schema = DFSchema::try_from(batch.schema()).unwrap();
let physical_expr = SessionContext::new()
.create_physical_expr(expr, &df_schema)
@@ -365,39 +365,33 @@ fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
);
}

-static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
-
-fn test_batch() -> RecordBatch {
-TEST_BATCH
-.get_or_init(|| {
-let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
-let int_array: ArrayRef =
-Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
-
-// { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
-let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
-Arc::new(Field::new("a", DataType::Utf8, false)),
-Arc::new(StringArray::from(vec![
-"2021-02-01",
-"2021-02-02",
-"2021-02-03",
-])) as _,
-)]));
-
-// ["one"] ["two", "three", "four"] ["five"]
-let mut builder = ListBuilder::new(StringBuilder::new());
-builder.append_value([Some("one")]);
-builder.append_value([Some("two"), Some("three"), Some("four")]);
-builder.append_value([Some("five")]);
-let list_array: ArrayRef = Arc::new(builder.finish());
-
-RecordBatch::try_from_iter(vec![
-("id", string_array),
-("i", int_array),
-("props", struct_array),
-("list", list_array),
-])
-.unwrap()
-})
-.clone()
-}
+static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
+let int_array: ArrayRef =
+Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
+
+// { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
+let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
+Arc::new(Field::new("a", DataType::Utf8, false)),
+Arc::new(StringArray::from(vec![
+"2021-02-01",
+"2021-02-02",
+"2021-02-03",
+])) as _,
+)]));
+
+// ["one"] ["two", "three", "four"] ["five"]
+let mut builder = ListBuilder::new(StringBuilder::new());
+builder.append_value([Some("one")]);
+builder.append_value([Some("two"), Some("three"), Some("four")]);
+builder.append_value([Some("five")]);
+let list_array: ArrayRef = Arc::new(builder.finish());
+
+RecordBatch::try_from_iter(vec![
+("id", string_array),
+("i", int_array),
+("props", struct_array),
+("list", list_array),
+])
+.unwrap()
+});
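
One subtlety: the old `test_batch()` cloned inside the accessor, while the new code clones at each call site via `TEST_BATCH.clone()`, which resolves through `Deref` to `RecordBatch::clone`. Arrow record batches are backed by `Arc`'d column arrays, so such clones should be shallow and cheap, in the spirit of this sketch:

use std::sync::Arc;

fn main() {
    // Cloning an Arc-backed value copies a pointer and bumps a refcount;
    // the megabyte of data underneath is not duplicated.
    let original = Arc::new(vec![0u8; 1 << 20]);
    let copy = original.clone();
    assert!(Arc::ptr_eq(&original, &copy));
}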
22 changes: 9 additions & 13 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -30,7 +30,7 @@ use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use futures::StreamExt;
use std::any::Any;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
use tokio::fs::File;

use datafusion::datasource::streaming::StreamingTable;
@@ -568,7 +568,10 @@ impl Scenario {
single_row_batches, ..
} = self
{
-batches_byte_size(&maybe_split_batches(dict_batches(), *single_row_batches))
+batches_byte_size(&maybe_split_batches(
+DICT_BATCHES.clone(),
+*single_row_batches,
+))
} else {
panic!("Scenario does not support partition size");
}
@@ -604,7 +607,7 @@
} => {
use datafusion::physical_expr::expressions::col;
let batches: Vec<Vec<_>> = std::iter::repeat(maybe_split_batches(
-dict_batches(),
+DICT_BATCHES.clone(),
*single_row_batches,
))
.take(*partitions)
@@ -686,18 +689,11 @@ fn maybe_split_batches(
.collect()
}

-static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
-
-/// Returns 5 sorted string dictionary batches each with 50 rows with
-/// this schema.
+/// 5 sorted string dictionary batches each with 50 rows with this schema.
///
/// a: Dictionary<Utf8, Int32>,
/// b: Dictionary<Utf8, Int32>,
-fn dict_batches() -> Vec<RecordBatch> {
-DICT_BATCHES.get_or_init(make_dict_batches).clone()
-}
-
-fn make_dict_batches() -> Vec<RecordBatch> {
+static DICT_BATCHES: LazyLock<Vec<RecordBatch>> = LazyLock::new(|| {
let batch_size = 50;

let mut i = 0;
@@ -731,7 +727,7 @@ fn make_dict_batches() -> Vec<RecordBatch> {
});

batches
-}
+});

// How many bytes does the memory from dict_batches consume?
fn batches_byte_size(batches: &[RecordBatch]) -> usize {
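
Worth keeping in mind when applying this pattern elsewhere: a `LazyLock` initializer is fixed at the definition site and takes no arguments, so one-time initialization that depends on runtime input still calls for `OnceLock::get_or_init`. A hypothetical contrast:

use std::sync::OnceLock;

static GREETING: OnceLock<String> = OnceLock::new();

// The first caller's `name` wins; later arguments are ignored. LazyLock
// cannot express this, since its initializer is fixed when the static is defined.
fn greeting(name: &str) -> &'static str {
    GREETING.get_or_init(|| format!("hello, {name}"))
}

fn main() {
    assert_eq!(greeting("datafusion"), "hello, datafusion");
    assert_eq!(greeting("ignored"), "hello, datafusion");
}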