Skip to content

Commit

Permalink
Reduce usage of DataSourceAnalysis in interfaces (#17724)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgyrtkirk authored Feb 19, 2025
1 parent 98714c4 commit 11be3a9
Show file tree
Hide file tree
Showing 34 changed files with 140 additions and 145 deletions.
5 changes: 5 additions & 0 deletions .github/scripts/create-jacoco-coverage-report.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ set -x

echo "GITHUB_BASE_REF: ${GITHUB_BASE_REF}"

if [ "$GITHUB_BASE_REF" == "" ] ;then
echo "GITHUB_BASE_REF is not set; skipping this check!"
exit 0
fi

echo "Setting up git remote"
git remote set-branches --add origin ${GITHUB_BASE_REF}
git fetch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
Expand All @@ -81,7 +82,6 @@
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
Expand Down Expand Up @@ -504,9 +504,9 @@ void addSegmentToServer(DruidServer server, DataSegment segment)
}

@Override
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(TableDataSource table)
{
return Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName()));
return Optional.ofNullable(timelines.get(table.getName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public List<Query> optimize(Query query)
return Collections.singletonList(query);
}
String datasourceName = ((TableDataSource) query.getDataSource()).getName();
// get all derivatives for datasource in query. The derivatives set is sorted by average size of
// get all derivatives for datasource in query. The derivatives set is sorted by average size of
// per segment granularity.
Set<DerivativeDataSource> derivatives = DerivativeDataSourceManager.getDerivatives(datasourceName);

Expand Down Expand Up @@ -118,21 +118,21 @@ public List<Query> optimize(Query query)
}

List<Query> queries = new ArrayList<>();
List<Interval> remainingQueryIntervals = (List<Interval>) query.getIntervals();
List<Interval> remainingQueryIntervals = query.getIntervals();

for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) {
TableDataSource tableDataSource = new TableDataSource(derivativeDataSource.getName());
final List<Interval> derivativeIntervals = remainingQueryIntervals.stream()
.flatMap(interval -> serverView
.getTimeline(tableDataSource.getAnalysis())
.getTimeline(tableDataSource)
.orElseThrow(() -> new ISE("No timeline for dataSource: %s", derivativeDataSource.getName()))
.lookup(interval)
.stream()
.map(TimelineObjectHolder::getInterval)
)
.collect(Collectors.toList());
// if the derivative does not contain any parts of intervals in the query, the derivative will
// not be selected.
// not be selected.
if (derivativeIntervals.isEmpty()) {
continue;
}
Expand All @@ -154,7 +154,7 @@ public List<Query> optimize(Query query)
}

//after materialized view selection, the result of the remaining query interval will be computed based on
// the original datasource.
// the original datasource.
if (!remainingQueryIntervals.isEmpty()) {
queries.add(query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(remainingQueryIntervals)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@
import org.apache.druid.query.Result;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.movingaverage.test.TestConfig;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.MapJoinableFactory;
Expand Down Expand Up @@ -322,7 +322,7 @@ public long getMaxQueuedBytes()
new TimelineServerView()
{
@Override
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(DataSourceAnalysis analysis)
public Optional<? extends TimelineLookup<String, ServerSelector>> getTimeline(TableDataSource analysis)
{
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public List<InputSlice> sliceStatic(final InputSpec inputSpec, final int maxNumS
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final TimelineLookup<String, ServerSelector> timeline =
serverView.getTimeline(new TableDataSource(tableInputSpec.getDataSource()).getAnalysis()).orElse(null);
serverView.getTimeline(new TableDataSource(tableInputSpec.getDataSource())).orElse(null);

if (timeline == null) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Supplier<ResourceHolder<CompleteSegment>> fetchSegment(

return () -> {
final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> timeline =
segmentManager.getTimeline(new TableDataSource(segmentId.getDataSource()).getAnalysis());
segmentManager.getTimeline(new TableDataSource(segmentId.getDataSource()));

if (!timeline.isPresent()) {
throw segmentNotFound(segmentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ void setUp()
}

Mockito.when(serverView.getDruidServerMetadatas()).thenReturn(SERVERS);
Mockito.when(serverView.getTimeline(new TableDataSource(DATASOURCE).getAnalysis()))
Mockito.when(serverView.getTimeline(new TableDataSource(DATASOURCE)))
.thenReturn(Optional.of(timeline));
Mockito.when(serverView.getTimeline(new TableDataSource(DATASOURCE_NONEXISTENT).getAnalysis()))
Mockito.when(serverView.getTimeline(new TableDataSource(DATASOURCE_NONEXISTENT)))
.thenReturn(Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.initialization.ServerConfig;
Expand Down Expand Up @@ -388,11 +388,10 @@ private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
QueryRunner<T> queryRunner = null;

if (runningItem != null) {
final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final Task task = runningItem.getTask();
final TableDataSource queryTable = query.getDataSourceAnalysis().getBaseTableDataSource();

if (analysis.getBaseTableDataSource().isPresent()
&& task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName())) {
if (task.getDataSource().equals(queryTable.getName())) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);

if (taskQueryRunner != null) {
Expand Down
4 changes: 1 addition & 3 deletions processing/src/main/java/org/apache/druid/query/Queries.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ public static <T> Query<T> withSpecificSegments(final Query<T> query, final List
final DataSourceAnalysis analysis = retDataSource.getAnalysis();

// Sanity check: query must be based on a single table.
if (!analysis.getBaseTableDataSource().isPresent()) {
throw new ISE("Unable to apply specific segments to non-table-based dataSource[%s]", query.getDataSource());
}
analysis.getBaseTableDataSource();

if (analysis.getBaseQuerySegmentSpec().isPresent()
&& !analysis.getBaseQuerySegmentSpec().get().equals(new MultipleSpecificSegmentSpec(descriptors))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public byte[] getCacheKey()
@Override
public DataSourceAnalysis getAnalysis()
{
return new DataSourceAnalysis(this, null, null, ImmutableList.of());
return base.getAnalysis();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.query.planning;

import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
Expand Down Expand Up @@ -111,23 +112,19 @@ public DataSource getBaseDataSource()
}

/**
* Returns the concrete base table.
* <ul>
* <li>If {@link #baseDataSource} is a {@link TableDataSource}, returns itself.
* <li>If {@link #baseDataSource} is a {@link RestrictedDataSource}, returns {@link RestrictedDataSource#getBase()}.
* <li>Otherwise, returns an empty Optional.
*</ul>
* Note that this can return empty even if {@link #isConcreteAndTableBased()} is true. This happens if the base
* datasource is a {@link UnionDataSource} or {@link UnnestDataSource}.
* Unwraps the {@link #getBaseDataSource()} if its a {@link TableDataSource}.
*
* @throws An error of type {@link DruidException.Category#DEFENSIVE} if the {@link BaseDataSource} is not a table.
*
* note that this may not be true even {@link #isConcreteAndTableBased()} is true - in cases when the base
* datasource is a {@link UnionDataSource} of {@link TableDataSource}.
*/
public Optional<TableDataSource> getBaseTableDataSource()
public TableDataSource getBaseTableDataSource()
{
if (baseDataSource instanceof TableDataSource) {
return Optional.of((TableDataSource) baseDataSource);
} else if (baseDataSource instanceof RestrictedDataSource) {
return Optional.of(((RestrictedDataSource) baseDataSource).getBase());
return (TableDataSource) baseDataSource;
} else {
return Optional.empty();
throw DruidException.defensive("Base dataSource[%s] is not a table!", baseDataSource);
}
}

Expand Down
18 changes: 8 additions & 10 deletions processing/src/test/java/org/apache/druid/query/QueriesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.query;

import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
Expand All @@ -33,25 +34,21 @@
import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.JoinType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertThrows;

/**
*
*/
public class QueriesTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testVerifyAggregations()
{
Expand Down Expand Up @@ -328,10 +325,11 @@ public void testWithSpecificSegmentsOnUnionIsAnError()
.granularity(Granularities.ALL)
.build();

expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Unable to apply specific segments to non-table-based dataSource");

final Query<Result<TimeseriesResultValue>> ignored = Queries.withSpecificSegments(query, descriptors);
DruidException e = assertThrows(
DruidException.class,
() -> Queries.withSpecificSegments(query, descriptors)
);
Assert.assertEquals("Base dataSource[LookupDataSource{lookupName='lookyloo'}] is not a table!", e.getMessage());
}

@Test
Expand Down
Loading

0 comments on commit 11be3a9

Please sign in to comment.