Enable querying policy-enabled table in MSQ, and use RestrictedDataSource as a base in DataSourceAnalysis. #17666
base: master
…le policy restriction in MSQ.
I had a first pass and have some questions and thoughts.
Also, maybe you could try to avoid reformatting entire files; all of these unrelated formatting changes make review harder than it should be. I know it's just the tooling doing it to adhere to the style rules, but my preference at least would be to make these cosmetic changes in a standalone PR as you notice them, to keep reviews simple.
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
workerToTaskIds.compute(i, (workerId, taskIds) -> {
  if (taskIds == null) {
    taskIds = new ArrayList<>();
  }
  taskIds.add(task.getId());
  return taskIds;
});
workerToTaskIds.computeIfAbsent(i, (unused) -> (new ArrayList<>())).add(task.getId());
This isn't equivalent: previously it would always add the taskId to the worker; now it only adds if the worker isn't there. Is that OK?
Yeah this looks weird.
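On the equivalence question in this thread, here is a minimal standalone sketch using plain java.util maps and made-up task IDs (it is not the Druid code, and the class and method names are hypothetical). It suggests that both forms append on every call, because Map.computeIfAbsent returns the current value for the key, whether that value already existed or was just created, and add(...) is then invoked on the returned list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; only the two map idioms are taken from the diff.
final class ComputeIfAbsentDemo
{
  // Original form: create the list if it is missing, then always append.
  static void addWithCompute(Map<Integer, List<String>> workerToTaskIds, int worker, String taskId)
  {
    workerToTaskIds.compute(worker, (workerId, taskIds) -> {
      if (taskIds == null) {
        taskIds = new ArrayList<>();
      }
      taskIds.add(taskId);
      return taskIds;
    });
  }

  // Refactored form: computeIfAbsent returns the existing list or the newly
  // created one, and add(...) runs on that returned list in both cases.
  static void addWithComputeIfAbsent(Map<Integer, List<String>> workerToTaskIds, int worker, String taskId)
  {
    workerToTaskIds.computeIfAbsent(worker, unused -> new ArrayList<>()).add(taskId);
  }

  public static void main(String[] args)
  {
    Map<Integer, List<String>> viaCompute = new HashMap<>();
    Map<Integer, List<String>> viaComputeIfAbsent = new HashMap<>();
    for (String taskId : new String[]{"task-0", "task-1"}) {
      addWithCompute(viaCompute, 0, taskId);
      addWithComputeIfAbsent(viaComputeIfAbsent, 0, taskId);
    }
    System.out.println(viaCompute.equals(viaComputeIfAbsent)); // prints "true"
    System.out.println(viaComputeIfAbsent);                    // prints "{0=[task-0, task-1]}"
  }
}
```

One difference that may still matter: if the map is a concurrent map touched by several threads (an assumption, since the map's type is not shown in this thread), the first form appends inside the map's atomic compute step, while the second appends after computeIfAbsent has returned, so thread safety would then depend on the list implementation.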
...ge-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
 * join tree.
 */
@JsonTypeName("restrictedInputNumber")
public class RestrictedInputNumberDataSource implements DataSource
Should this be InputNumberRestrictedDataSource instead?
It's a preference, but I kind of feel RestrictedInputNumberDataSource puts more emphasis on the Restricted part.
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
...i-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
Could you please move the formatting changes and refactors into a separate PR? This PR is quite hard to review with all the refactors and formatting changes mixed in.
...sions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
public class RestrictedInputNumberDataSource implements DataSource
{
  private final int inputNumber;
  private final Policy policy;
Shouldn't this be a list of policies?
We agreed on a single-policy approach per datasource; the full discussion is here: #17564 (comment)
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -571,9 +570,6 @@ private static Triple<DataSource, DimFilter, List<PreJoinableClause>> flattenJoi
} else if (current instanceof UnnestDataSource) {
  final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
  current = unnestDataSource.getBase();
} else if (current instanceof RestrictedDataSource) {
Could you please explain the removal here?
This was added in the first security PR, where TableDataSource was used as the base in DataSourceAnalysis. That was not a good approach, because the security filter could get lost in withUpdatedDataSource. In this new PR, RestrictedDataSource is used as the base, which guarantees that the security filter stays in place.
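To illustrate the reply above, here is a rough sketch with simplified, hypothetical stand-in types (these are not the real Druid classes or signatures, and the policy is reduced to a string). It contrasts a base lookup that unwraps the restriction, so the caller ends up holding a bare table, with one that stops at the restricted wrapper, so the policy travels with the base.

```java
// Hypothetical, simplified model of the data source tree discussed above.
final class RestrictedBaseSketch
{
  interface DataSource {}

  static final class TableDataSource implements DataSource
  {
    final String name;

    TableDataSource(String name)
    {
      this.name = name;
    }
  }

  static final class RestrictedDataSource implements DataSource
  {
    final TableDataSource base;
    final String policy; // stand-in for a Policy object

    RestrictedDataSource(TableDataSource base, String policy)
    {
      this.base = base;
      this.policy = policy;
    }
  }

  static final class UnnestDataSource implements DataSource
  {
    final DataSource base;

    UnnestDataSource(DataSource base)
    {
      this.base = base;
    }
  }

  // Earlier approach: also unwrap the restriction, so the returned base is the
  // bare table and the policy is no longer attached to it.
  static DataSource findBaseUnwrapping(DataSource current)
  {
    while (true) {
      if (current instanceof UnnestDataSource) {
        current = ((UnnestDataSource) current).base;
      } else if (current instanceof RestrictedDataSource) {
        current = ((RestrictedDataSource) current).base;
      } else {
        return current;
      }
    }
  }

  // Approach described in the reply: stop at the restricted wrapper, so
  // whatever replaces or reads the base still sees the policy.
  static DataSource findBaseKeepingRestriction(DataSource current)
  {
    while (current instanceof UnnestDataSource) {
      current = ((UnnestDataSource) current).base;
    }
    return current;
  }

  public static void main(String[] args)
  {
    DataSource tree = new UnnestDataSource(
        new RestrictedDataSource(new TableDataSource("events"), "tenant = 'a'")
    );
    System.out.println(findBaseUnwrapping(tree) instanceof RestrictedDataSource);         // prints "false"
    System.out.println(findBaseKeepingRestriction(tree) instanceof RestrictedDataSource); // prints "true"
  }
}
```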
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
return new DataSourcePlan(
    (broadcast && dataSource.isGlobal())
    ? dataSource
    : new RestrictedInputNumberDataSource(0, dataSource.getPolicy()),
I wonder if it's really necessary to delay the policy evaluation until the point where the cursor is created.
If so, wouldn't it be an option to just save the details required to interpret the policy fully, instead of forcing it to be the first parent of TableDataSource? That constraint requires working around things that should already be working, and it needs classes that are more or less just copies of others.
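For readers following the question above, this is a small sketch of what "delaying policy evaluation until the cursor is created" could look like. Everything here is a hypothetical simplification: Segment, RestrictedSegment, and restrictedSegmentMapFn are stand-ins invented for illustration, rows are plain strings, and the policy is a java.util.function.Predicate rather than Druid's Policy type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class RestrictedSegmentSketch
{
  // Hypothetical stand-in for a segment: something that can open a "cursor" over rows.
  interface Segment
  {
    List<String> makeCursor();
  }

  // Wraps a segment; the policy filter is applied only when a cursor is requested.
  static final class RestrictedSegment implements Segment
  {
    private final Segment delegate;
    private final Predicate<String> policy;

    RestrictedSegment(Segment delegate, Predicate<String> policy)
    {
      this.delegate = delegate;
      this.policy = policy;
    }

    @Override
    public List<String> makeCursor()
    {
      return delegate.makeCursor().stream().filter(policy).collect(Collectors.toList());
    }
  }

  // A segment map function in the sense used above: it turns each plain
  // segment into a restricted one that carries the policy with it.
  static Function<Segment, Segment> restrictedSegmentMapFn(Predicate<String> policy)
  {
    return segment -> new RestrictedSegment(segment, policy);
  }

  public static void main(String[] args)
  {
    Segment raw = () -> Arrays.asList("tenant_a:1", "tenant_b:2", "tenant_a:3");
    Segment restricted = restrictedSegmentMapFn(row -> row.startsWith("tenant_a")).apply(raw);
    System.out.println(restricted.makeCursor()); // prints "[tenant_a:1, tenant_a:3]"
  }
}
```

In this shape the policy only has to reach the place where segments are mapped; whether it gets there as a wrapper data source or as saved details alongside the plan is the design choice the comment is asking about.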
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
@@ -1603,6 +1606,39 @@
      .verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testInsertOnRestricted(String contextName, Map<String, Object> context)
Code scanning / CodeQL check notice: Useless parameter (Note, test)
@@ -704,6 +707,81 @@
      .verifyResults();
}

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnRestricted(String contextName, Map<String, Object> context)
Code scanning / CodeQL check notice: Useless parameter (Note, test)

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExport(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)
public void testExport2() throws IOException
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExport2(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)
public void testNumberOfRowsPerFile()
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExportRestricted(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testNumberOfRowsPerFile(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)
void testExportComplexColumns() throws IOException
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExportComplexColumns(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)
void testExportSketchColumns() throws IOException
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExportSketchColumns(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testEmptyExport(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)

@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testExportWithLimit(String unusedContextName, Map<String, Object> context) throws IOException
Code scanning / CodeQL check notice: Useless parameter (Note, test)
Description
This PR enables querying policy-enabled tables in MSQ.
Key changed/added classes in this PR
- DataSourceAnalysis: getBaseTableDataSource can now return the base of RestrictedDataSource. This is a more robust solution than using the underlying table as the base.
- MSQTaskQueryMaker: now adds policies to the query instead of throwing a permission error.
- DataSourcePlan: can handle RestrictedDataSource.
- RestrictedInputNumberDataSource: basically wraps an InputNumberDataSource with a policy; its SegmentMapFn can be used to create a RestrictedSegment.
- RunWorkOrder: a few refactors to make the code clearer, with no behavior change.
- ShufflePipelineBuilder.build(): it was not clear before that the channel future should only be returned when the resultFuture is ready. Also, the sanity check is moved to OutputChannels.
- MSQSelectTest, MSQReplaceTest, MSQInsertTest, MSQExportTest.
This PR has: