Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hash-map based GROUP BY can now process lazy inputs #1651

Merged
merged 7 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 89 additions & 67 deletions src/engine/GroupBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
if (useHashMapOptimization) {
const auto* child = _subtree->getRootOperation()->getChildren().at(0);
// Skip sorting
subresult = child->getResult();
subresult = child->getResult(true);
// Update runtime information
auto runTimeInfoChildren =
child->getRootOperation()->getRuntimeInfoPointer();
Expand Down Expand Up @@ -366,13 +366,25 @@ ProtoResult GroupBy::computeResult(bool requestLaziness) {
}

if (useHashMapOptimization) {
auto localVocab = subresult->getCopyOfLocalVocab();
IdTable idTable = CALL_FIXED_SIZE(
groupByCols.size(), &GroupBy::computeGroupByForHashMapOptimization,
this, metadataForUnsequentialData->aggregateAliases_,
subresult->idTable(), groupByCols, &localVocab);
auto computeWithHashMap = [this, &metadataForUnsequentialData,
&groupByCols](auto&& subresults) {
auto doCompute = [&]<int NumCols> {
return computeGroupByForHashMapOptimization<NumCols>(
metadataForUnsequentialData->aggregateAliases_, AD_FWD(subresults),
groupByCols);
};
return ad_utility::callFixedSize(groupByCols.size(), doCompute);
};

return {std::move(idTable), resultSortedOn(), std::move(localVocab)};
if (subresult->isFullyMaterialized()) {
// `computeWithHashMap` takes a range, so we artificially create one with
// a single input.
return computeWithHashMap(
std::array{std::pair{std::cref(subresult->idTable()),
std::cref(subresult->localVocab())}});
} else {
return computeWithHashMap(std::move(subresult->idTables()));
}
}

size_t inWidth = _subtree->getResultWidth();
Expand Down Expand Up @@ -846,7 +858,7 @@ std::optional<IdTable> GroupBy::computeGroupByForJoinWithFullScan() const {
const auto& index = getExecutionContext()->getIndex();

// TODO<joka921, C++23> Simplify the following pattern by using
// `ql::views::chunkd_by` and implement a lazy version of this view for
// `ql::views::chunk_by` and implement a lazy version of this view for
// input iterators.

// Take care of duplicate values in the input.
Expand Down Expand Up @@ -1487,78 +1499,88 @@ static constexpr auto makeProcessGroupsVisitor =

// _____________________________________________________________________________
template <size_t NUM_GROUP_COLUMNS>
IdTable GroupBy::computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases,
const IdTable& subresult, const std::vector<size_t>& columnIndices,
LocalVocab* localVocab) const {
AD_CONTRACT_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS ||
NUM_GROUP_COLUMNS == 0);
Result GroupBy::computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases, auto subresults,
const std::vector<size_t>& columnIndices) const {
AD_CORRECTNESS_CHECK(columnIndices.size() == NUM_GROUP_COLUMNS ||
NUM_GROUP_COLUMNS == 0);
LocalVocab localVocab;

// Initialize aggregation data
HashMapAggregationData<NUM_GROUP_COLUMNS> aggregationData(
getExecutionContext()->getAllocator(), aggregateAliases,
columnIndices.size());

// Initialize evaluation context
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), subresult,
getExecutionContext()->getAllocator(), *localVocab, cancellationHandle_,
deadline_);

evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
_groupByVariables.begin(), _groupByVariables.end()};
evaluationContext._isPartOfGroupBy = true;

ad_utility::Timer lookupTimer{ad_utility::Timer::Stopped};
ad_utility::Timer aggregationTimer{ad_utility::Timer::Stopped};
for (size_t i = 0; i < subresult.size(); i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
checkCancellation();

evaluationContext._beginIndex = i;
evaluationContext._endIndex =
std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, subresult.size());

auto currentBlockSize = evaluationContext.size();

// Perform HashMap lookup once for all groups in current block
using U = HashMapAggregationData<NUM_GROUP_COLUMNS>::template ArrayOrVector<
std::span<const Id>>;
U groupValues;
resizeIfVector(groupValues, columnIndices.size());

// TODO<C++23> use views::enumerate
size_t j = 0;
for (auto& idx : columnIndices) {
groupValues[j] = subresult.getColumn(idx).subspan(
evaluationContext._beginIndex, currentBlockSize);
++j;
}
lookupTimer.cont();
auto hashEntries = aggregationData.getHashEntries(groupValues);
lookupTimer.stop();

aggregationTimer.cont();
for (auto& aggregateAlias : aggregateAliases) {
for (auto& aggregate : aggregateAlias.aggregateInfo_) {
sparqlExpression::ExpressionResult expressionResult =
GroupBy::evaluateChildExpressionOfAggregateFunction(
aggregate, evaluationContext);

auto& aggregationDataVariant =
aggregationData.getAggregationDataVariant(
aggregate.aggregateDataIndex_);

std::visit(makeProcessGroupsVisitor(currentBlockSize,
&evaluationContext, hashEntries),
std::move(expressionResult), aggregationDataVariant);
for (const auto& [inputTableRef, inputLocalVocabRef] : subresults) {
// Also support `std::reference_wrapper` as the input.
const IdTable& inputTable = inputTableRef;
const LocalVocab& inputLocalVocab = inputLocalVocabRef;

localVocab.mergeWith(std::span{&inputLocalVocab, 1});
// Initialize evaluation context
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), inputTable,
getExecutionContext()->getAllocator(), localVocab, cancellationHandle_,
deadline_);

evaluationContext._groupedVariables = ad_utility::HashSet<Variable>{
_groupByVariables.begin(), _groupByVariables.end()};
evaluationContext._isPartOfGroupBy = true;

for (size_t i = 0; i < inputTable.size();
i += GROUP_BY_HASH_MAP_BLOCK_SIZE) {
checkCancellation();

evaluationContext._beginIndex = i;
evaluationContext._endIndex =
std::min(i + GROUP_BY_HASH_MAP_BLOCK_SIZE, inputTable.size());

auto currentBlockSize = evaluationContext.size();

// Perform HashMap lookup once for all groups in current block
using U = HashMapAggregationData<
NUM_GROUP_COLUMNS>::template ArrayOrVector<std::span<const Id>>;
U groupValues;
resizeIfVector(groupValues, columnIndices.size());

// TODO<C++23> use views::enumerate
size_t j = 0;
for (auto& idx : columnIndices) {
groupValues[j] = inputTable.getColumn(idx).subspan(
evaluationContext._beginIndex, currentBlockSize);
++j;
}
lookupTimer.cont();
auto hashEntries = aggregationData.getHashEntries(groupValues);
lookupTimer.stop();

aggregationTimer.cont();
for (auto& aggregateAlias : aggregateAliases) {
for (auto& aggregate : aggregateAlias.aggregateInfo_) {
sparqlExpression::ExpressionResult expressionResult =
GroupBy::evaluateChildExpressionOfAggregateFunction(
aggregate, evaluationContext);

auto& aggregationDataVariant =
aggregationData.getAggregationDataVariant(
aggregate.aggregateDataIndex_);

std::visit(makeProcessGroupsVisitor(currentBlockSize,
&evaluationContext, hashEntries),
std::move(expressionResult), aggregationDataVariant);
}
}
aggregationTimer.stop();
}
aggregationTimer.stop();
}

runtimeInfo().addDetail("timeMapLookup", lookupTimer.msecs());
runtimeInfo().addDetail("timeAggregation", aggregationTimer.msecs());

return createResultFromHashMap(aggregationData, aggregateAliases, localVocab);
IdTable resultTable =
createResultFromHashMap(aggregationData, aggregateAliases, &localVocab);
return {std::move(resultTable), resultSortedOn(), std::move(localVocab)};
}

// _____________________________________________________________________________
Expand Down
6 changes: 3 additions & 3 deletions src/engine/GroupBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,10 @@ class GroupBy : public Operation {
// Create result IdTable by using a HashMap mapping groups to aggregation data
// and subsequently calling `createResultFromHashMap`.
template <size_t NUM_GROUP_COLUMNS>
IdTable computeGroupByForHashMapOptimization(
Result computeGroupByForHashMapOptimization(
std::vector<HashMapAliasInformation>& aggregateAliases,
const IdTable& subresult, const std::vector<size_t>& columnIndices,
LocalVocab* localVocab) const;
auto subresults,
const std::vector<size_t>& columnIndices) const;

using AggregationData =
std::variant<AvgAggregationData, CountAggregationData, MinAggregationData,
Expand Down
46 changes: 46 additions & 0 deletions test/GroupByTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Authors: Florian Kramer ([email protected])
// Johannes Kalmbach ([email protected])

#include <engine/SpatialJoinAlgorithms.h>
#include <gmock/gmock.h>

#include <cstdio>
Expand Down Expand Up @@ -36,6 +37,7 @@ using ::testing::Optional;

namespace {
auto I = IntId;
auto D = DoubleId;

// Return a matcher that checks, whether a given `std::optional<IdTable` has a
// value and that value is equal to `makeIdTableFromVector(table)`.
Expand Down Expand Up @@ -747,6 +749,50 @@ TEST_F(GroupByOptimizations, correctResultForHashMapOptimization) {
resultWithoutOptimization->asDebugString());
}

// _____________________________________________________________________________
TEST_F(GroupByOptimizations, hashMapOptimizationLazyAndMaterializedInputs) {
/* Setup query:
SELECT ?x (AVG(?y) as ?avg) WHERE {
# explicitly defined subresult.
} GROUP BY ?x
*/
// Setup three unsorted input blocks. The first column will be the grouped
// `?x`, and the second column the variable `?y` of which we compute the
// average.
auto runTest = [this](bool inputIsLazy) {
std::vector<IdTable> tables;
tables.push_back(makeIdTableFromVector({{3, 6}, {8, 27}, {5, 7}}, I));
tables.push_back(makeIdTableFromVector({{8, 27}, {5, 9}}, I));
tables.push_back(makeIdTableFromVector({{5, 2}, {3, 4}}, I));
// The expected averages are as follows: (3 -> 5.0), (5 -> 6.0), (8
// -> 27.0).
auto subtree = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, std::move(tables),
std::vector<std::optional<Variable>>{Variable{"?x"}, Variable{"?y"}});
auto& values =
dynamic_cast<ValuesForTesting&>(*subtree->getRootOperation());
values.forceFullyMaterialized() = !inputIsLazy;

SparqlExpressionPimpl avgYPimpl = makeAvgPimpl(varY);
std::vector<Alias> aliasesAvgY{Alias{avgYPimpl, Variable{"?avg"}}};

// Calculate result with optimization
qec->getQueryTreeCache().clearAll();
RuntimeParameters().set<"group-by-hash-map-enabled">(true);
GroupBy groupBy{qec, variablesOnlyX, aliasesAvgY, std::move(subtree)};
auto result = groupBy.computeResultOnlyForTesting();
ASSERT_TRUE(result.isFullyMaterialized());
EXPECT_THAT(
result.idTable(),
matchesIdTableFromVector({{I(3), D(5)}, {I(5), D(6)}, {I(8), D(27)}}));
};
runTest(true);
runTest(false);

// Disable optimization for following tests
RuntimeParameters().set<"group-by-hash-map-enabled">(false);
}

// _____________________________________________________________________________
TEST_F(GroupByOptimizations, correctResultForHashMapOptimizationForCountStar) {
/* Setup query:
Expand Down
2 changes: 2 additions & 0 deletions test/engine/ValuesForTesting.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class ValuesForTesting : public Operation {
}
bool supportsLimit() const override { return supportsLimit_; }

bool& forceFullyMaterialized() { return forceFullyMaterialized_; }

private:
// ___________________________________________________________________________
string getCacheKeyImpl() const override {
Expand Down
Loading