
Commit 8216e32

[MINOR]: Fix some minor silent bugs (#11127)
1 parent ff116c3 commit 8216e32

File tree

4 files changed: +38 -32 lines changed


datafusion/core/tests/fifo/mod.rs

Lines changed: 30 additions & 29 deletions
@@ -217,17 +217,6 @@ mod unix_test {
             .set_bool("datafusion.execution.coalesce_batches", false)
             .with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
-        // Tasks
-        let mut tasks: Vec<JoinHandle<()>> = vec![];
-
-        // Join filter
-        let a1_iter = 0..TEST_DATA_SIZE;
-        // Join key
-        let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
-        let lines = a1_iter
-            .zip(a2_iter)
-            .map(|(a1, a2)| format!("{a1},{a2}\n"))
-            .collect::<Vec<_>>();
 
         // Create a new temporary FIFO file
         let tmp_dir = TempDir::new()?;
@@ -238,22 +227,6 @@
         // Create a mutex for tracking if the right input source is waiting for data.
         let waiting = Arc::new(AtomicBool::new(true));
 
-        // Create writing threads for the left and right FIFO files
-        tasks.push(create_writing_thread(
-            left_fifo.clone(),
-            "a1,a2\n".to_owned(),
-            lines.clone(),
-            waiting.clone(),
-            TEST_BATCH_SIZE,
-        ));
-        tasks.push(create_writing_thread(
-            right_fifo.clone(),
-            "a1,a2\n".to_owned(),
-            lines.clone(),
-            waiting.clone(),
-            TEST_BATCH_SIZE,
-        ));
-
         // Create schema
         let schema = Arc::new(Schema::new(vec![
             Field::new("a1", DataType::UInt32, false),
@@ -264,10 +237,10 @@
         let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
 
         // Set unbounded sorted files read configuration
-        let provider = fifo_table(schema.clone(), left_fifo, order.clone());
+        let provider = fifo_table(schema.clone(), left_fifo.clone(), order.clone());
         ctx.register_table("left", provider)?;
 
-        let provider = fifo_table(schema.clone(), right_fifo, order);
+        let provider = fifo_table(schema.clone(), right_fifo.clone(), order);
         ctx.register_table("right", provider)?;
 
         // Execute the query, with no matching rows. (since key is modulus 10)
@@ -287,6 +260,34 @@
             .await?;
         let mut stream = df.execute_stream().await?;
         let mut operations = vec![];
+
+        // Tasks
+        let mut tasks: Vec<JoinHandle<()>> = vec![];
+
+        // Join filter
+        let a1_iter = 0..TEST_DATA_SIZE;
+        // Join key
+        let a2_iter = (0..TEST_DATA_SIZE).map(|x| x % 10);
+        let lines = a1_iter
+            .zip(a2_iter)
+            .map(|(a1, a2)| format!("{a1},{a2}\n"))
+            .collect::<Vec<_>>();
+
+        // Create writing threads for the left and right FIFO files
+        tasks.push(create_writing_thread(
+            left_fifo,
+            "a1,a2\n".to_owned(),
+            lines.clone(),
+            waiting.clone(),
+            TEST_BATCH_SIZE,
+        ));
+        tasks.push(create_writing_thread(
+            right_fifo,
+            "a1,a2\n".to_owned(),
+            lines.clone(),
+            waiting.clone(),
+            TEST_BATCH_SIZE,
+        ));
         // Partial.
         while let Some(Ok(batch)) = stream.next().await {
             waiting.store(false, Ordering::SeqCst);
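
The moved block spawns the writer threads (and builds the lines they write) only after execute_stream() has produced the query's output stream, and the earlier uses of the FIFO paths are cloned so the final uses can take ownership. The ordering matters because opening a Unix FIFO for writing blocks until a reader opens the other end. A standalone sketch of that blocking behaviour follows; it is not part of the diff and assumes the FIFO already exists (e.g. created with nix::unistd::mkfifo):

// Standalone sketch (not from the diff): the write-side open of a FIFO blocks
// until a reader opens the other end, so writers spawned too early just sit in open().
use std::{
    fs::{File, OpenOptions},
    io::Write,
    path::Path,
    thread,
    time::Duration,
};

fn fifo_open_ordering(fifo: &Path) -> std::io::Result<()> {
    // Assumes `fifo` already points at an existing FIFO (e.g. made with mkfifo).
    let write_path = fifo.to_path_buf();
    let writer = thread::spawn(move || -> std::io::Result<()> {
        // Blocks here until a reader appears.
        let mut f = OpenOptions::new().write(true).open(&write_path)?;
        f.write_all(b"a1,a2\n")
    });

    thread::sleep(Duration::from_millis(100)); // the writer is still blocked here
    let _reader = File::open(fifo)?; // opening the read end releases the writer

    writer.join().expect("writer thread panicked")
}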

datafusion/core/tests/tpcds_planning.rs

Lines changed: 4 additions & 1 deletion
@@ -1044,7 +1044,10 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {
     for table in &tables {
         ctx.register_table(
             table.name.as_str(),
-            Arc::new(MemTable::try_new(Arc::new(table.schema.clone()), vec![])?),
+            Arc::new(MemTable::try_new(
+                Arc::new(table.schema.clone()),
+                vec![vec![]],
+            )?),
         )?;
     }
 
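
The silent part of this bug: MemTable::try_new(schema, vec![]) builds a table with zero partitions, while vec![vec![]] builds one partition that contains no batches. Both hold no rows, but only the latter gives the planner a scan with a real (if empty) partition, which is presumably what the planning regression test is meant to exercise. A minimal sketch of the two constructions, assuming the datafusion crate and its re-exported arrow types:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::error::Result;

fn empty_mem_tables() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Zero partitions: the resulting scan reports no output partitions at all.
    let _zero_partitions = MemTable::try_new(Arc::clone(&schema), vec![])?;

    // One partition with no batches: still zero rows, but the scan has a real
    // (empty) partition, like an ordinary table that happens to hold no data.
    let _one_empty_partition = MemTable::try_new(schema, vec![vec![]])?;

    Ok(())
}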

datafusion/physical-expr/src/partitioning.rs

Lines changed: 3 additions & 1 deletion
@@ -152,6 +152,8 @@ impl Partitioning {
         match required {
             Distribution::UnspecifiedDistribution => true,
             Distribution::SinglePartition if self.partition_count() == 1 => true,
+            // When partition count is 1, hash requirement is satisfied.
+            Distribution::HashPartitioned(_) if self.partition_count() == 1 => true,
             Distribution::HashPartitioned(required_exprs) => {
                 match self {
                     // Here we do not check the partition count for hash partitioning and assumes the partition count
@@ -290,7 +292,7 @@ mod tests {
                 assert_eq!(result, (true, false, false, false, false))
             }
             Distribution::HashPartitioned(_) => {
-                assert_eq!(result, (false, false, false, true, false))
+                assert_eq!(result, (true, false, false, true, false))
             }
         }
     }
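
The added match arm makes any partitioning with exactly one partition satisfy a HashPartitioned requirement: with a single partition, all rows that share a key are necessarily colocated, so no repartitioning is needed, and the test expectation is updated accordingly. A self-contained illustration of that rule using simplified stand-in types (not the real Partitioning/Distribution enums):

// Simplified stand-in types to illustrate the rule: with a single partition,
// rows that hash to the same key are already in the same (only) partition,
// so a hash-distribution requirement is trivially satisfied.
#[derive(Debug, PartialEq)]
enum Requirement {
    Unspecified,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

fn satisfies(partition_count: usize, hashed_on: Option<&[String]>, required: &Requirement) -> bool {
    match required {
        Requirement::Unspecified => true,
        Requirement::SinglePartition => partition_count == 1,
        // The added arm: one partition satisfies any hash requirement.
        Requirement::HashPartitioned(_) if partition_count == 1 => true,
        Requirement::HashPartitioned(cols) => hashed_on == Some(cols.as_slice()),
    }
}

fn main() {
    let req = Requirement::HashPartitioned(vec!["a1".to_string()]);
    // A single round-robin partition now satisfies the hash requirement...
    assert!(satisfies(1, None, &req));
    // ...but several round-robin partitions still do not.
    assert!(!satisfies(8, None, &req));
}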

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -675,7 +675,7 @@ impl ExecutionPlan for AggregateExec {
                 vec![Distribution::UnspecifiedDistribution]
             }
             AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
-                vec![Distribution::HashPartitioned(self.output_group_expr())]
+                vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
             }
             AggregateMode::Final | AggregateMode::Single => {
                 vec![Distribution::SinglePartition]
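
For FinalPartitioned and SinglePartitioned aggregates, the distribution requirement is imposed on the aggregate's child, so it has to be phrased in the child's terms: group_by.input_exprs() yields the grouping expressions bound to the input schema, whereas output_group_expr() yields columns bound to the aggregate's own output schema, and those indices can silently point at the wrong input columns. A small sketch of that schema-binding mismatch, using hypothetical schemas and the Column expression type:

// Sketch with hypothetical schemas (not taken from the diff) showing why the
// requirement must use expressions bound to the *input* schema.
use datafusion_physical_expr::expressions::Column;

fn main() {
    // Child (input) schema:      [a1: UInt32, a2: UInt32, payload: Utf8]
    // Aggregate output schema:   [a2: UInt32, COUNT(payload): Int64]
    // Grouping expression: "a2".
    let input_expr = Column::new("a2", 1); // "a2" as the child sees it
    let output_expr = Column::new("a2", 0); // "a2" as it appears in the aggregate's output

    // A hash requirement built from `output_expr` would ask the child to be
    // hashed on its column 0, which is "a1", not "a2": the silent bug.
    assert_ne!(input_expr.index(), output_expr.index());
}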
