Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Nov 28, 2024
1 parent 4d48597 commit 54658e6
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,19 @@ private static Map<Integer, Integer> computeExchangeSenderNum(PipelineDistribute
return senderNum;
}

private static void setMultiCastDestinationThrift(PipelineDistributedPlan fragmentPlan) {
private static void setMultiCastDestinationThriftIfNotSet(PipelineDistributedPlan fragmentPlan) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) fragmentPlan.getFragmentJob().getFragment().getSink();
List<List<TPlanFragmentDestination>> destinationList = multiCastDataSink.getDestinations();

List<DataStreamSink> dataStreamSinks = multiCastDataSink.getDataStreamSinks();
for (int i = 0; i < dataStreamSinks.size(); i++) {
DataStreamSink realSink = dataStreamSinks.get(i);
List<TPlanFragmentDestination> destinations = destinationList.get(i);
if (!destinations.isEmpty()) {
// we should only set destination only once,
// because all backends share the same MultiCastDataSink object
continue;
}
DataStreamSink realSink = dataStreamSinks.get(i);
for (Entry<DataSink, List<AssignedJob>> kv : fragmentPlan.getDestinations().entrySet()) {
DataSink sink = kv.getKey();
if (sink == realSink) {
Expand Down Expand Up @@ -318,7 +323,7 @@ private static TPipelineFragmentParams fragmentToThriftIfAbsent(
List<TPlanFragmentDestination> nonMultiCastDestinations;
if (fragment.getSink() instanceof MultiCastDataSink) {
nonMultiCastDestinations = Lists.newArrayList();
setMultiCastDestinationThrift(fragmentPlan);
setMultiCastDestinationThriftIfNotSet(fragmentPlan);
} else {
nonMultiCastDestinations = nonMultiCastDestinationToThrift(fragmentPlan);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_multicast_sink") {
multi_sql """
drop table if exists table_1_undef_partitions2_keys3_properties4_distributed_by5;
CREATE TABLE `table_1_undef_partitions2_keys3_properties4_distributed_by5` (
`col_int_undef_signed` int NULL,
`col_int_undef_signed_not_null` int NOT NULL,
`col_date_undef_signed` date NULL,
`col_date_undef_signed_not_null` date NOT NULL,
`col_varchar_10__undef_signed` varchar(10) NULL,
`col_varchar_10__undef_signed_not_null` varchar(10) NOT NULL,
`col_varchar_1024__undef_signed` varchar(1024) NULL,
`col_varchar_1024__undef_signed_not_null` varchar(1024) NOT NULL,
`pk` int NULL
) ENGINE=OLAP
DUPLICATE KEY(`col_int_undef_signed`, `col_int_undef_signed_not_null`, `col_date_undef_signed`)
DISTRIBUTED BY HASH(`pk`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
insert into table_1_undef_partitions2_keys3_properties4_distributed_by5 values(3, 6, '2023-12-17', '2023-12-17', 'ok', 'v', 'want', 'z', 0);
set enable_nereids_distribute_planner=true;
set parallel_pipeline_task_num = 1;
"""

for (def i in 0..<100) {
test {
sql """
WITH cte1 AS(
SELECT t1.`pk`
FROM table_1_undef_partitions2_keys3_properties4_distributed_by5 AS t1
ORDER BY t1.pk
)
SELECT cte1.`pk` AS pk1
FROM cte1
LEFT OUTER JOIN cte1 AS alias1
ON cte1 . `pk` = alias1 . `pk`
WHERE cte1.`pk` < 3
LIMIT 66666666
"""
result([[0]])
}
}
}

0 comments on commit 54658e6

Please sign in to comment.