Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable querying policy-enabled table in MSQ, and use RestrictedDataSource as a base in DataSourceAnalysis. #17666

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion extensions-core/multi-stage-query/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Package `org.apache.druid.msq.sql` contains code related to integration with Dru

Main classes:

- [SqlTaskResource](src/main/java/org/apache/druid/msq/counters/CounterTracker.java) offers the endpoint
- [SqlTaskResource](src/main/java/org/apache/druid/msq/sql/resources/SqlTaskResource.java) offers the endpoint
`/druid/v2/sql/task`, where SQL queries are executed as multi-stage query tasks.
- [MSQTaskSqlEngine](src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java) is a SqlEngine implementation that
executes SQL queries as multi-stage query tasks. It is injected into the SqlTaskResource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public QueryResponse<Object[]> runQuery(DruidQuery druidQuery)
}
final MSQSpec querySpec = MSQTaskQueryMaker.makeQuerySpec(
null,
druidQuery.getQuery(),
druidQuery,
fieldMapping,
plannerContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.druid.msq.exec;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -1020,19 +1018,7 @@ public ListenableFuture<OutputChannels> build()
throw new ISE("Not initialized");
}

return Futures.transformAsync(
pipelineFuture,
resultAndChannels ->
Futures.transform(
resultAndChannels.getResultFuture(),
(Function<Object, OutputChannels>) input -> {
sanityCheckOutputChannels(resultAndChannels.getOutputChannels());
return resultAndChannels.getOutputChannels();
},
Execs.directExecutor()
),
Execs.directExecutor()
);
return Futures.transformAsync(pipelineFuture, ResultAndChannels::waitResultReady, Execs.directExecutor());
}

/**
Expand Down Expand Up @@ -1143,25 +1129,6 @@ private void pushAsync(final ExceptionalFunction<ResultAndChannels<?>, Listenabl
)
);
}

/**
* Verifies there is exactly one channel per partition.
*/
private void sanityCheckOutputChannels(final OutputChannels outputChannels)
{
for (int partitionNumber : outputChannels.getPartitionNumbers()) {
final List<OutputChannel> outputChannelsForPartition =
outputChannels.getChannelsForPartition(partitionNumber);

Preconditions.checkState(partitionNumber >= 0, "Expected partitionNumber >= 0, but got [%s]", partitionNumber);
Preconditions.checkState(
outputChannelsForPartition.size() == 1,
"Expected one channel for partition [%s], but got [%s]",
partitionNumber,
outputChannelsForPartition.size()
);
}
}
}

private static class ResultAndChannels<T>
Expand All @@ -1187,6 +1154,11 @@ public OutputChannels getOutputChannels()
{
return outputChannels;
}

/**
 * Returns a future that resolves to this object's {@link OutputChannels} once {@code resultFuture} resolves.
 * The result value itself is discarded ({@code unused}); presumably {@code verifySingleChannel()} performs the
 * one-channel-per-partition sanity check previously done inline by {@code sanityCheckOutputChannels} — TODO confirm.
 * The transform runs on a direct executor, i.e. on whichever thread completes {@code resultFuture}.
 */
public ListenableFuture<OutputChannels> waitResultReady()
{
return Futures.transform(resultFuture, unused -> outputChannels.verifySingleChannel(), Execs.directExecutor());
}
}

private interface ExceptionalFunction<T, R>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private enum State
private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
private final AtomicBoolean cancelTasksOnStop = new AtomicBoolean();

// Set by launchTasksIfNeeded.
// Set by launchWorkersIfNeeded.
@GuardedBy("taskIds")
private int desiredTaskCount = 0;

Expand Down Expand Up @@ -459,13 +459,7 @@ private void runNewTasks()
);

taskTrackers.put(task.getId(), new TaskTracker(i, task));
workerToTaskIds.compute(i, (workerId, taskIds) -> {
if (taskIds == null) {
taskIds = new ArrayList<>();
}
taskIds.add(task.getId());
return taskIds;
});
workerToTaskIds.computeIfAbsent(i, (unused) -> (new ArrayList<>())).add(task.getId());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't equivalent, previously it would always add the taskId to the worker, now it only adds if the worker isn't there, is that ok?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this looks weird.


FutureUtils.getUnchecked(overlordClient.runTask(task.getId(), task), true);

Expand Down Expand Up @@ -769,7 +763,7 @@ private void sleep(final long sleepMillis, final boolean shuttingDown) throws In
} else {
// wait on taskIds so we can wake up early if needed.
synchronized (taskIds) {
// desiredTaskCount is set by launchTasksIfNeeded, and acknowledgedDesiredTaskCount is set by mainLoop when
// desiredTaskCount is set by launchWorkersIfNeeded, and acknowledgedDesiredTaskCount is set by mainLoop when
// it acknowledges a new target. If these are not equal, do another run immediately and launch more tasks.
if (acknowledgedDesiredTaskCount == desiredTaskCount) {
taskIds.wait(sleepMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.RestrictedDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
Expand Down Expand Up @@ -165,6 +166,14 @@ public static DataSourcePlan forDataSource(
filterFields,
broadcast
);
} else if (dataSource instanceof RestrictedDataSource) {
return forRestricted(
(RestrictedDataSource) dataSource,
querySegmentSpecIntervals(querySegmentSpec),
filter,
filterFields,
broadcast
);
} else if (dataSource instanceof ExternalDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forExternal((ExternalDataSource) dataSource, broadcast);
Expand Down Expand Up @@ -329,7 +338,7 @@ private static JoinAlgorithm deduceJoinAlgorithm(JoinAlgorithm preferredJoinAlgo

/**
* Checks if the sortMerge algorithm can execute a particular join condition.
*
* <p>
* One check: join condition on two tables "table1" and "table2" is of the form
* table1.columnA = table2.columnA && table1.columnB = table2.columnB && ....
*/
Expand Down Expand Up @@ -365,6 +374,25 @@ private static DataSourcePlan forTable(
);
}

private static DataSourcePlan forRestricted(
final RestrictedDataSource dataSource,
final List<Interval> intervals,
@Nullable final DimFilter filter,
@Nullable final Set<String> filterFields,
final boolean broadcast
)
{
// TODO: TableInputSpec should take row filter in policy into consideration.
cecemei marked this conversation as resolved.
Show resolved Hide resolved
return new DataSourcePlan(
(broadcast && dataSource.isGlobal())
? dataSource
: new RestrictedInputNumberDataSource(0, dataSource.getPolicy()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if its really necessary to delay the policy evaluation up until the point the cursor is created?
if so - wouldn't it be an option to just save the required details to interpret the policy fully insted of forcing it to be the 1st parent of TableDataSource - because that requires to work around things which should be working already and need classes which are more-or-less just copies of others.

Collections.singletonList(new TableInputSpec(dataSource.getBase().getName(), intervals, filter, filterFields)),
broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
null
);
}

private static DataSourcePlan forExternal(
final ExternalDataSource dataSource,
final boolean broadcast
Expand Down Expand Up @@ -764,10 +792,10 @@ private static List<Interval> querySegmentSpecIntervals(final QuerySegmentSpec q
/**
* Verify that the provided {@link QuerySegmentSpec} is a {@link MultipleIntervalSegmentSpec} with
* interval {@link Intervals#ETERNITY}. If not, throw an {@link UnsupportedOperationException}.
*
* <p>
* Anywhere this appears is a place that we do not support using the "intervals" parameter of a query
* (i.e., {@link org.apache.druid.query.BaseQuery#getQuerySegmentSpec()}) for time filtering.
*
* <p>
* We don't need to support this for anything that is not {@link DataSourceAnalysis#isTableBased()}, because
* the SQL layer avoids "intervals" in other cases. See
* {@link org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@
/**
* Represents an input number, i.e., a positional index into
* {@link org.apache.druid.msq.kernel.StageDefinition#getInputSpecs()}.
*
* Used by {@link DataSourcePlan} to note which inputs correspond to which datasources in the query being planned.
*
* Used by {@link BroadcastJoinSegmentMapFnProcessor} to associate broadcast inputs with the correct datasources in a
* <p>
 * Used by:
 * <ul>
 * <li>{@link DataSourcePlan}, to note which inputs correspond to which datasources in the query being planned.</li>
 * <li>{@link BroadcastJoinSegmentMapFnProcessor}, to associate broadcast inputs with the correct datasources in a
 * join tree.</li>
 * </ul>
 */
@JsonTypeName("inputNumber")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.querykit;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.policy.Policy;
import org.apache.druid.segment.RestrictedSegment;
import org.apache.druid.segment.SegmentReference;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

/**
* Represents an input number, i.e., a positional index into
* {@link org.apache.druid.msq.kernel.StageDefinition#getInputSpecs()}, with policy restriction.
* <p>
* Used by
* <ul>
* <li>{@link DataSourcePlan}, to note which inputs correspond to which datasources in the query being planned.
* <li>{@link BroadcastJoinSegmentMapFnProcessor} to associate broadcast inputs with the correct datasources in a
* join tree.
*/
@JsonTypeName("restrictedInputNumber")
public class RestrictedInputNumberDataSource implements DataSource
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be InputNumberRestrictedDataSource instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a preference, but i kinda feel RestrictedInputNumberDataSource emphasis more on the Restricted part

{
private final int inputNumber;
private final Policy policy;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a list of policy no ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed on a single policy approach for datasource, full discussion here: #17564 (comment)


@JsonCreator
public RestrictedInputNumberDataSource(
@JsonProperty("inputNumber") int inputNumber,
@JsonProperty("policy") Policy policy
)
{
this.inputNumber = inputNumber;
this.policy = Preconditions.checkNotNull(policy, "Policy can't be null");
}

@JsonProperty
public int getInputNumber()
{
return inputNumber;
}

@JsonProperty
public Policy getPolicy()
{
return policy;
}

@Override
public Set<String> getTableNames()
{
return Collections.emptySet();
}

@Override
public List<DataSource> getChildren()
{
return Collections.emptyList();
}

@Override
public DataSource withChildren(final List<DataSource> children)
{
if (!children.isEmpty()) {
throw new IAE("Cannot accept children");
}

return this;
}

@Override
public boolean isCacheable(boolean isBroker)
{
return false;
}

@Override
public boolean isGlobal()
{
return false;
}

@Override
public boolean isConcrete()
{
return true;
}

@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
{
return baseSegment -> new RestrictedSegment(baseSegment, policy);
}

@Override
public DataSource withUpdatedDataSource(DataSource newSource)
{
return newSource;
}

@Override
public byte[] getCacheKey()
{
return null;
}

@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RestrictedInputNumberDataSource that = (RestrictedInputNumberDataSource) o;
return inputNumber == that.inputNumber && policy.equals(that.policy);
}

@Override
public int hashCode()
{
return Objects.hash(inputNumber, policy);
}

@Override
public String toString()
{
return "RestrictedInputNumberDataSource{" +
"inputNumber=" + inputNumber +
", policy=" + policy + "}";

}
}
Loading
Loading