
Commit

fix
924060929 committed Aug 20, 2024
1 parent 935343f commit 825a80c
Showing 5 changed files with 151 additions and 33 deletions.
30 changes: 29 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -27,6 +27,7 @@
import org.apache.doris.analysis.ExprId;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
@@ -53,6 +54,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Triple;
@@ -913,15 +915,41 @@ public TNormalizedPlanNode normalize(Normalizer normalizer) {
.collect(Collectors.toSet())
);
normalizedPlan.setLimit(limit);

normalize(normalizedPlan, normalizer);
normalizeProjection(normalizedPlan, normalizer);
return normalizedPlan;
}

public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) {
throw new IllegalStateException("Unsupported normalization");
}

protected void normalizeProjection(TNormalizedPlanNode normalizedPlanNode, Normalizer normalizer) {
List<SlotDescriptor> outputSlots = normalizer
.getDescriptorTable()
.getTupleDesc(getOutputTupleIds().get(0))
.getSlots();

Map<SlotId, Expr> outputSlotToProject = Maps.newLinkedHashMap();
for (int i = 0; i < outputSlots.size(); i++) {
if (projectList == null) {
SlotRef slotRef = new SlotRef(outputSlots.get(i));
outputSlotToProject.put(outputSlots.get(i).getId(), slotRef);
} else {
Expr projectExpr = projectList.get(i);
if (projectExpr instanceof SlotRef) {
int outputId = outputSlots.get(i).getId().asInt();
int refId = ((SlotRef) projectExpr).getSlotId().asInt();
normalizer.setSlotIdToNormalizeId(outputId, normalizer.normalizeSlotId(refId));
}
outputSlotToProject.put(outputSlots.get(i).getId(), projectExpr);
}
}

List<TExpr> sortNormalizeProject = normalizeProjection(outputSlotToProject, normalizer);
normalizedPlanNode.setProjects(sortNormalizeProject);
}

protected void normalizeConjuncts(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) {
normalizedPlan.setConjuncts(normalizeExprs(getConjuncts(), normalizer));
}
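For context: the new `normalize(TNormalizedPlanNode, Normalizer)` method is a per-node hook (the base implementation throws), while `normalizeProjection` canonicalizes the output tuple: each output slot is mapped to its projection expression, and when that expression is a plain `SlotRef` the output slot id is registered under the referenced slot's normalized id, so semantically equal plans normalize to identical bytes. A hedged sketch of what an override might look like — the field names `groupingExprs` and `aggregateFunctions` are illustrative assumptions, not code from this commit:

```java
// Hedged sketch, not part of this commit: a concrete plan node's override.
@Override
public void normalize(TNormalizedPlanNode normalizedPlan, Normalizer normalizer) {
    TNormalizedAggregateNode aggNode = new TNormalizedAggregateNode();
    // normalizeExprs rewrites slot ids into normalized ids so that
    // semantically identical plans serialize to identical digest bytes.
    aggNode.setGroupingExprs(normalizeExprs(groupingExprs, normalizer));
    aggNode.setAggregateFunctions(normalizeExprs(aggregateFunctions, normalizer));
    normalizedPlan.setAggregationNode(aggNode);
}
```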
@@ -18,9 +18,7 @@
package org.apache.doris.planner;

import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.planner.normalize.QueryCacheNormalizer;
import org.apache.doris.thrift.TNormalizedPlanNode;
import org.apache.doris.thrift.TQueryCacheParam;
import org.apache.doris.thrift.TRuntimeFilterMode;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -108,7 +106,7 @@ protected void runBeforeAll() throws Exception {
createTables(nonPart, part1, part2, multiLeveParts);

connectContext.getSessionVariable().setEnableNereidsPlanner(true);
connectContext.getSessionVariable().setDisableNereidsRules(RuleType.PRUNE_EMPTY_PARTITION.toString());
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
}

@Test
@@ -132,7 +130,7 @@ public void testNormalize() throws Exception {
}

@Test
public void testProject() throws Exception {
public void testProjectOnOlapScan() throws Exception {
String digest1 = getDigest("select k1 + 1, k2, sum(v1), sum(v2) as v from db1.non_part group by 1, 2");
String digest2 = getDigest("select sum(v2), k2, sum(v1), k1 + 1 from db1.non_part group by 2, 4");
Assertions.assertEquals(digest1, digest2);
@@ -141,6 +139,17 @@ public void testProject() throws Exception {
Assertions.assertNotEquals(digest1, digest3);
}

@Test
public void testProjectOnAggregate() throws Exception {
connectContext.getSessionVariable()
.setDisableNereidsRules("PRUNE_EMPTY_PARTITION,TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT");
String digest1 = getDigest("select k1 + 1, k2 + 1, sum(v1) + 1, sum(v2) + 1 as v from db1.non_part group by k1, k2");
String digest2 = getDigest("select sum(v2) + 1, k2 + 1, sum(v1) + 1, k1 + 1 as v from db1.non_part group by k1, k2");
Assertions.assertEquals(digest1, digest2);
}
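The `getDigest` helper used above sits outside this diff; a plausible shape, mirroring the removed `assertDigestEquals` helper below and the surviving `normalize(String)` method (an assumption for illustration):

```java
// Assumed helper shape (its body is not shown in this diff): hex-encode
// the digest of the first TQueryCacheParam produced for the query.
private String getDigest(String sql) throws Exception {
    List<TQueryCacheParam> queryCaches = normalize(sql);
    return Hex.encodeHexString(queryCaches.get(0).digest);
}
```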



@Test
public void testPartitionTable() throws Throwable {
TQueryCacheParam queryCacheParam1 = getQueryCacheParam(
@@ -201,21 +210,6 @@ private TQueryCacheParam getQueryCacheParam(String sql) throws Exception {
return queryCaches.get(0);
}

private void assertDigestEquals(String sql1, String sql2) throws Exception {
List<TQueryCacheParam> queryCache1 = normalize(sql1);
List<TQueryCacheParam> queryCache2 = normalize(sql2);

Assertions.assertEquals(queryCache1.size(), queryCache2.size());

for (int i = 0; i < queryCache1.size(); i++) {
TQueryCacheParam cacheParam1 = queryCache1.get(i);
TQueryCacheParam cacheParam2 = queryCache2.get(i);
String digest1 = Hex.encodeHexString(cacheParam1.digest);
String digest2 = Hex.encodeHexString(cacheParam2.digest);
Assertions.assertEquals(digest1, digest2);
}
}

private List<TQueryCacheParam> normalize(String sql) throws Exception {
Planner planner = getSqlStmtExecutor(sql).planner();
DescriptorTable descTable = planner.getDescTable();
@@ -230,16 +224,4 @@ private List<TQueryCacheParam> normalize(String sql) throws Exception {
}
return queryCacheParams;
}

private List<TNormalizedPlanNode> normalizePlans(String sql) throws Exception {
Planner planner = getSqlStmtExecutor(sql).planner();
DescriptorTable descTable = planner.getDescTable();
List<PlanFragment> fragments = planner.getFragments();
List<TNormalizedPlanNode> normalizedPlans = new ArrayList<>();
for (PlanFragment fragment : fragments) {
QueryCacheNormalizer normalizer = new QueryCacheNormalizer(fragment, descTable);
normalizedPlans.addAll(normalizer.normalizePlans(connectContext));
}
return normalizedPlans;
}
}
60 changes: 60 additions & 0 deletions gensrc/thrift/Normalization.thrift
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

namespace java org.apache.doris.thrift

include "Exprs.thrift"
include "Types.thrift"
include "Opcodes.thrift"
include "Descriptors.thrift"
include "Partitions.thrift"
include "PlanNodes.thrift"

struct TNormalizedOlapScanNode {
1: optional i64 table_id
2: optional i64 index_id
3: optional bool is_preaggregation
4: optional list<string> key_column_names
5: optional list<Types.TPrimitiveType> key_column_types
6: optional string rollup_name
7: optional string sort_column
8: optional list<string> select_columns
9: optional list<Exprs.TExpr> projection
}

struct TNormalizedAggregateNode {
1: optional list<Exprs.TExpr> grouping_exprs
2: optional list<Exprs.TExpr> aggregate_functions
3: optional Types.TTupleId intermediate_tuple_id
4: optional Types.TTupleId output_tuple_id
5: optional bool is_finalize
6: optional bool use_streaming_preaggregation
}

struct TNormalizedPlanNode {
1: optional Types.TPlanNodeId node_id
2: optional PlanNodes.TPlanNodeType node_type
3: optional i32 num_children
5: optional set<Types.TTupleId> tuple_ids
6: optional set<Types.TTupleId> nullable_tuples
7: optional list<Exprs.TExpr> conjuncts
8: optional list<Exprs.TExpr> projects
9: optional i64 limit

10: optional TNormalizedOlapScanNode olap_scan_node
11: optional TNormalizedAggregateNode aggregation_node
}
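To show how these structs compose, here is an illustrative snippet populating a normalized OLAP scan. It is not part of the commit: the variable values are assumptions, and the setter names assume the camel-casing thrift Java generator option Doris builds with.

```java
TNormalizedOlapScanNode scan = new TNormalizedOlapScanNode();
scan.setTableId(tableId);               // assumed in-scope values
scan.setIndexId(selectedIndexId);
scan.setKeyColumnNames(keyColumnNames);
scan.setIsPreaggregation(true);

TNormalizedPlanNode plan = new TNormalizedPlanNode();
plan.setNodeType(TPlanNodeType.OLAP_SCAN_NODE);
plan.setNumChildren(0);
plan.setLimit(-1);                      // -1 conventionally means no limit
plan.setOlapScanNode(scan);
```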
5 changes: 4 additions & 1 deletion gensrc/thrift/Planner.thrift
@@ -23,6 +23,7 @@ include "Exprs.thrift"
include "DataSinks.thrift"
include "PlanNodes.thrift"
include "Partitions.thrift"
include "QueryCache.thrift"

// TPlanFragment encapsulates info needed to execute a particular
// plan fragment, including how to produce and how to partition its output.
@@ -61,6 +62,8 @@ struct TPlanFragment {
// sink) in a single instance of this fragment. This is used for an optimization in
// InitialReservation. Measured in bytes. required in V1
8: optional i64 initial_reservation_total_claims

9: optional QueryCache.TQueryCacheParam query_cache_param
}

// location information for a single scan range
@@ -79,4 +82,4 @@ struct TScanRangeLocations {
1: required PlanNodes.TScanRange scan_range
// non-empty list
2: list<TScanRangeLocation> locations
}
}
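The new field 9 lets the planner ship a cache parameter with each fragment. A hedged sketch of the producing side — `QueryCacheNormalizer`'s exact entry point is an assumption; the test file earlier in this diff constructs the normalizer the same way:

```java
TPlanFragment tFragment = fragment.toThrift();
QueryCacheNormalizer normalizer = new QueryCacheNormalizer(fragment, descTable);
normalizer.normalize(connectContext)            // assumed entry point
        .ifPresent(tFragment::setQueryCacheParam);
```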
45 changes: 45 additions & 0 deletions gensrc/thrift/QueryCache.thrift
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

namespace cpp doris
namespace java org.apache.doris.thrift

struct TQueryCacheParam {
1: required i32 node_id

2: required binary digest

// the query's slot order can differ from the query cache's slot order,
// so we map each cache slot id to the current slot id
3: required map<i32, i32> output_slot_mapping

// maps partition id to tablet ids;
// if the table is not partitioned, the partition id is -1
4: required map<i64, set<i64>> partition_to_tablets

// maps partition id to filter range;
// if the table is not partitioned, the partition id is -1.
// The BE uses <digest, tablet id, filter range> as the key when searching the query cache.
// Note that the BE does not care what the filter range contains; it is used only as part of the key.
5: required map<i64, string> partition_filter_key

6: optional bool force_refresh_query_cache

7: optional i64 entry_max_bytes

8: optional i64 entry_max_rows
}
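Per the comments above, the BE composes its cache lookup key from the digest, a tablet id, and the partition's filter range. A conceptual Java rendering — the real lookup lives in the C++ BE, and the accessor names assume camel-cased thrift getters:

```java
import org.apache.commons.codec.binary.Hex;

// Conceptual only: <digest, tablet id, filter range> flattened into one key.
static String cacheKey(TQueryCacheParam param, long partitionId, long tabletId) {
    String filterRange = param.getPartitionFilterKey().get(partitionId);
    return Hex.encodeHexString(param.getDigest()) + "/" + tabletId + "/" + filterRange;
}
```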
