Fix apache#11687: Replace all uses of OnceLock with LazyLock
Rafferty97 committed Jul 28, 2024
1 parent ebad5bf commit b5c8724
Showing 14 changed files with 177 additions and 225 deletions.
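
The mechanical pattern behind every hunk below: OnceLock starts empty and requires a get_or_init call (usually wrapped in an accessor function) at each point of use, while LazyLock, stabilized in Rust 1.80, carries its initializer in the declaration itself. That stabilization is why both Cargo.toml hunks bump rust-version to 1.80. A minimal before/after sketch of the pattern; the DEFAULTS example is illustrative, not code from this commit:

    use std::collections::HashMap;
    use std::sync::{LazyLock, OnceLock};

    // Before: OnceLock starts uninitialized, so every access point must
    // supply the initializer, usually hidden behind an accessor function.
    fn defaults() -> &'static HashMap<&'static str, i32> {
        static DEFAULTS: OnceLock<HashMap<&'static str, i32>> = OnceLock::new();
        DEFAULTS.get_or_init(|| HashMap::from([("retries", 3)]))
    }

    // After: LazyLock bundles the initializer, so callers use the static
    // directly; it initializes itself on first dereference.
    static DEFAULTS: LazyLock<HashMap<&'static str, i32>> =
        LazyLock::new(|| HashMap::from([("retries", 3)]));

    fn main() {
        assert_eq!(defaults().get("retries"), DEFAULTS.get("retries"));
    }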
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -54,7 +54,7 @@ homepage = "https://datafusion.apache.org"
 license = "Apache-2.0"
 readme = "README.md"
 repository = "https://github.com/apache/datafusion"
-rust-version = "1.76"
+rust-version = "1.80"
 version = "40.0.0"
 
 [workspace.dependencies]
20 changes: 8 additions & 12 deletions datafusion-cli/src/main.rs
@@ -19,7 +19,7 @@ use std::collections::HashMap;
 use std::env;
 use std::path::Path;
 use std::process::ExitCode;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
@@ -295,9 +295,8 @@ impl ByteUnit {
 }
 
 fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
-    fn byte_suffixes() -> &'static HashMap<&'static str, ByteUnit> {
-        static BYTE_SUFFIXES: OnceLock<HashMap<&'static str, ByteUnit>> = OnceLock::new();
-        BYTE_SUFFIXES.get_or_init(|| {
+    static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
+        LazyLock::new(|| {
             let mut m = HashMap::new();
             m.insert("b", ByteUnit::Byte);
             m.insert("k", ByteUnit::KiB);
@@ -309,23 +308,20 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
             m.insert("t", ByteUnit::TiB);
             m.insert("tb", ByteUnit::TiB);
             m
-        })
-    }
+        });
 
-    fn suffix_re() -> &'static regex::Regex {
-        static SUFFIX_REGEX: OnceLock<regex::Regex> = OnceLock::new();
-        SUFFIX_REGEX.get_or_init(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap())
-    }
+    static SUFFIX_REGEX: LazyLock<regex::Regex> =
+        LazyLock::new(|| regex::Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());
 
     let lower = size.to_lowercase();
-    if let Some(caps) = suffix_re().captures(&lower) {
+    if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
        let num_str = caps.get(1).unwrap().as_str();
        let num = num_str.parse::<usize>().map_err(|_| {
            format!("Invalid numeric value in memory pool size '{}'", size)
        })?;
 
        let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
-        let unit = byte_suffixes()
+        let unit = BYTE_SUFFIXES
            .get(suffix)
            .ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
        let memory_pool_size = usize::try_from(unit.multiplier())
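
Note that the rewritten statics stay inside extract_memory_pool_size, so they remain private to the function while still being initialized at most once. A self-contained sketch of the same function-local idiom, using a hypothetical parse_size helper and assuming the regex crate:

    use regex::Regex;
    use std::sync::LazyLock;

    fn parse_size(input: &str) -> Option<(i64, &str)> {
        // Function-local static: compiled once on first use, visible only here.
        static RE: LazyLock<Regex> =
            LazyLock::new(|| Regex::new(r"^(-?[0-9]+)([a-z]+)?$").unwrap());
        let caps = RE.captures(input)?;
        let num = caps.get(1)?.as_str().parse().ok()?;
        let suffix = caps.get(2).map_or("b", |m| m.as_str());
        Some((num, suffix))
    }

    fn main() {
        assert_eq!(parse_size("100mb"), Some((100, "mb")));
        assert_eq!(parse_size("42"), Some((42, "b")));
        assert_eq!(parse_size("oops"), None);
    }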
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
@@ -30,7 +30,7 @@ authors = { workspace = true }
 # Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
 # "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
 # https://github.com/foresterre/cargo-msrv/issues/590
-rust-version = "1.76"
+rust-version = "1.80"
 
 [lints]
 workspace = true
58 changes: 27 additions & 31 deletions datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
@@ -345,7 +345,7 @@ mod test {
     use parquet::basic::LogicalType;
     use parquet::file::metadata::ColumnChunkMetaData;
     use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
-    use std::sync::{Arc, OnceLock};
+    use std::sync::{Arc, LazyLock};
 
     #[test]
     fn test_only_scans() {
@@ -358,7 +358,7 @@
 
         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();
 
         // scan all row groups, no selection
@@ -377,7 +377,7 @@
 
         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();
 
         // skip all row groups, no selection
@@ -403,7 +403,7 @@
 
         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();
 
         assert_eq!(row_group_indexes, vec![0, 1]);
@@ -442,7 +442,7 @@
 
         let row_group_indexes = access_plan.row_group_indexes();
         let row_selection = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap();
 
         assert_eq!(row_group_indexes, vec![1, 2, 3]);
@@ -478,7 +478,7 @@
 
         let row_group_indexes = access_plan.row_group_indexes();
         let err = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap_err()
             .to_string();
         assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
@@ -504,39 +504,35 @@
 
         let row_group_indexes = access_plan.row_group_indexes();
         let err = access_plan
-            .into_overall_row_selection(row_group_metadata())
+            .into_overall_row_selection(&ROW_GROUP_METADATA)
             .unwrap_err()
             .to_string();
         assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
         assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
     }
 
-    static ROW_GROUP_METADATA: OnceLock<Vec<RowGroupMetaData>> = OnceLock::new();
-
     /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
     /// respectively
-    fn row_group_metadata() -> &'static [RowGroupMetaData] {
-        ROW_GROUP_METADATA.get_or_init(|| {
-            let schema_descr = get_test_schema_descr();
-            let row_counts = [10, 20, 30, 40];
-
-            row_counts
-                .into_iter()
-                .map(|num_rows| {
-                    let column = ColumnChunkMetaData::builder(schema_descr.column(0))
-                        .set_num_values(num_rows)
-                        .build()
-                        .unwrap();
-
-                    RowGroupMetaData::builder(schema_descr.clone())
-                        .set_num_rows(num_rows)
-                        .set_column_metadata(vec![column])
-                        .build()
-                        .unwrap()
-                })
-                .collect()
-        })
-    }
+    static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
+        let schema_descr = get_test_schema_descr();
+        let row_counts = [10, 20, 30, 40];
+
+        row_counts
+            .into_iter()
+            .map(|num_rows| {
+                let column = ColumnChunkMetaData::builder(schema_descr.column(0))
+                    .set_num_values(num_rows)
+                    .build()
+                    .unwrap();
+
+                RowGroupMetaData::builder(schema_descr.clone())
+                    .set_num_rows(num_rows)
+                    .set_column_metadata(vec![column])
+                    .build()
+                    .unwrap()
+            })
+            .collect()
+    });
 
     /// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
     fn get_test_schema_descr() -> SchemaDescPtr {
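
The call-site change in this file leans on deref coercion: the old row_group_metadata() returned &'static [RowGroupMetaData], while &ROW_GROUP_METADATA is a &LazyLock<Vec<RowGroupMetaData>> that coerces through Deref to &Vec<_> and on to a slice. A toy sketch of that coercion chain, with illustrative types rather than DataFusion's:

    use std::sync::LazyLock;

    // Stand-in for the Vec<RowGroupMetaData> static in the tests above.
    static ROW_COUNTS: LazyLock<Vec<u64>> =
        LazyLock::new(|| vec![10, 20, 30, 40]);

    // Takes a slice; `&ROW_COUNTS` coerces &LazyLock<Vec<u64>> -> &Vec<u64> -> &[u64].
    fn total_rows(row_groups: &[u64]) -> u64 {
        row_groups.iter().sum()
    }

    fn main() {
        assert_eq!(total_rows(&ROW_COUNTS), 100);
    }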
72 changes: 33 additions & 39 deletions datafusion/core/tests/expr_api/mod.rs
@@ -28,7 +28,7 @@ use datafusion_functions_aggregate::sum::sum_udaf;
 use datafusion_functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
 use sqlparser::ast::NullTreatment;
 /// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 
 mod parse_sql_expr;
 mod simplification;
@@ -320,7 +320,7 @@ async fn test_aggregate_ext_null_treatment() {
 /// Evaluates the specified expr as an aggregate and compares the result to the
 /// expected result.
 async fn evaluate_agg_test(expr: Expr, expected_lines: Vec<&str>) {
-    let batch = test_batch();
+    let batch = TEST_BATCH.clone();
 
     let ctx = SessionContext::new();
     let group_expr = vec![];
@@ -347,7 +347,7 @@ async fn evaluate_agg_test(expr: Expr, expected_lines: Vec<&str>) {
 /// Converts the `Expr` to a `PhysicalExpr`, evaluates it against the provided
 /// `RecordBatch` and compares the result to the expected result.
 fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
-    let batch = test_batch();
+    let batch = TEST_BATCH.clone();
     let df_schema = DFSchema::try_from(batch.schema()).unwrap();
     let physical_expr = SessionContext::new()
         .create_physical_expr(expr, &df_schema)
@@ -365,39 +365,33 @@ fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
     );
 }
 
-static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
-
-fn test_batch() -> RecordBatch {
-    TEST_BATCH
-        .get_or_init(|| {
-            let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
-            let int_array: ArrayRef =
-                Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
-
-            // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
-            let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
-                Arc::new(Field::new("a", DataType::Utf8, false)),
-                Arc::new(StringArray::from(vec![
-                    "2021-02-01",
-                    "2021-02-02",
-                    "2021-02-03",
-                ])) as _,
-            )]));
-
-            // ["one"] ["two", "three", "four"] ["five"]
-            let mut builder = ListBuilder::new(StringBuilder::new());
-            builder.append_value([Some("one")]);
-            builder.append_value([Some("two"), Some("three"), Some("four")]);
-            builder.append_value([Some("five")]);
-            let list_array: ArrayRef = Arc::new(builder.finish());
-
-            RecordBatch::try_from_iter(vec![
-                ("id", string_array),
-                ("i", int_array),
-                ("props", struct_array),
-                ("list", list_array),
-            ])
-            .unwrap()
-        })
-        .clone()
-}
+static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+    let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
+    let int_array: ArrayRef =
+        Arc::new(Int64Array::from_iter(vec![Some(10), None, Some(5)]));
+
+    // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
+    let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
+        Arc::new(Field::new("a", DataType::Utf8, false)),
+        Arc::new(StringArray::from(vec![
+            "2021-02-01",
+            "2021-02-02",
+            "2021-02-03",
+        ])) as _,
+    )]));
+
+    // ["one"] ["two", "three", "four"] ["five"]
+    let mut builder = ListBuilder::new(StringBuilder::new());
+    builder.append_value([Some("one")]);
+    builder.append_value([Some("two"), Some("three"), Some("four")]);
+    builder.append_value([Some("five")]);
+    let list_array: ArrayRef = Arc::new(builder.finish());
+
+    RecordBatch::try_from_iter(vec![
+        ("id", string_array),
+        ("i", int_array),
+        ("props", struct_array),
+        ("list", list_array),
+    ])
+    .unwrap()
+});
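
One detail worth noting: test_batch() used to clone inside the accessor, and the LazyLock version moves that .clone() to each call site. Either way the copy should be cheap, since a RecordBatch holds Arc-backed columns, so cloning bumps reference counts rather than copying array data. A sketch of that share-by-clone behavior with a stand-in type, not arrow's actual definition:

    use std::sync::{Arc, LazyLock};

    // Stand-in for RecordBatch: the column buffer is shared behind an Arc.
    #[derive(Clone)]
    struct Batch {
        column: Arc<Vec<i64>>,
    }

    static TEST_BATCH: LazyLock<Batch> = LazyLock::new(|| Batch {
        column: Arc::new(vec![10, 20, 30]),
    });

    fn main() {
        let a = TEST_BATCH.clone();
        let b = TEST_BATCH.clone();
        // Both clones point at the same buffer; no data was copied.
        assert!(Arc::ptr_eq(&a.column, &b.column));
    }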
22 changes: 9 additions & 13 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -30,7 +30,7 @@ use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use futures::StreamExt;
 use std::any::Any;
-use std::sync::{Arc, OnceLock};
+use std::sync::{Arc, LazyLock};
 use tokio::fs::File;
 
 use datafusion::datasource::streaming::StreamingTable;
@@ -568,7 +568,10 @@ impl Scenario {
             single_row_batches, ..
         } = self
         {
-            batches_byte_size(&maybe_split_batches(dict_batches(), *single_row_batches))
+            batches_byte_size(&maybe_split_batches(
+                DICT_BATCHES.clone(),
+                *single_row_batches,
+            ))
         } else {
             panic!("Scenario does not support partition size");
         }
@@ -604,7 +607,7 @@
             } => {
                 use datafusion::physical_expr::expressions::col;
                 let batches: Vec<Vec<_>> = std::iter::repeat(maybe_split_batches(
-                    dict_batches(),
+                    DICT_BATCHES.clone(),
                     *single_row_batches,
                 ))
                 .take(*partitions)
@@ -686,18 +689,11 @@ fn maybe_split_batches(
         .collect()
 }
 
-static DICT_BATCHES: OnceLock<Vec<RecordBatch>> = OnceLock::new();
-
-/// Returns 5 sorted string dictionary batches each with 50 rows with
-/// this schema.
+/// 5 sorted string dictionary batches each with 50 rows with this schema.
 ///
 /// a: Dictionary<Utf8, Int32>,
 /// b: Dictionary<Utf8, Int32>,
-fn dict_batches() -> Vec<RecordBatch> {
-    DICT_BATCHES.get_or_init(make_dict_batches).clone()
-}
-
-fn make_dict_batches() -> Vec<RecordBatch> {
+static DICT_BATCHES: LazyLock<Vec<RecordBatch>> = LazyLock::new(|| {
     let batch_size = 50;
 
     let mut i = 0;
@@ -731,7 +727,7 @@ fn make_dict_batches() -> Vec<RecordBatch> {
     });
 
     batches
-}
+});
 
 // How many bytes does the memory from dict_batches consume?
 fn batches_byte_size(batches: &[RecordBatch]) -> usize {
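
In the Scenario hunks above, DICT_BATCHES.clone() seeds std::iter::repeat, which hands each partition its own clone of the batch list. A compact sketch of that fan-out, with a toy element type standing in for RecordBatch:

    use std::sync::LazyLock;

    static DICT_BATCHES: LazyLock<Vec<String>> =
        LazyLock::new(|| vec!["batch0".to_string(), "batch1".to_string()]);

    fn main() {
        let partitions = 3;
        // `repeat` clones its seed value for every item taken, so each
        // partition receives an independent Vec of batches.
        let per_partition: Vec<Vec<String>> = std::iter::repeat(DICT_BATCHES.clone())
            .take(partitions)
            .collect();
        assert_eq!(per_partition.len(), partitions);
        assert_eq!(per_partition[2], *DICT_BATCHES);
    }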