Change OnceLock to LazyLock, update MSRV to 1.80
OussamaSaoudi committed Sep 24, 2024
1 parent 64a3896 · commit cdeeae7
Showing 14 changed files with 172 additions and 182 deletions.
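The change is mechanical across these files: each `OnceLock` that was initialized through `get_or_init` becomes a `LazyLock` that carries its initializer directly, which is why the MSRV moves to 1.80, the release that stabilized `LazyLock`. Below is a minimal sketch of the before/after pattern, using illustrative names rather than anything from the DataFusion codebase:

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, OnceLock};

// Before: the static holds no initializer, so every accessor has to call
// get_or_init with the construction closure.
fn lookup_once(key: &str) -> Option<u32> {
    static TABLE: OnceLock<HashMap<&'static str, u32>> = OnceLock::new();
    TABLE
        .get_or_init(|| HashMap::from([("a", 1), ("b", 2)]))
        .get(key)
        .copied()
}

// After: the static owns its initializer, runs it on first access, and
// callers simply use the value through Deref.
static LAZY_TABLE: LazyLock<HashMap<&'static str, u32>> =
    LazyLock::new(|| HashMap::from([("a", 1), ("b", 2)]));

fn lookup_lazy(key: &str) -> Option<u32> {
    LAZY_TABLE.get(key).copied()
}

fn main() {
    assert_eq!(lookup_once("a"), Some(1));
    assert_eq!(lookup_lazy("b"), Some(2));
}
```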
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -58,7 +58,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.78"
rust-version = "1.80"
version = "42.0.0"

[workspace.dependencies]
34 changes: 17 additions & 17 deletions datafusion-cli/src/main.rs
Expand Up @@ -19,7 +19,7 @@ use std::collections::HashMap;
use std::env;
use std::path::Path;
use std::process::ExitCode;
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, LazyLock};

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
@@ -289,25 +289,25 @@ impl ByteUnit {

fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
fn byte_suffixes() -> &'static HashMap<&'static str, ByteUnit> {
static BYTE_SUFFIXES: OnceLock<HashMap<&'static str, ByteUnit>> = OnceLock::new();
BYTE_SUFFIXES.get_or_init(|| {
let mut m = HashMap::new();
m.insert("b", ByteUnit::Byte);
m.insert("k", ByteUnit::KiB);
m.insert("kb", ByteUnit::KiB);
m.insert("m", ByteUnit::MiB);
m.insert("mb", ByteUnit::MiB);
m.insert("g", ByteUnit::GiB);
m.insert("gb", ByteUnit::GiB);
m.insert("t", ByteUnit::TiB);
m.insert("tb", ByteUnit::TiB);
m
})
static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
LazyLock::new(|| {
let mut m = HashMap::new();
m.insert("b", ByteUnit::Byte);
m.insert("k", ByteUnit::KiB);
m.insert("kb", ByteUnit::KiB);
m.insert("m", ByteUnit::MiB);
m.insert("mb", ByteUnit::MiB);
m.insert("g", ByteUnit::GiB);
m.insert("gb", ByteUnit::GiB);
m.insert("t", ByteUnit::TiB);
m.insert("tb", ByteUnit::TiB);
m
});
}

fn suffix_re() -> &'static regex::Regex {
static SUFFIX_REGEX: OnceLock<regex::Regex> = OnceLock::new();
SUFFIX_REGEX.get_or_init(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap())
static SUFFIX_REGEX: LazyLock<regex::Regex> =
LazyLock::new(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());
}

let lower = size.to_lowercase();
44 changes: 22 additions & 22 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -345,7 +345,7 @@ mod test {
use parquet::basic::LogicalType;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, LazyLock};

#[test]
fn test_only_scans() {
@@ -511,31 +511,31 @@ mod test {
assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
}

static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();
static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
let schema_descr = get_test_schema_descr();
let row_counts = [10, 20, 30, 40];

row_counts
.into_iter()
.map(|num_rows| {
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
.set_num_values(num_rows)
.build()
.unwrap();

RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(num_rows)
.set_column_metadata(vec![column])
.build()
.unwrap()
})
.collect()
});

/// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
/// respectively
fn row_group_metadata() -> &'static [RowGroupMetaData] {
ROW_GROUP_METADATA.get_or_init(|| {
let schema_descr = get_test_schema_descr();
let row_counts = [10, 20, 30, 40];

row_counts
.into_iter()
.map(|num_rows| {
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
.set_num_values(num_rows)
.build()
.unwrap();

RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(num_rows)
.set_column_metadata(vec![column])
.build()
.unwrap()
})
.collect()
})
&ROW_GROUP_METADATA
}

/// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
66 changes: 32 additions & 34 deletions datafusion/core/tests/expr_api/mod.rs
@@ -28,7 +28,7 @@ use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
use sqlparser::ast::NullTreatment;
/// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, LazyLock};

mod parse_sql_expr;
mod simplification;
@@ -350,39 +350,37 @@ fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
);
}

static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
let int_array: ArrayRef =
Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));

// { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
Arc::new(Field::new("a", DataType::Utf8, false)),
Arc::new(StringArray::from(vec![
"2021-02-01",
"2021-02-02",
"2021-02-03",
])) as _,
)]));

// ["one"] ["two", "three", "four"] ["five"]
let mut builder = ListBuilder::new(StringBuilder::new());
builder.append_value([Some("one")]);
builder.append_value([Some("two"), Some("three"), Some("four")]);
builder.append_value([Some("five")]);
let list_array: ArrayRef = Arc::new(builder.finish());

RecordBatch::try_from_iter(vec![
("id", string_array),
("i", int_array),
("props", struct_array),
("list", list_array),
])
.unwrap()
});

fn test_batch() -> RecordBatch {
TEST_BATCH
.get_or_init(|| {
let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
let int_array: ArrayRef =
Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));

// { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
Arc::new(Field::new("a", DataType::Utf8, false)),
Arc::new(StringArray::from(vec![
"2021-02-01",
"2021-02-02",
"2021-02-03",
])) as _,
)]));

// ["one"] ["two", "three", "four"] ["five"]
let mut builder = ListBuilder::new(StringBuilder::new());
builder.append_value([Some("one")]);
builder.append_value([Some("two"), Some("three"), Some("four")]);
builder.append_value([Some("five")]);
let list_array: ArrayRef = Arc::new(builder.finish());

RecordBatch::try_from_iter(vec![
("id", string_array),
("i", int_array),
("props", struct_array),
("list", list_array),
])
.unwrap()
})
.clone()
TEST_BATCH.clone()
}
6 changes: 3 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -34,7 +34,7 @@ use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use futures::StreamExt;
use std::any::Any;
use std::num::NonZeroUsize;
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, LazyLock};
use tokio::fs::File;

use datafusion::datasource::streaming::StreamingTable;
@@ -725,15 +725,15 @@ fn maybe_split_batches(
.collect()
}

static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
static DICT_BATCHES: LazyLock<Vec<RecordBatch>> = LazyLock::new(make_dict_batches);

/// Returns 5 sorted string dictionary batches each with 50 rows with
/// this schema.
///
/// a: Dictionary<Utf8, Int32>,
/// b: Dictionary<Utf8, Int32>,
fn dict_batches() -> Vec<RecordBatch> {
DICT_BATCHES.get_or_init(make_dict_batches).clone()
DICT_BATCHES.clone()
}

fn make_dict_batches() -> Vec<RecordBatch> {
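One small variation visible in this file: `LazyLock::new` accepts any zero-argument initializer, so an existing function such as `make_dict_batches` can be passed by name instead of being wrapped in a closure. A tiny illustrative sketch with invented names:

```rust
use std::sync::LazyLock;

fn build_rows() -> Vec<u32> {
    (0..5).map(|i| i * 10).collect()
}

// A plain fn item coerces to the initializer type, so no closure is needed.
static ROWS: LazyLock<Vec<u32>> = LazyLock::new(build_rows);

fn main() {
    // First access runs build_rows; later accesses reuse the cached Vec.
    assert_eq!(ROWS.len(), 5);
    assert_eq!(ROWS.iter().sum::<u32>(), 100);
}
```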
64 changes: 32 additions & 32 deletions datafusion/core/tests/parquet/external_access_plan.rs
@@ -33,7 +33,7 @@ use datafusion_physical_plan::ExecutionPlan;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, LazyLock};
use tempfile::NamedTempFile;

#[tokio::test]
@@ -369,45 +369,45 @@ struct TestData {
file_size: u64,
}

static TEST_DATA: OnceLock<TestData> = OnceLock::new();
static TEST_DATA: LazyLock<TestData> = LazyLock::new(|| {
let scenario = Scenario::UTF8;
let row_per_group = 5;

/// Return a parquet file with 2 row groups each with 5 rows
fn get_test_data() -> &'static TestData {
TEST_DATA.get_or_init(|| {
let scenario = Scenario::UTF8;
let row_per_group = 5;
let mut temp_file = tempfile::Builder::new()
.prefix("user_access_plan")
.suffix(".parquet")
.tempfile()
.expect("tempfile creation");

let mut temp_file = tempfile::Builder::new()
.prefix("user_access_plan")
.suffix(".parquet")
.tempfile()
.expect("tempfile creation");
let props = WriterProperties::builder()
.set_max_row_group_size(row_per_group)
.build();

let props = WriterProperties::builder()
.set_max_row_group_size(row_per_group)
.build();
let batches = create_data_batch(scenario);
let schema = batches[0].schema();

let batches = create_data_batch(scenario);
let schema = batches[0].schema();
let mut writer =
ArrowWriter::try_new(&mut temp_file, schema.clone(), Some(props)).unwrap();

let mut writer =
ArrowWriter::try_new(&mut temp_file, schema.clone(), Some(props)).unwrap();
for batch in batches {
writer.write(&batch).expect("writing batch");
}
writer.close().unwrap();

for batch in batches {
writer.write(&batch).expect("writing batch");
}
writer.close().unwrap();
let file_name = temp_file.path().to_string_lossy().to_string();
let file_size = temp_file.path().metadata().unwrap().len();

let file_name = temp_file.path().to_string_lossy().to_string();
let file_size = temp_file.path().metadata().unwrap().len();
TestData {
temp_file,
schema,
file_name,
file_size,
}
});

TestData {
temp_file,
schema,
file_name,
file_size,
}
})
/// Return a parquet file with 2 row groups each with 5 rows
fn get_test_data() -> &'static TestData {
&TEST_DATA
}

/// Return the total value of the specified metric name
12 changes: 5 additions & 7 deletions datafusion/expr/src/test/function_stub.rs
@@ -43,16 +43,14 @@ macro_rules! create_func {
/// Singleton instance of [$UDAF], ensures the UDAF is only created once
/// named STATIC_$(UDAF). For example `STATIC_FirstValue`
#[allow(non_upper_case_globals)]
static [< STATIC_ $UDAF >]: std::sync::OnceLock<std::sync::Arc<crate::AggregateUDF>> =
std::sync::OnceLock::new();
static [< STATIC_ $UDAF >]: std::sync::LazyLock<std::sync::Arc<crate::AggregateUDF>> =
std::sync::LazyLock::new(|| {
std::sync::Arc::new(crate::AggregateUDF::from(<$UDAF>::default()))
});

#[doc = concat!("AggregateFunction that returns a [AggregateUDF](crate::AggregateUDF) for [`", stringify!($UDAF), "`]")]
pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<crate::AggregateUDF> {
[< STATIC_ $UDAF >]
.get_or_init(|| {
std::sync::Arc::new(crate::AggregateUDF::from(<$UDAF>::default()))
})
.clone()
[< STATIC_ $UDAF >].clone()
}
}
}
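For readers unfamiliar with the `[< STATIC_ $UDAF >]` identifier concatenation in the macro above, the accessor it generates now expands to roughly the shape sketched below. The type here is a placeholder rather than DataFusion's `AggregateUDF`; the point is only that `.clone()` on the `LazyLock` deref-coerces to cloning the inner `Arc`:

```rust
use std::sync::{Arc, LazyLock};

// Placeholder standing in for a UDF implementation.
#[derive(Default)]
struct FirstValue;

// Roughly what the macro expands to: one LazyLock singleton per UDF...
#[allow(non_upper_case_globals)]
static STATIC_FirstValue: LazyLock<Arc<FirstValue>> =
    LazyLock::new(|| Arc::new(FirstValue::default()));

// ...and an accessor that hands out a clone of the shared Arc.
fn first_value() -> Arc<FirstValue> {
    // `.clone()` resolves through Deref<Target = Arc<FirstValue>>.
    STATIC_FirstValue.clone()
}

fn main() {
    let a = first_value();
    let b = first_value();
    // Every call returns a handle to the same singleton instance.
    assert!(Arc::ptr_eq(&a, &b));
}
```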
12 changes: 5 additions & 7 deletions datafusion/functions-aggregate/src/macros.rs
@@ -85,16 +85,14 @@ macro_rules! create_func {
/// Singleton instance of [$UDAF], ensures the UDAF is only created once
/// named STATIC_$(UDAF). For example `STATIC_FirstValue`
#[allow(non_upper_case_globals)]
static [< STATIC_ $UDAF >]: std::sync::OnceLock<std::sync::Arc<datafusion_expr::AggregateUDF>> =
std::sync::OnceLock::new();
static [< STATIC_ $UDAF >]: std::sync::LazyLock<std::sync::Arc<datafusion_expr::AggregateUDF>> =
std::sync::LazyLock::new(|| {
std::sync::Arc::new(datafusion_expr::AggregateUDF::from($CREATE))
});

#[doc = concat!("AggregateFunction that returns a [`AggregateUDF`](datafusion_expr::AggregateUDF) for [`", stringify!($UDAF), "`]")]
pub fn $AGGREGATE_UDF_FN() -> std::sync::Arc<datafusion_expr::AggregateUDF> {
[< STATIC_ $UDAF >]
.get_or_init(|| {
std::sync::Arc::new(datafusion_expr::AggregateUDF::from($CREATE))
})
.clone()
[< STATIC_ $UDAF >].clone()
}
}
}
16 changes: 7 additions & 9 deletions datafusion/functions-nested/src/macros.rs
@@ -88,19 +88,17 @@ macro_rules! create_func {
/// Singleton instance of [`$UDF`], ensures the UDF is only created once
/// named STATIC_$(UDF). For example `STATIC_ArrayToString`
#[allow(non_upper_case_globals)]
static [< STATIC_ $UDF >]: std::sync::OnceLock<std::sync::Arc<datafusion_expr::ScalarUDF>> =
std::sync::OnceLock::new();
static [< STATIC_ $UDF >]: std::sync::LazyLock<std::sync::Arc<datafusion_expr::ScalarUDF>> =
std::sync::LazyLock::new(|| {
std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
<$UDF>::new(),
))
});

#[doc = concat!("ScalarFunction that returns a [`ScalarUDF`](datafusion_expr::ScalarUDF) for ")]
#[doc = stringify!($UDF)]
pub fn $SCALAR_UDF_FN() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
[< STATIC_ $UDF >]
.get_or_init(|| {
std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
<$UDF>::new(),
))
})
.clone()
[< STATIC_ $UDF >].clone()
}
}
};