Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into regex-arbitrary-chi…
Browse files Browse the repository at this point in the history
…ld-expression
  • Loading branch information
Hannah Bast committed Oct 27, 2024
2 parents e053d4f + d60b610 commit 60523c1
Show file tree
Hide file tree
Showing 57 changed files with 1,749 additions and 728 deletions.
162 changes: 147 additions & 15 deletions src/engine/Distinct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

#include "./Distinct.h"

#include <sstream>

#include "engine/CallFixedSize.h"
#include "engine/Engine.h"
#include "engine/QueryExecutionTree.h"

using std::endl;
Expand All @@ -19,13 +16,13 @@ size_t Distinct::getResultWidth() const { return subtree_->getResultWidth(); }
// _____________________________________________________________________________
Distinct::Distinct(QueryExecutionContext* qec,
std::shared_ptr<QueryExecutionTree> subtree,
const vector<ColumnIndex>& keepIndices)
: Operation(qec), subtree_(std::move(subtree)), _keepIndices(keepIndices) {}
const std::vector<ColumnIndex>& keepIndices)
: Operation{qec}, subtree_{std::move(subtree)}, keepIndices_{keepIndices} {}

// _____________________________________________________________________________
string Distinct::getCacheKeyImpl() const {
return absl::StrCat("DISTINCT (", subtree_->getCacheKey(), ") (",
absl::StrJoin(_keepIndices, ","), ")");
absl::StrJoin(keepIndices_, ","), ")");
}

// _____________________________________________________________________________
Expand All @@ -37,16 +34,151 @@ VariableToColumnMap Distinct::computeVariableToColumnMap() const {
}

// _____________________________________________________________________________
ProtoResult Distinct::computeResult([[maybe_unused]] bool requestLaziness) {
IdTable idTable{getExecutionContext()->getAllocator()};
template <size_t WIDTH>
Result::Generator Distinct::lazyDistinct(Result::Generator input,
bool yieldOnce) const {
IdTable aggregateTable{subtree_->getResultWidth(), allocator()};
LocalVocab aggregateVocab{};
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow =
std::nullopt;
for (auto& [idTable, localVocab] : input) {
IdTable result = distinct<WIDTH>(std::move(idTable), previousRow);
if (!result.empty()) {
previousRow.emplace(result.asStaticView<WIDTH>().back());
if (yieldOnce) {
aggregateVocab.mergeWith(std::array{std::move(localVocab)});
aggregateTable.insertAtEnd(result);
} else {
co_yield {std::move(result), std::move(localVocab)};
}
}
}
if (yieldOnce) {
co_yield {std::move(aggregateTable), std::move(aggregateVocab)};
}
}

// _____________________________________________________________________________
ProtoResult Distinct::computeResult(bool requestLaziness) {
LOG(DEBUG) << "Getting sub-result for distinct result computation..." << endl;
std::shared_ptr<const Result> subRes = subtree_->getResult();
std::shared_ptr<const Result> subRes = subtree_->getResult(true);

LOG(DEBUG) << "Distinct result computation..." << endl;
idTable.setNumColumns(subRes->idTable().numColumns());
size_t width = subRes->idTable().numColumns();
CALL_FIXED_SIZE(width, &Engine::distinct, subRes->idTable(), _keepIndices,
&idTable);
LOG(DEBUG) << "Distinct result computation done." << endl;
return {std::move(idTable), resultSortedOn(), subRes->getSharedLocalVocab()};
size_t width = subtree_->getResultWidth();
if (subRes->isFullyMaterialized()) {
IdTable idTable = CALL_FIXED_SIZE(width, &Distinct::outOfPlaceDistinct,
this, subRes->idTable());
LOG(DEBUG) << "Distinct result computation done." << endl;
return {std::move(idTable), resultSortedOn(),
subRes->getSharedLocalVocab()};
}

auto generator =
CALL_FIXED_SIZE(width, &Distinct::lazyDistinct, this,
std::move(subRes->idTables()), !requestLaziness);
return requestLaziness
? ProtoResult{std::move(generator), resultSortedOn()}
: ProtoResult{cppcoro::getSingleElement(std::move(generator)),
resultSortedOn()};
}

// _____________________________________________________________________________
bool Distinct::matchesRow(const auto& a, const auto& b) const {
return std::ranges::all_of(keepIndices_,
[&a, &b](ColumnIndex i) { return a[i] == b[i]; });
}

// _____________________________________________________________________________
template <size_t WIDTH>
IdTable Distinct::distinct(
IdTable dynInput,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow) const {
AD_CONTRACT_CHECK(keepIndices_.size() <= dynInput.numColumns());
LOG(DEBUG) << "Distinct on " << dynInput.size() << " elements.\n";
IdTableStatic<WIDTH> result = std::move(dynInput).toStatic<WIDTH>();

// Variant of `std::ranges::unique` that allows to skip the begin rows of
// elements found in the previous table.
auto begin =
std::ranges::find_if(result, [this, &previousRow](const auto& row) {
// Without explicit this clang seems to
// think the this capture is redundant.
return !previousRow.has_value() ||
!this->matchesRow(row, previousRow.value());
});
auto end = result.end();

auto dest = result.begin();
if (begin == dest) {
// Optimization to avoid redundant move operations.
begin = std::ranges::adjacent_find(begin, end,
[this](const auto& a, const auto& b) {
// Without explicit this clang seems to
// think the this capture is redundant.
return this->matchesRow(a, b);
});
dest = begin;
if (begin != end) {
++begin;
}
} else if (begin != end) {
*dest = std::move(*begin);
}

if (begin != end) {
while (++begin != end) {
if (!matchesRow(*dest, *begin)) {
*++dest = std::move(*begin);
checkCancellation();
}
}
++dest;
}
checkCancellation();
result.erase(dest, end);
checkCancellation();

LOG(DEBUG) << "Distinct done.\n";
return std::move(result).toDynamic();
}

// _____________________________________________________________________________
template <size_t WIDTH>
IdTable Distinct::outOfPlaceDistinct(const IdTable& dynInput) const {
AD_CONTRACT_CHECK(keepIndices_.size() <= dynInput.numColumns());
LOG(DEBUG) << "Distinct on " << dynInput.size() << " elements.\n";
auto inputView = dynInput.asStaticView<WIDTH>();
IdTableStatic<WIDTH> output{dynInput.numColumns(), allocator()};

auto begin = inputView.begin();
auto end = inputView.end();
while (begin < end) {
int64_t allowedOffset = std::min(end - begin, CHUNK_SIZE);
begin = std::ranges::unique_copy(begin, begin + allowedOffset,
std::back_inserter(output),
[this](const auto& a, const auto& b) {
// Without explicit this clang seems to
// think the this capture is redundant.
return this->matchesRow(a, b);
})
.in;
checkCancellation();
// Skip to next unique value
do {
allowedOffset = std::min(end - begin, CHUNK_SIZE);
// This can only be called when dynInput is not empty, so `begin[-1]` is
// always valid.
auto lastRow = begin[-1];
begin = std::ranges::find_if(begin, begin + allowedOffset,
[this, &lastRow](const auto& row) {
// Without explicit this clang seems to
// think the this capture is redundant.
return !this->matchesRow(row, lastRow);
});
checkCancellation();
} while (begin != end && matchesRow(*begin, begin[-1]));
}

LOG(DEBUG) << "Distinct done.\n";
return std::move(output).toDynamic();
}
46 changes: 37 additions & 9 deletions src/engine/Distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@
// Author: Björn Buchhold ([email protected])
#pragma once

#include <utility>
#include <vector>

#include "engine/Operation.h"
#include "engine/QueryExecutionTree.h"
#include "parser/ParsedQuery.h"

using std::vector;

class Distinct : public Operation {
private:
std::shared_ptr<QueryExecutionTree> subtree_;
vector<ColumnIndex> _keepIndices;
std::vector<ColumnIndex> keepIndices_;

static constexpr int64_t CHUNK_SIZE = 100'000;

public:
Distinct(QueryExecutionContext* qec,
std::shared_ptr<QueryExecutionTree> subtree,
const vector<ColumnIndex>& keepIndices);
const std::vector<ColumnIndex>& keepIndices);

[[nodiscard]] size_t getResultWidth() const override;

[[nodiscard]] string getDescriptor() const override;

[[nodiscard]] vector<ColumnIndex> resultSortedOn() const override {
[[nodiscard]] std::vector<ColumnIndex> resultSortedOn() const override {
return subtree_->resultSortedOn();
}

Expand All @@ -46,15 +44,45 @@ class Distinct : public Operation {

bool knownEmptyResult() override { return subtree_->knownEmptyResult(); }

vector<QueryExecutionTree*> getChildren() override {
std::vector<QueryExecutionTree*> getChildren() override {
return {subtree_.get()};
}

protected:
[[nodiscard]] string getCacheKeyImpl() const override;

private:
ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override;
ProtoResult computeResult(bool requestLaziness) override;

VariableToColumnMap computeVariableToColumnMap() const override;

// Helper function that only compares rows on the columns in `keepIndices_`.
bool matchesRow(const auto& a, const auto& b) const;

// Return a generator that applies an in-place unique algorithm to the
// `IdTables`s yielded by the input generator. The `yieldOnce` flag controls
// if every `IdTable` from `input` should yield it's own `IdTable` or if all
// of them should get aggregated into a single big `IdTable`.
template <size_t WIDTH>
Result::Generator lazyDistinct(Result::Generator input, bool yieldOnce) const;

// Removes all duplicates from input with regards to the columns
// in keepIndices. The input needs to be sorted on the keep indices,
// otherwise the result of this function is undefined. The argument
// `previousRow` might hold a row representing the last row of the previous
// `IdTable`, so that the `IdTable` that will be returned doesn't return
// values that were already returned in the previous `IdTable`.
template <size_t WIDTH>
IdTable distinct(
IdTable dynInput,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow) const;

// Out-of-place implementation of the unique algorithm. Does only copy values
// if they're actually unique.
template <size_t WIDTH>
IdTable outOfPlaceDistinct(const IdTable& dynInput) const;

FRIEND_TEST(Distinct, distinct);
FRIEND_TEST(Distinct, distinctWithEmptyInput);
FRIEND_TEST(Distinct, testChunkEdgeCases);
};
31 changes: 0 additions & 31 deletions src/engine/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,37 +140,6 @@ class Engine {

static void sort(IdTable& idTable, const std::vector<ColumnIndex>& sortCols);

/**
* @brief Removes all duplicates from input with regards to the columns
* in keepIndices. The input needs to be sorted on the keep indices,
* otherwise the result of this function is undefined.
**/
template <size_t WIDTH>
static void distinct(const IdTable& dynInput,
const std::vector<ColumnIndex>& keepIndices,
IdTable* dynResult) {
LOG(DEBUG) << "Distinct on " << dynInput.size() << " elements.\n";
const IdTableView<WIDTH> input = dynInput.asStaticView<WIDTH>();
IdTableStatic<WIDTH> result = std::move(*dynResult).toStatic<WIDTH>();
result = input.clone();
if (!input.empty()) {
AD_CONTRACT_CHECK(keepIndices.size() <= input.numColumns());

auto last = std::unique(result.begin(), result.end(),
[&keepIndices](const auto& a, const auto& b) {
for (ColumnIndex i : keepIndices) {
if (a[i] != b[i]) {
return false;
}
}
return true;
});
result.erase(last, result.end());
}
*dynResult = std::move(result).toDynamic();
LOG(DEBUG) << "Distinct done.\n";
}

// Return the number of distinct rows in the `input`. The input must have all
// duplicates adjacent to each other (e.g. by being sorted), otherwise the
// behavior is undefined. `checkCancellation()` is invoked regularly and can
Expand Down
33 changes: 30 additions & 3 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "engine/LazyGroupBy.h"
#include "engine/Sort.h"
#include "engine/sparqlExpressions/AggregateExpression.h"
#include "engine/sparqlExpressions/CountStarExpression.h"
#include "engine/sparqlExpressions/GroupConcatExpression.h"
#include "engine/sparqlExpressions/LiteralExpression.h"
#include "engine/sparqlExpressions/SampleExpression.h"
Expand Down Expand Up @@ -1015,6 +1016,10 @@ GroupBy::isSupportedAggregate(sparqlExpression::SparqlExpression* expr) {

if (dynamic_cast<AvgExpression*>(expr)) return H{AVG};
if (dynamic_cast<CountExpression*>(expr)) return H{COUNT};
// We reuse the COUNT implementation which works, but leaves some optimization
// potential on the table because `COUNT(*)` doesn't need to check for
// undefined values.
if (dynamic_cast<CountStarExpression*>(expr)) return H{COUNT};
if (dynamic_cast<MinExpression*>(expr)) return H{MIN};
if (dynamic_cast<MaxExpression*>(expr)) return H{MAX};
if (dynamic_cast<SumExpression*>(expr)) return H{SUM};
Expand Down Expand Up @@ -1371,6 +1376,29 @@ void GroupBy::evaluateAlias(
}
}

// _____________________________________________________________________________
sparqlExpression::ExpressionResult
GroupBy::evaluateChildExpressionOfAggregateFunction(
const HashMapAggregateInformation& aggregate,
sparqlExpression::EvaluationContext& evaluationContext) {
// The code below assumes that DISTINCT is not supported yet.
AD_CORRECTNESS_CHECK(aggregate.expr_->isAggregate() ==
sparqlExpression::SparqlExpression::AggregateStatus::
NonDistinctAggregate);
// Evaluate child expression on block
auto exprChildren = aggregate.expr_->children();
// `COUNT(*)` is the only expression without children, so we fake the
// expression result in this case by providing an arbitrary, constant and
// defined value. This value will be verified as non-undefined by the
// `CountExpression` class and ignored afterward as long as `DISTINCT` is
// not set (which is not supported yet).
bool isCountStar =
dynamic_cast<sparqlExpression::CountStarExpression*>(aggregate.expr_);
AD_CORRECTNESS_CHECK(isCountStar || exprChildren.size() == 1);
return isCountStar ? Id::makeFromBool(true)
: exprChildren[0]->evaluate(&evaluationContext);
}

// _____________________________________________________________________________
template <size_t NUM_GROUP_COLUMNS>
IdTable GroupBy::createResultFromHashMap(
Expand Down Expand Up @@ -1497,10 +1525,9 @@ IdTable GroupBy::computeGroupByForHashMapOptimization(
aggregationTimer.cont();
for (auto& aggregateAlias : aggregateAliases) {
for (auto& aggregate : aggregateAlias.aggregateInfo_) {
// Evaluate child expression on block
auto exprChildren = aggregate.expr_->children();
sparqlExpression::ExpressionResult expressionResult =
exprChildren[0]->evaluate(&evaluationContext);
GroupBy::evaluateChildExpressionOfAggregateFunction(
aggregate, evaluationContext);

auto& aggregationDataVariant =
aggregationData.getAggregationDataVariant(
Expand Down
8 changes: 8 additions & 0 deletions src/engine/GroupBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,14 @@ class GroupBy : public Operation {
const HashMapAggregationData<NUM_GROUP_COLUMNS>& aggregationData,
LocalVocab* localVocab, const Allocator& allocator);

// Helper function to evaluate the child expression of an aggregate function.
// Only `COUNT(*)` does not have a single child, so we make a special case for
// it.
static sparqlExpression::ExpressionResult
evaluateChildExpressionOfAggregateFunction(
const HashMapAggregateInformation& aggregate,
sparqlExpression::EvaluationContext& evaluationContext);

// Sort the HashMap by key and create result table.
template <size_t NUM_GROUP_COLUMNS>
IdTable createResultFromHashMap(
Expand Down
Loading

0 comments on commit 60523c1

Please sign in to comment.