Skip to content

Commit 8d48adf

Browse files
committed
Change back to non async for try_into_logical_plan
1 parent 7559c44 commit 8d48adf

File tree

6 files changed

+21
-22
lines changed

6 files changed

+21
-22
lines changed

datafusion/core/src/datasource/datasource.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,5 +86,5 @@ pub trait TableProvider: Sync + Send {
8686
#[async_trait]
8787
pub trait TableProviderFactory: Sync + Send {
8888
/// Create a TableProvider with the given url
89-
async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
89+
fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
9090
}

datafusion/core/src/execution/context.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl SessionContext {
429429
cmd.file_type
430430
))
431431
})?;
432-
let table = (*factory).create(cmd.location.as_str()).await?;
432+
let table = (*factory).create(cmd.location.as_str())?;
433433
self.register_table(cmd.name.as_str(), table)?;
434434
let plan = LogicalPlanBuilder::empty(false).build()?;
435435
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))

datafusion/core/src/test_util.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,7 @@ pub struct TestTableFactory {}
328328

329329
#[async_trait]
330330
impl TableProviderFactory for TestTableFactory {
331-
async fn create(
332-
&self,
333-
url: &str,
334-
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
331+
fn create(&self, url: &str) -> datafusion_common::Result<Arc<dyn TableProvider>> {
335332
Ok(Arc::new(TestTableProvider {
336333
url: url.to_string(),
337334
}))

datafusion/proto/src/bytes/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ pub async fn logical_plan_from_bytes_with_extension_codec(
164164
let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
165165
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e))
166166
})?;
167-
protobuf.try_into_logical_plan(ctx, extension_codec).await
167+
protobuf.try_into_logical_plan(ctx, extension_codec)
168168
}
169169

170170
#[derive(Debug)]
@@ -189,7 +189,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
189189
))
190190
}
191191

192-
async fn try_decode_table_provider(
192+
fn try_decode_table_provider(
193193
&self,
194194
_buf: &[u8],
195195
_schema: SchemaRef,

datafusion/proto/src/lib.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ mod roundtrip_tests {
172172
))
173173
}
174174

175-
async fn try_decode_table_provider(
175+
fn try_decode_table_provider(
176176
&self,
177177
buf: &[u8],
178178
_schema: SchemaRef,
@@ -189,7 +189,7 @@ mod roundtrip_tests {
189189
.get("testtable")
190190
.expect("Unable to find testtable factory")
191191
.clone();
192-
let provider = (*factory).create(msg.url.as_str()).await?;
192+
let provider = (*factory).create(msg.url.as_str())?;
193193
Ok(provider)
194194
}
195195

@@ -431,7 +431,7 @@ mod roundtrip_tests {
431431
}
432432
}
433433

434-
async fn try_decode_table_provider(
434+
fn try_decode_table_provider(
435435
&self,
436436
_buf: &[u8],
437437
_schema: SchemaRef,

datafusion/proto/src/logical_plan.rs

+13-11
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
8686
B: BufMut,
8787
Self: Sized;
8888

89-
async fn try_into_logical_plan(
89+
fn try_into_logical_plan(
9090
&self,
9191
ctx: &SessionContext,
9292
extension_codec: &dyn LogicalExtensionCodec,
@@ -130,7 +130,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
130130
buf: &mut Vec<u8>,
131131
) -> Result<(), DataFusionError>;
132132

133-
async fn try_decode_table_provider(
133+
fn try_decode_table_provider(
134134
&self,
135135
buf: &[u8],
136136
schema: SchemaRef,
@@ -170,7 +170,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
170170
))
171171
}
172172

173-
async fn try_decode_table_provider(
173+
fn try_decode_table_provider(
174174
&self,
175175
_buf: &[u8],
176176
_schema: SchemaRef,
@@ -196,7 +196,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
196196
macro_rules! into_logical_plan {
197197
($PB:expr, $CTX:expr, $CODEC:expr) => {{
198198
if let Some(field) = $PB.as_ref() {
199-
field.as_ref().try_into_logical_plan($CTX, $CODEC).await
199+
field.as_ref().try_into_logical_plan($CTX, $CODEC)
200200
} else {
201201
Err(proto_error("Missing required field in protobuf"))
202202
}
@@ -301,7 +301,7 @@ impl AsLogicalPlan for LogicalPlanNode {
301301
})
302302
}
303303

304-
async fn try_into_logical_plan(
304+
fn try_into_logical_plan(
305305
&self,
306306
ctx: &SessionContext,
307307
extension_codec: &dyn LogicalExtensionCodec,
@@ -487,9 +487,11 @@ impl AsLogicalPlan for LogicalPlanNode {
487487
.iter()
488488
.map(|expr| parse_expr(expr, ctx))
489489
.collect::<Result<Vec<_>, _>>()?;
490-
let provider = extension_codec
491-
.try_decode_table_provider(&scan.custom_table_data, schema, ctx)
492-
.await?;
490+
let provider = extension_codec.try_decode_table_provider(
491+
&scan.custom_table_data,
492+
schema,
493+
ctx,
494+
)?;
493495

494496
LogicalPlanBuilder::scan_with_filters(
495497
&scan.table_name,
@@ -591,7 +593,7 @@ impl AsLogicalPlan for LogicalPlanNode {
591593
.input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
592594
"Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
593595
)))?
594-
.try_into_logical_plan(ctx, extension_codec).await?;
596+
.try_into_logical_plan(ctx, extension_codec)?;
595597
let definition = if !create_view.definition.is_empty() {
596598
Some(create_view.definition.clone())
597599
} else {
@@ -716,7 +718,7 @@ impl AsLogicalPlan for LogicalPlanNode {
716718
LogicalPlanType::Union(union) => {
717719
let mut input_plans: Vec<LogicalPlan> = vec![];
718720
for i in union.inputs.iter() {
719-
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
721+
let res = i.try_into_logical_plan(ctx, extension_codec)?;
720722
input_plans.push(res);
721723
}
722724

@@ -744,7 +746,7 @@ impl AsLogicalPlan for LogicalPlanNode {
744746
LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
745747
let mut input_plans: Vec<LogicalPlan> = vec![];
746748
for i in inputs.iter() {
747-
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
749+
let res = i.try_into_logical_plan(ctx, extension_codec)?;
748750
input_plans.push(res);
749751
}
750752

0 commit comments

Comments
 (0)