Replace OnceLock with LazyLock, update MSRV to 1.80 #12601

Draft · wants to merge 4 commits into main
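For context when reading the hunks below: `LazyLock` was stabilized in Rust 1.80, which is why the MSRV moves up from 1.78. The practical difference is that `OnceLock` stores only the value and needs an explicit `get_or_init` call with the initializer at each access site, while `LazyLock` carries its initializer and runs it on first dereference. A minimal sketch of the two patterns (illustrative only, not code from this PR):

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, OnceLock};

// Before: OnceLock starts empty; every accessor must supply the initializer.
static SUFFIXES_ONCE: OnceLock<HashMap<&'static str, u64>> = OnceLock::new();

fn suffixes_once() -> &'static HashMap<&'static str, u64> {
    SUFFIXES_ONCE.get_or_init(|| HashMap::from([("kb", 1 << 10), ("mb", 1 << 20)]))
}

// After: LazyLock owns the initializer and runs it on first dereference.
static SUFFIXES_LAZY: LazyLock<HashMap<&'static str, u64>> =
    LazyLock::new(|| HashMap::from([("kb", 1 << 10), ("mb", 1 << 20)]));

fn main() {
    assert_eq!(suffixes_once().len(), 2);
    assert_eq!(SUFFIXES_LAZY.len(), 2); // deref coercion reaches the HashMap
}
```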
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -58,7 +58,7 @@ homepage = "https://datafusion.apache.org"
 license = "Apache-2.0"
 readme = "README.md"
 repository = "https://github.com/apache/datafusion"
-rust-version = "1.78"
+rust-version = "1.80"
 version = "42.0.0"
 
 [workspace.dependencies]
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -26,7 +26,7 @@ license = "Apache-2.0"
 homepage = "https://datafusion.apache.org"
 repository = "https://github.com/apache/datafusion"
 # Specify MSRV here as `cargo msrv` doesn't support workspace version
-rust-version = "1.78"
+rust-version = "1.80"
 readme = "README.md"
 
 [dependencies]
36 changes: 19 additions & 17 deletions datafusion-cli/src/main.rs
@@ -19,7 +19,7 @@ use std::collections::HashMap;
 use std::env;
 use std::path::Path;
 use std::process::ExitCode;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
@@ -289,25 +289,27 @@ impl ByteUnit {
 
 fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
     fn byte_suffixes() -> &'static HashMap<&'static str, ByteUnit> {
-        static BYTE_SUFFIXES: OnceLock<HashMap<&'static str, ByteUnit>> = OnceLock::new();
-        BYTE_SUFFIXES.get_or_init(|| {
-            let mut m = HashMap::new();
-            m.insert("b", ByteUnit::Byte);
-            m.insert("k", ByteUnit::KiB);
-            m.insert("kb", ByteUnit::KiB);
-            m.insert("m", ByteUnit::MiB);
-            m.insert("mb", ByteUnit::MiB);
-            m.insert("g", ByteUnit::GiB);
-            m.insert("gb", ByteUnit::GiB);
-            m.insert("t", ByteUnit::TiB);
-            m.insert("tb", ByteUnit::TiB);
-            m
-        })
+        static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
+            LazyLock::new(|| {
+                let mut m = HashMap::new();
+                m.insert("b", ByteUnit::Byte);
+                m.insert("k", ByteUnit::KiB);
+                m.insert("kb", ByteUnit::KiB);
+                m.insert("m", ByteUnit::MiB);
+                m.insert("mb", ByteUnit::MiB);
+                m.insert("g", ByteUnit::GiB);
+                m.insert("gb", ByteUnit::GiB);
+                m.insert("t", ByteUnit::TiB);
+                m.insert("tb", ByteUnit::TiB);
+                m
+            });
+        &BYTE_SUFFIXES
     }
 
     fn suffix_re() -> &'static regex::Regex {
-        static SUFFIX_REGEX: OnceLock<regex::Regex> = OnceLock::new();
-        SUFFIX_REGEX.get_or_init(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap())
+        static SUFFIX_REGEX: LazyLock<regex::Regex> =
+            LazyLock::new(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());
+        &SUFFIX_REGEX
     }
 
     let lower = size.to_lowercase();
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -30,7 +30,7 @@ authors = { workspace = true }
 # Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
 # "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
 # https://github.com/foresterre/cargo-msrv/issues/590
-rust-version = "1.78"
+rust-version = "1.80"
 
 [lints]
 workspace = true
44 changes: 22 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -345,7 +345,7 @@ mod test {
     use parquet::basic::LogicalType;
     use parquet::file::metadata::ColumnChunkMetaData;
     use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
-    use std::sync::{Arc, OnceLock};
+    use std::sync::{Arc, LazyLock};
 
     #[test]
     fn test_only_scans() {
@@ -511,31 +511,31 @@ mod test {
         assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
     }
 
-    static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();
+    static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
+        let schema_descr = get_test_schema_descr();
+        let row_counts = [10, 20, 30, 40];
+
+        row_counts
+            .into_iter()
+            .map(|num_rows| {
+                let column = ColumnChunkMetaData::builder(schema_descr.column(0))
+                    .set_num_values(num_rows)
+                    .build()
+                    .unwrap();
+
+                RowGroupMetaData::builder(schema_descr.clone())
+                    .set_num_rows(num_rows)
+                    .set_column_metadata(vec![column])
+                    .build()
+                    .unwrap()
+            })
+            .collect()
+    });
 
     /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
     /// respectively
     fn row_group_metadata() -> &'static [RowGroupMetaData] {
-        ROW_GROUP_METADATA.get_or_init(|| {
-            let schema_descr = get_test_schema_descr();
-            let row_counts = [10, 20, 30, 40];
-
-            row_counts
-                .into_iter()
-                .map(|num_rows| {
-                    let column = ColumnChunkMetaData::builder(schema_descr.column(0))
-                        .set_num_values(num_rows)
-                        .build()
-                        .unwrap();
-
-                    RowGroupMetaData::builder(schema_descr.clone())
-                        .set_num_rows(num_rows)
-                        .set_column_metadata(vec![column])
-                        .build()
-                        .unwrap()
-                })
-                .collect()
-        })
+        &ROW_GROUP_METADATA
     }
 
     /// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
66 changes: 32 additions & 34 deletions datafusion/core/tests/expr_api/mod.rs
@@ -28,7 +28,7 @@ use datafusion_functions_aggregate::sum::sum_udaf;
 use datafusion_functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
 use sqlparser::ast::NullTreatment;
 /// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 
 mod parse_sql_expr;
 mod simplification;
@@ -350,39 +350,37 @@ fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
     );
 }
 
-static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
+static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+    let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
+    let int_array: ArrayRef =
+        Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
+
+    // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
+    let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
+        Arc::new(Field::new("a", DataType::Utf8, false)),
+        Arc::new(StringArray::from(vec![
+            "2021-02-01",
+            "2021-02-02",
+            "2021-02-03",
+        ])) as _,
+    )]));
+
+    // ["one"] ["two", "three", "four"] ["five"]
+    let mut builder = ListBuilder::new(StringBuilder::new());
+    builder.append_value([Some("one")]);
+    builder.append_value([Some("two"), Some("three"), Some("four")]);
+    builder.append_value([Some("five")]);
+    let list_array: ArrayRef = Arc::new(builder.finish());
+
+    RecordBatch::try_from_iter(vec![
+        ("id", string_array),
+        ("i", int_array),
+        ("props", struct_array),
+        ("list", list_array),
+    ])
+    .unwrap()
+});
 
 fn test_batch() -> RecordBatch {
-    TEST_BATCH
-        .get_or_init(|| {
-            let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
-            let int_array: ArrayRef =
-                Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
-
-            // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
-            let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
-                Arc::new(Field::new("a", DataType::Utf8, false)),
-                Arc::new(StringArray::from(vec![
-                    "2021-02-01",
-                    "2021-02-02",
-                    "2021-02-03",
-                ])) as _,
-            )]));
-
-            // ["one"] ["two", "three", "four"] ["five"]
-            let mut builder = ListBuilder::new(StringBuilder::new());
-            builder.append_value([Some("one")]);
-            builder.append_value([Some("two"), Some("three"), Some("four")]);
-            builder.append_value([Some("five")]);
-            let list_array: ArrayRef = Arc::new(builder.finish());
-
-            RecordBatch::try_from_iter(vec![
-                ("id", string_array),
-                ("i", int_array),
-                ("props", struct_array),
-                ("list", list_array),
-            ])
-            .unwrap()
-        })
-        .clone()
+    TEST_BATCH.clone()
 }
6 changes: 3 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -34,7 +34,7 @@ use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use futures::StreamExt;
 use std::any::Any;
 use std::num::NonZeroUsize;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 use tokio::fs::File;
 
 use datafusion::datasource::streaming::StreamingTable;
@@ -725,15 +725,15 @@ fn maybe_split_batches(
         .collect()
 }
 
-static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
+static DICT_BATCHES: LazyLock<Vec<RecordBatch>> = LazyLock::new(make_dict_batches);
 
 /// Returns 5 sorted string dictionary batches each with 50 rows with
 /// this schema.
 ///
 /// a: Dictionary<Utf8, Int32>,
 /// b: Dictionary<Utf8, Int32>,
 fn dict_batches() -> Vec<RecordBatch> {
-    DICT_BATCHES.get_or_init(make_dict_batches).clone()
+    DICT_BATCHES.clone()
 }
 
 fn make_dict_batches() -> Vec<RecordBatch> {
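One detail worth noting in the hunk above: `DICT_BATCHES` passes `make_dict_batches` directly instead of a closure. `LazyLock`'s initializer type parameter defaults to a plain `fn() -> T`, so a function item works as the initializer. A tiny standalone sketch (illustrative, not from this diff):

```rust
use std::sync::LazyLock;

fn make_data() -> Vec<u32> {
    (0..50).collect()
}

// The default initializer type is `fn() -> T`, so a function pointer is enough.
static DATA: LazyLock<Vec<u32>> = LazyLock::new(make_data);

fn main() {
    // `make_data` runs here, on first dereference, and only once.
    assert_eq!(DATA.len(), 50);
}
```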
64 changes: 32 additions & 32 deletions datafusion/core/tests/parquet/external_access_plan.rs
@@ -33,7 +33,7 @@ use datafusion_physical_plan::ExecutionPlan;
 use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 use tempfile::NamedTempFile;
 
 #[tokio::test]
@@ -369,45 +369,45 @@ struct TestData {
     file_size: u64,
 }
 
-static TEST_DATA: OnceLock<TestData> = OnceLock::new();
+static TEST_DATA: LazyLock<TestData> = LazyLock::new(|| {
+    let scenario = Scenario::UTF8;
+    let row_per_group = 5;
 
-/// Return a parquet file with 2 row groups each with 5 rows
-fn get_test_data() -> &'static TestData {
-    TEST_DATA.get_or_init(|| {
-        let scenario = Scenario::UTF8;
-        let row_per_group = 5;
+    let mut temp_file = tempfile::Builder::new()
+        .prefix("user_access_plan")
+        .suffix(".parquet")
+        .tempfile()
+        .expect("tempfile creation");
 
-        let mut temp_file = tempfile::Builder::new()
-            .prefix("user_access_plan")
-            .suffix(".parquet")
-            .tempfile()
-            .expect("tempfile creation");
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(row_per_group)
+        .build();
 
-        let props = WriterProperties::builder()
-            .set_max_row_group_size(row_per_group)
-            .build();
+    let batches = create_data_batch(scenario);
+    let schema = batches[0].schema();
 
-        let batches = create_data_batch(scenario);
-        let schema = batches[0].schema();
+    let mut writer =
+        ArrowWriter::try_new(&mut temp_file, schema.clone(), Some(props)).unwrap();
 
-        let mut writer =
-            ArrowWriter::try_new(&mut temp_file, schema.clone(), Some(props)).unwrap();
+    for batch in batches {
+        writer.write(&batch).expect("writing batch");
+    }
+    writer.close().unwrap();
 
-        for batch in batches {
-            writer.write(&batch).expect("writing batch");
-        }
-        writer.close().unwrap();
+    let file_name = temp_file.path().to_string_lossy().to_string();
+    let file_size = temp_file.path().metadata().unwrap().len();
 
-        let file_name = temp_file.path().to_string_lossy().to_string();
-        let file_size = temp_file.path().metadata().unwrap().len();
+    TestData {
+        temp_file,
+        schema,
+        file_name,
+        file_size,
+    }
+});
 
-        TestData {
-            temp_file,
-            schema,
-            file_name,
-            file_size,
-        }
-    })
+/// Return a parquet file with 2 row groups each with 5 rows
+fn get_test_data() -> &'static TestData {
+    &TEST_DATA
 }
 
 /// Return the total value of the specified metric name
12 changes: 5 additions & 7 deletions datafusion/expr/src/test/function_stub.rs
@@ -43,16 +43,14 @@ macro_rules! create_func {
             /// Singleton instance of [$UDAF], ensures the UDAF is only created once
             /// named STATIC_$(UDAF). For example `STATIC_FirstValue`
            #[allow(non_upper_case_globals)]
-            static [< STATIC_ $UDAF >]: std::sync::OnceLock<std::sync::Arc<crate::AggregateUDF>> =
-                std::sync::OnceLock::new();
+            static [< STATIC_ $UDAF >]: std::sync::LazyLock<std::sync::Arc<crate::AggregateUDF>> =
+                std::sync::LazyLock::new(|| {
+                    std::sync::Arc::new(crate::AggregateUDF::from(<$UDAF>::default()))
+                });
 
             #[doc = concat!("AggregateFunction that returns a [AggregateUDF](crate::AggregateUDF) for [`", stringify!($UDAF), "`]")]
             pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<crate::AggregateUDF> {
-                [< STATIC_ $UDAF >]
-                    .get_or_init(|| {
-                        std::sync::Arc::new(crate::AggregateUDF::from(<$UDAF>::default()))
-                    })
-                    .clone()
+                [< STATIC_ $UDAF >].clone()
             }
         }
     }
12 changes: 5 additions & 7 deletions datafusion/functions-aggregate/src/macros.rs
@@ -85,16 +85,14 @@ macro_rules! create_func {
             /// Singleton instance of [$UDAF], ensures the UDAF is only created once
             /// named STATIC_$(UDAF). For example `STATIC_FirstValue`
            #[allow(non_upper_case_globals)]
-            static [< STATIC_ $UDAF >]: std::sync::OnceLock<std::sync::Arc<datafusion_expr::AggregateUDF>> =
-                std::sync::OnceLock::new();
+            static [< STATIC_ $UDAF >]: std::sync::LazyLock<std::sync::Arc<datafusion_expr::AggregateUDF>> =
+                std::sync::LazyLock::new(|| {
+                    std::sync::Arc::new(datafusion_expr::AggregateUDF::from($CREATE))
+                });
 
             #[doc = concat!("AggregateFunction that returns a [`AggregateUDF`](datafusion_expr::AggregateUDF) for [`", stringify!($UDAF), "`]")]
             pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<datafusion_expr::AggregateUDF> {
-                [< STATIC_ $UDAF >]
-                    .get_or_init(|| {
-                        std::sync::Arc::new(datafusion_expr::AggregateUDF::from($CREATE))
-                    })
-                    .clone()
+                [< STATIC_ $UDAF >].clone()
             }
         }
     }
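The two macro hunks above generate the same singleton-accessor shape. A rough standalone sketch of the expanded pattern (illustrative only; it assumes the `paste` crate for the `[< .. >]` identifier concatenation these hunks rely on, and a placeholder `FirstValue` type instead of a real UDAF):

```rust
use std::sync::{Arc, LazyLock};

/// Placeholder for the UDAF types the real macros instantiate.
#[derive(Default)]
struct FirstValue;

// Requires the `paste` crate for `[< .. >]` identifier concatenation.
macro_rules! create_singleton {
    ($TYPE:ident, $ACCESSOR_FN:ident) => {
        paste::paste! {
            #[allow(non_upper_case_globals)]
            static [< STATIC_ $TYPE >]: LazyLock<Arc<$TYPE>> =
                LazyLock::new(|| Arc::new(<$TYPE>::default()));

            /// Returns a shared handle to the lazily built singleton.
            pub fn $ACCESSOR_FN() -> Arc<$TYPE> {
                [< STATIC_ $TYPE >].clone()
            }
        }
    };
}

create_singleton!(FirstValue, first_value_udaf);

fn main() {
    // Both calls clone the same Arc; the value is constructed exactly once.
    assert!(Arc::ptr_eq(&first_value_udaf(), &first_value_udaf()));
}
```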