-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable querying policy-enabled table in MSQ, and use RestrictedDataSource as a base in DataSourceAnalysis. #17666
base: master
Are you sure you want to change the base?
Changes from all commits
05df2a5
22a794a
22f6fc2
51a86b4
e5b5128
3011c5a
cbc38c8
c231fa4
3f1be25
8b81bc7
63aa977
f531aed
0c2b782
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,6 +52,7 @@ | |
import org.apache.druid.query.LookupDataSource; | ||
import org.apache.druid.query.QueryContext; | ||
import org.apache.druid.query.QueryDataSource; | ||
import org.apache.druid.query.RestrictedDataSource; | ||
import org.apache.druid.query.TableDataSource; | ||
import org.apache.druid.query.UnionDataSource; | ||
import org.apache.druid.query.UnnestDataSource; | ||
|
@@ -165,6 +166,14 @@ public static DataSourcePlan forDataSource( | |
filterFields, | ||
broadcast | ||
); | ||
} else if (dataSource instanceof RestrictedDataSource) { | ||
return forRestricted( | ||
(RestrictedDataSource) dataSource, | ||
querySegmentSpecIntervals(querySegmentSpec), | ||
filter, | ||
filterFields, | ||
broadcast | ||
); | ||
} else if (dataSource instanceof ExternalDataSource) { | ||
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec); | ||
return forExternal((ExternalDataSource) dataSource, broadcast); | ||
|
@@ -329,7 +338,7 @@ private static JoinAlgorithm deduceJoinAlgorithm(JoinAlgorithm preferredJoinAlgo | |
|
||
/** | ||
* Checks if the sortMerge algorithm can execute a particular join condition. | ||
* | ||
* <p> | ||
* One check: join condition on two tables "table1" and "table2" is of the form | ||
* table1.columnA = table2.columnA && table1.columnB = table2.columnB && .... | ||
*/ | ||
|
@@ -365,6 +374,25 @@ private static DataSourcePlan forTable( | |
); | ||
} | ||
|
||
private static DataSourcePlan forRestricted( | ||
final RestrictedDataSource dataSource, | ||
final List<Interval> intervals, | ||
@Nullable final DimFilter filter, | ||
@Nullable final Set<String> filterFields, | ||
final boolean broadcast | ||
) | ||
{ | ||
// TODO: TableInputSpec should take row filter in policy into consideration. | ||
cecemei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return new DataSourcePlan( | ||
(broadcast && dataSource.isGlobal()) | ||
? dataSource | ||
: new RestrictedInputNumberDataSource(0, dataSource.getPolicy()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if its really necessary to delay the policy evaluation up until the point the cursor is created? |
||
Collections.singletonList(new TableInputSpec(dataSource.getBase().getName(), intervals, filter, filterFields)), | ||
broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(), | ||
null | ||
); | ||
} | ||
|
||
private static DataSourcePlan forExternal( | ||
final ExternalDataSource dataSource, | ||
final boolean broadcast | ||
|
@@ -764,10 +792,10 @@ private static List<Interval> querySegmentSpecIntervals(final QuerySegmentSpec q | |
/** | ||
* Verify that the provided {@link QuerySegmentSpec} is a {@link MultipleIntervalSegmentSpec} with | ||
* interval {@link Intervals#ETERNITY}. If not, throw an {@link UnsupportedOperationException}. | ||
* | ||
* <p> | ||
* Anywhere this appears is a place that we do not support using the "intervals" parameter of a query | ||
* (i.e., {@link org.apache.druid.query.BaseQuery#getQuerySegmentSpec()}) for time filtering. | ||
* | ||
* <p> | ||
* We don't need to support this for anything that is not {@link DataSourceAnalysis#isTableBased()}, because | ||
* the SQL layer avoids "intervals" in other cases. See | ||
* {@link org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.msq.querykit; | ||
|
||
import com.fasterxml.jackson.annotation.JsonCreator; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.annotation.JsonTypeName; | ||
import com.google.common.base.Preconditions; | ||
import org.apache.druid.java.util.common.IAE; | ||
import org.apache.druid.query.DataSource; | ||
import org.apache.druid.query.Query; | ||
import org.apache.druid.query.planning.DataSourceAnalysis; | ||
import org.apache.druid.query.policy.Policy; | ||
import org.apache.druid.segment.RestrictedSegment; | ||
import org.apache.druid.segment.SegmentReference; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* Represents an input number, i.e., a positional index into | ||
* {@link org.apache.druid.msq.kernel.StageDefinition#getInputSpecs()}, with policy restriction. | ||
* <p> | ||
* Used by | ||
* <ul> | ||
* <li>{@link DataSourcePlan}, to note which inputs correspond to which datasources in the query being planned. | ||
* <li>{@link BroadcastJoinSegmentMapFnProcessor} to associate broadcast inputs with the correct datasources in a | ||
* join tree. | ||
*/ | ||
@JsonTypeName("restrictedInputNumber") | ||
public class RestrictedInputNumberDataSource implements DataSource | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's a preference, but i kinda feel |
||
{ | ||
private final int inputNumber; | ||
private final Policy policy; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be a list of policy no ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We agreed on a single policy approach for datasource, full discussion here: #17564 (comment) |
||
|
||
@JsonCreator | ||
public RestrictedInputNumberDataSource( | ||
@JsonProperty("inputNumber") int inputNumber, | ||
@JsonProperty("policy") Policy policy | ||
) | ||
{ | ||
this.inputNumber = inputNumber; | ||
this.policy = Preconditions.checkNotNull(policy, "Policy can't be null"); | ||
} | ||
|
||
@JsonProperty | ||
public int getInputNumber() | ||
{ | ||
return inputNumber; | ||
} | ||
|
||
@JsonProperty | ||
public Policy getPolicy() | ||
{ | ||
return policy; | ||
} | ||
|
||
@Override | ||
public Set<String> getTableNames() | ||
{ | ||
return Collections.emptySet(); | ||
} | ||
|
||
@Override | ||
public List<DataSource> getChildren() | ||
{ | ||
return Collections.emptyList(); | ||
} | ||
|
||
@Override | ||
public DataSource withChildren(final List<DataSource> children) | ||
{ | ||
if (!children.isEmpty()) { | ||
throw new IAE("Cannot accept children"); | ||
} | ||
|
||
return this; | ||
} | ||
|
||
@Override | ||
public boolean isCacheable(boolean isBroker) | ||
{ | ||
return false; | ||
} | ||
|
||
@Override | ||
public boolean isGlobal() | ||
{ | ||
return false; | ||
} | ||
|
||
@Override | ||
public boolean isConcrete() | ||
{ | ||
return true; | ||
} | ||
|
||
@Override | ||
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query) | ||
{ | ||
return baseSegment -> new RestrictedSegment(baseSegment, policy); | ||
} | ||
|
||
@Override | ||
public DataSource withUpdatedDataSource(DataSource newSource) | ||
{ | ||
return newSource; | ||
} | ||
|
||
@Override | ||
public byte[] getCacheKey() | ||
{ | ||
return null; | ||
} | ||
|
||
@Override | ||
public DataSourceAnalysis getAnalysis() | ||
{ | ||
return new DataSourceAnalysis(this, null, null, Collections.emptyList()); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) | ||
{ | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
RestrictedInputNumberDataSource that = (RestrictedInputNumberDataSource) o; | ||
return inputNumber == that.inputNumber && policy.equals(that.policy); | ||
} | ||
|
||
@Override | ||
public int hashCode() | ||
{ | ||
return Objects.hash(inputNumber, policy); | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "RestrictedInputNumberDataSource{" + | ||
"inputNumber=" + inputNumber + | ||
", policy=" + policy + "}"; | ||
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't equivalent, previously it would always add the taskId to the worker, now it only adds if the worker isn't there, is that ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this looks weird.