Skip to content

Commit 0f16849

Browse files
authored
Avoid Arc::clone when serializing physical expressions (#12235)
`Arc::clone` is indispensable when passing shared references between threads. For synchronous code`&` and `&Arc` can be (and often are) used, with the latter being future-compatible, should the code start to need `Arc::clone` (e.g. due to parallelization).
1 parent 827d7e1 commit 0f16849

File tree

3 files changed

+55
-97
lines changed

3 files changed

+55
-97
lines changed

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,7 +1122,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
11221122
let expr = exec
11231123
.expr()
11241124
.iter()
1125-
.map(|expr| serialize_physical_expr(Arc::clone(&expr.0), extension_codec))
1125+
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
11261126
.collect::<Result<Vec<_>>>()?;
11271127
let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
11281128
return Ok(protobuf::PhysicalPlanNode {
@@ -1163,7 +1163,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
11631163
protobuf::FilterExecNode {
11641164
input: Some(Box::new(input)),
11651165
expr: Some(serialize_physical_expr(
1166-
Arc::clone(exec.predicate()),
1166+
exec.predicate(),
11671167
extension_codec,
11681168
)?),
11691169
default_filter_selectivity: exec.default_selectivity() as u32,
@@ -1220,8 +1220,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
12201220
.on()
12211221
.iter()
12221222
.map(|tuple| {
1223-
let l = serialize_physical_expr(tuple.0.to_owned(), extension_codec)?;
1224-
let r = serialize_physical_expr(tuple.1.to_owned(), extension_codec)?;
1223+
let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1224+
let r = serialize_physical_expr(&tuple.1, extension_codec)?;
12251225
Ok::<_, DataFusionError>(protobuf::JoinOn {
12261226
left: Some(l),
12271227
right: Some(r),
@@ -1233,10 +1233,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
12331233
.filter()
12341234
.as_ref()
12351235
.map(|f| {
1236-
let expression = serialize_physical_expr(
1237-
f.expression().to_owned(),
1238-
extension_codec,
1239-
)?;
1236+
let expression =
1237+
serialize_physical_expr(f.expression(), extension_codec)?;
12401238
let column_indices = f
12411239
.column_indices()
12421240
.iter()
@@ -1294,8 +1292,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
12941292
.on()
12951293
.iter()
12961294
.map(|tuple| {
1297-
let l = serialize_physical_expr(tuple.0.to_owned(), extension_codec)?;
1298-
let r = serialize_physical_expr(tuple.1.to_owned(), extension_codec)?;
1295+
let l = serialize_physical_expr(&tuple.0, extension_codec)?;
1296+
let r = serialize_physical_expr(&tuple.1, extension_codec)?;
12991297
Ok::<_, DataFusionError>(protobuf::JoinOn {
13001298
left: Some(l),
13011299
right: Some(r),
@@ -1307,10 +1305,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
13071305
.filter()
13081306
.as_ref()
13091307
.map(|f| {
1310-
let expression = serialize_physical_expr(
1311-
f.expression().to_owned(),
1312-
extension_codec,
1313-
)?;
1308+
let expression =
1309+
serialize_physical_expr(f.expression(), extension_codec)?;
13141310
let column_indices = f
13151311
.column_indices()
13161312
.iter()
@@ -1348,7 +1344,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
13481344
.map(|expr| {
13491345
Ok(protobuf::PhysicalSortExprNode {
13501346
expr: Some(Box::new(serialize_physical_expr(
1351-
expr.expr.to_owned(),
1347+
&expr.expr,
13521348
extension_codec,
13531349
)?)),
13541350
asc: !expr.options.descending,
@@ -1368,7 +1364,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
13681364
.map(|expr| {
13691365
Ok(protobuf::PhysicalSortExprNode {
13701366
expr: Some(Box::new(serialize_physical_expr(
1371-
expr.expr.to_owned(),
1367+
&expr.expr,
13721368
extension_codec,
13731369
)?)),
13741370
asc: !expr.options.descending,
@@ -1475,14 +1471,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
14751471
.group_expr()
14761472
.null_expr()
14771473
.iter()
1478-
.map(|expr| serialize_physical_expr(expr.0.to_owned(), extension_codec))
1474+
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
14791475
.collect::<Result<Vec<_>>>()?;
14801476

14811477
let group_expr = exec
14821478
.group_expr()
14831479
.expr()
14841480
.iter()
1485-
.map(|expr| serialize_physical_expr(expr.0.to_owned(), extension_codec))
1481+
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
14861482
.collect::<Result<Vec<_>>>()?;
14871483

14881484
let limit = exec.limit().map(|value| protobuf::AggLimit {
@@ -1581,7 +1577,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
15811577
if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
15821578
let predicate = exec
15831579
.predicate()
1584-
.map(|pred| serialize_physical_expr(Arc::clone(pred), extension_codec))
1580+
.map(|pred| serialize_physical_expr(pred, extension_codec))
15851581
.transpose()?;
15861582
return Ok(protobuf::PhysicalPlanNode {
15871583
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
@@ -1653,7 +1649,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
16531649
.map(|expr| {
16541650
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
16551651
expr: Some(Box::new(serialize_physical_expr(
1656-
expr.expr.to_owned(),
1652+
&expr.expr,
16571653
extension_codec,
16581654
)?)),
16591655
asc: !expr.options.descending,
@@ -1722,7 +1718,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
17221718
.map(|expr| {
17231719
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
17241720
expr: Some(Box::new(serialize_physical_expr(
1725-
expr.expr.to_owned(),
1721+
&expr.expr,
17261722
extension_codec,
17271723
)?)),
17281724
asc: !expr.options.descending,
@@ -1761,10 +1757,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
17611757
.filter()
17621758
.as_ref()
17631759
.map(|f| {
1764-
let expression = serialize_physical_expr(
1765-
f.expression().to_owned(),
1766-
extension_codec,
1767-
)?;
1760+
let expression =
1761+
serialize_physical_expr(f.expression(), extension_codec)?;
17681762
let column_indices = f
17691763
.column_indices()
17701764
.iter()
@@ -1806,13 +1800,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
18061800
let window_expr = exec
18071801
.window_expr()
18081802
.iter()
1809-
.map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec))
1803+
.map(|e| serialize_physical_window_expr(e, extension_codec))
18101804
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
18111805

18121806
let partition_keys = exec
18131807
.partition_keys
18141808
.iter()
1815-
.map(|e| serialize_physical_expr(Arc::clone(e), extension_codec))
1809+
.map(|e| serialize_physical_expr(e, extension_codec))
18161810
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
18171811

18181812
return Ok(protobuf::PhysicalPlanNode {
@@ -1836,13 +1830,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
18361830
let window_expr = exec
18371831
.window_expr()
18381832
.iter()
1839-
.map(|e| serialize_physical_window_expr(Arc::clone(e), extension_codec))
1833+
.map(|e| serialize_physical_window_expr(e, extension_codec))
18401834
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
18411835

18421836
let partition_keys = exec
18431837
.partition_keys
18441838
.iter()
1845-
.map(|e| serialize_physical_expr(Arc::clone(e), extension_codec))
1839+
.map(|e| serialize_physical_expr(e, extension_codec))
18461840
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
18471841

18481842
let input_order_mode = match &exec.input_order_mode {
@@ -1886,7 +1880,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
18861880
let expr: PhysicalSortExpr = requirement.to_owned().into();
18871881
let sort_expr = protobuf::PhysicalSortExprNode {
18881882
expr: Some(Box::new(serialize_physical_expr(
1889-
expr.expr.to_owned(),
1883+
&expr.expr,
18901884
extension_codec,
18911885
)?)),
18921886
asc: !expr.options.descending,
@@ -2025,7 +2019,7 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync {
20252019

20262020
fn try_encode_expr(
20272021
&self,
2028-
_node: Arc<dyn PhysicalExpr>,
2022+
_node: &Arc<dyn PhysicalExpr>,
20292023
_buf: &mut Vec<u8>,
20302024
) -> Result<()> {
20312025
not_impl_err!("PhysicalExtensionCodec is not provided")

0 commit comments

Comments
 (0)