Linear probing hash join clean #8

Open: wants to merge 11 commits into base branch my-feature
1 change: 1 addition & 0 deletions .github/regression/micro_extended.csv
@@ -129,6 +129,7 @@ benchmark/micro/join/asof_join.benchmark
benchmark/micro/join/asof_join_small_probe.benchmark
benchmark/micro/join/blockwise_nl_join.benchmark
benchmark/micro/join/delim_join_no_blowup.benchmark
benchmark/micro/join/hashjoin_dups_rhs.benchmark
benchmark/micro/join/hashjoin_highcardinality.benchmark
benchmark/micro/join/hashjoin_lhsarithmetic.benchmark
benchmark/micro/join/iejoin_employees.benchmark
1 change: 1 addition & 0 deletions .sanitizer-thread-suppressions.txt
@@ -1,5 +1,6 @@
deadlock:InitializeIndexes
race:NextInnerJoin
race:NextRightSemiOrAntiJoin
race:duckdb_moodycamel
race:duckdb_jemalloc
race:AddToEvictionQueue
16 changes: 16 additions & 0 deletions benchmark/micro/join/hashjoin_dups_rhs.benchmark
@@ -0,0 +1,16 @@
# name: benchmark/micro/join/hashjoin_dups_rhs.benchmark
# description: Inner hash join using string comparisons with 4x duplicates on the rhs and 4096x duplicates on the lhs
# group: [join]

name Inner Join (dups on rhs)
group join

load
create table t1 as select 'verylargestring' || range % 32768 i from range(131072);
create table t2 as select 'verylargestring' || range % 32768 i from range(134217728);

run
select count(*) from t1 join t2 using (i)

result I
536870912
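
The expected result follows from the duplicate counts: t1 has 131072 rows over 32768 distinct keys (4x duplicates) and t2 has 134217728 rows over the same 32768 keys (4096x duplicates), so every key contributes 4 * 4096 output pairs. A compile-time check of that arithmetic (illustrative only, not part of the patch):

static_assert(32768ULL * 4 * 4096 == 536870912ULL, "expected inner join cardinality");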
40 changes: 40 additions & 0 deletions src/common/row_operations/row_matcher.cpp
@@ -198,6 +198,22 @@ void RowMatcher::Initialize(const bool no_match_sel, const TupleDataLayout &layo
}
}

void RowMatcher::Initialize(const bool no_match_sel, const TupleDataLayout &layout, const Predicates &predicates,
vector<column_t> &columns) {

// The columns vector must have the same size as the predicates vector
D_ASSERT(columns.size() == predicates.size());

// The largest column_id must be smaller than the number of layout types to avoid an out-of-bounds access
D_ASSERT(*max_element(columns.begin(), columns.end()) < layout.GetTypes().size());

match_functions.reserve(predicates.size());
for (idx_t idx = 0; idx < predicates.size(); idx++) {
column_t col_idx = columns[idx];
match_functions.push_back(GetMatchFunction(no_match_sel, layout.GetTypes()[col_idx], predicates[idx]));
}
}

idx_t RowMatcher::Match(DataChunk &lhs, const vector<TupleDataVectorFormat> &lhs_formats, SelectionVector &sel,
idx_t count, const TupleDataLayout &rhs_layout, Vector &rhs_row_locations,
SelectionVector *no_match_sel, idx_t &no_match_count) {
@@ -211,6 +227,30 @@ idx_t RowMatcher::Match(DataChunk &lhs, const vector<TupleDataVectorFormat> &lhs
return count;
}

idx_t RowMatcher::Match(DataChunk &lhs, const vector<TupleDataVectorFormat> &lhs_formats, SelectionVector &sel,
idx_t count, const TupleDataLayout &rhs_layout, Vector &rhs_row_locations,
SelectionVector *no_match_sel, idx_t &no_match_count, const vector<column_t> &columns) {
D_ASSERT(!match_functions.empty());

// The columns vector must have the same size as the match_functions vector
D_ASSERT(columns.size() == match_functions.size());

// The largest column_id must be smaller than the number of columns to avoid an out-of-bounds access
D_ASSERT(*max_element(columns.begin(), columns.end()) < lhs.ColumnCount());

for (idx_t fun_idx = 0; fun_idx < match_functions.size(); fun_idx++) {
// this overload matches only the given columns: columns[fun_idx] maps each match
// function to the column it compares (the plain overload simply uses fun_idx itself)
const auto col_idx = columns[fun_idx];

const auto &match_function = match_functions[fun_idx];
count =
match_function.function(lhs.data[col_idx], lhs_formats[col_idx], sel, count, rhs_layout, rhs_row_locations,
col_idx, match_function.child_functions, no_match_sel, no_match_count);
}
return count;
}

MatchFunction RowMatcher::GetMatchFunction(const bool no_match_sel, const LogicalType &type,
const ExpressionType predicate) {
return no_match_sel ? GetMatchFunction<true>(type, predicate) : GetMatchFunction<false>(type, predicate);
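Taken together, the two new overloads let a caller restrict matching to a subset of the layout's columns. A rough usage sketch (assumptions: Predicates is a vector of ExpressionType, and lhs, lhs_formats, sel, rhs_row_locations, no_match_sel come from the surrounding join code; none of this scaffolding is in the patch):

// match only layout columns 0 and 2, both under equality predicates
vector<column_t> key_columns {0, 2};
Predicates predicates {ExpressionType::COMPARE_EQUAL, ExpressionType::COMPARE_EQUAL};

RowMatcher matcher;
matcher.Initialize(/*no_match_sel=*/true, rhs_layout, predicates, key_columns);

// per probe chunk: sel shrinks to the matching rows, non-matches are collected
idx_t no_match_count = 0;
idx_t match_count = matcher.Match(lhs, lhs_formats, sel, count, rhs_layout, rhs_row_locations,
                                  &no_match_sel, no_match_count, key_columns);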
11 changes: 11 additions & 0 deletions src/common/types/row/tuple_data_collection.cpp
@@ -384,6 +384,17 @@ void TupleDataCollection::InitializeChunk(DataChunk &chunk) const {
chunk.Initialize(allocator->GetAllocator(), layout.GetTypes());
}

void TupleDataCollection::InitializeChunk(DataChunk &chunk, const vector<column_t> &columns) const {
vector<LogicalType> chunk_types(columns.size());
// keep the order of the columns
for (idx_t i = 0; i < columns.size(); i++) {
auto column_idx = columns[i];
D_ASSERT(column_idx < layout.ColumnCount());
chunk_types[i] = layout.GetTypes()[column_idx];
}
chunk.Initialize(allocator->GetAllocator(), chunk_types);
}

void TupleDataCollection::InitializeScanChunk(TupleDataScanState &state, DataChunk &chunk) const {
auto &column_ids = state.chunk_state.column_ids;
D_ASSERT(!column_ids.empty());
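The new InitializeChunk overload sizes a chunk for just the requested columns while preserving the order in which they are requested. A tiny illustration (the layout's types here are hypothetical):

// suppose the layout's types are {BIGINT, VARCHAR, DOUBLE}
vector<column_t> cols {2, 0};
DataChunk chunk;
collection.InitializeChunk(chunk, cols);
// chunk is initialized with types {DOUBLE, BIGINT}, following the order of cols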
13 changes: 7 additions & 6 deletions src/execution/aggregate_hashtable.cpp
@@ -9,6 +9,7 @@
#include "duckdb/common/types/row/tuple_data_iterator.hpp"
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/execution/ht_entry.hpp"
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"

namespace duckdb {
@@ -146,15 +147,15 @@ void GroupedAggregateHashTable::Verify() {
continue;
}
auto hash = Load<hash_t>(entry.GetPointer() + hash_offset);
-D_ASSERT(entry.GetSalt() == aggr_ht_entry_t::ExtractSalt(hash));
+D_ASSERT(entry.GetSalt() == ht_entry_t::ExtractSalt(hash));
total_count++;
}
D_ASSERT(total_count == Count());
#endif
}

void GroupedAggregateHashTable::ClearPointerTable() {
-std::fill_n(entries, capacity, aggr_ht_entry_t(0));
+std::fill_n(entries, capacity, ht_entry_t::GetEmptyEntry());
}

void GroupedAggregateHashTable::ResetCount() {
@@ -173,8 +174,8 @@ void GroupedAggregateHashTable::Resize(idx_t size) {
}

capacity = size;
-hash_map = buffer_manager.GetBufferAllocator().Allocate(capacity * sizeof(aggr_ht_entry_t));
-entries = reinterpret_cast<aggr_ht_entry_t *>(hash_map.get());
+hash_map = buffer_manager.GetBufferAllocator().Allocate(capacity * sizeof(ht_entry_t));
+entries = reinterpret_cast<ht_entry_t *>(hash_map.get());
ClearPointerTable();
bitmask = capacity - 1;

@@ -201,7 +202,7 @@ void GroupedAggregateHashTable::Resize(idx_t size) {
}
auto &entry = entries[entry_idx];
D_ASSERT(!entry.IsOccupied());
-entry.SetSalt(aggr_ht_entry_t::ExtractSalt(hash));
+entry.SetSalt(ht_entry_t::ExtractSalt(hash));
entry.SetPointer(row_location);
D_ASSERT(entry.IsOccupied());
}
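
The renamed ht_entry_t (now shared via duckdb/execution/ht_entry.hpp) packs a salt taken from the hash together with a row pointer into one 64-bit word, so probes can reject most non-matching slots without chasing the pointer. A minimal sketch of that idea; field widths and method bodies here are assumptions for illustration, not the actual header:

#include <cstdint>

struct ht_entry_sketch {
	// lower 48 bits: row pointer; upper 16 bits: salt (widths assumed)
	static constexpr uint64_t POINTER_MASK = 0x0000FFFFFFFFFFFFULL;
	static constexpr uint64_t SALT_MASK = 0xFFFF000000000000ULL;

	uint64_t value = 0; // 0 encodes an empty slot

	static ht_entry_sketch GetEmptyEntry() { return ht_entry_sketch {}; }
	static uint64_t ExtractSalt(uint64_t hash) { return hash & SALT_MASK; }

	bool IsOccupied() const { return value != 0; }
	uint64_t GetSalt() const { return value & SALT_MASK; }
	void SetSalt(uint64_t salt) { value = salt; }
	void SetPointer(uint8_t *pointer) {
		// keep the salt bits, overwrite the pointer bits
		value = (value & SALT_MASK) | (reinterpret_cast<uint64_t>(pointer) & POINTER_MASK);
	}
	uint8_t *GetPointer() const { return reinterpret_cast<uint8_t *>(value & POINTER_MASK); }
};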
@@ -333,7 +334,7 @@ idx_t GroupedAggregateHashTable::FindOrCreateGroupsInternal(DataChunk &groups, V
const auto &hash = hashes[r];
ht_offsets[r] = ApplyBitMask(hash);
D_ASSERT(ht_offsets[r] == hash % capacity);
-hash_salts[r] = aggr_ht_entry_t::ExtractSalt(hash);
+hash_salts[r] = ht_entry_t::ExtractSalt(hash);
}

// we start out with all entries [0, 1, 2, ..., groups.size()]
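The hunk above prepares one probe state per row: ApplyBitMask reduces the hash to a slot index (capacity is a power of two, so bitmask = capacity - 1) and the salt is cached for cheap comparisons. A condensed sketch of the linear-probing loop those states drive, reusing the hypothetical entry type sketched above; it illustrates the technique rather than the function's exact control flow:

// illustrative only: find the slot for hash in an entries array of size bitmask + 1
static uint64_t ProbeSlot(const ht_entry_sketch *entries, uint64_t bitmask, uint64_t hash) {
	uint64_t offset = hash & bitmask; // ApplyBitMask: capacity is a power of two
	const uint64_t salt = ht_entry_sketch::ExtractSalt(hash);
	while (true) {
		const auto &entry = entries[offset];
		if (!entry.IsOccupied()) {
			return offset; // empty slot: the group does not exist yet
		}
		if (entry.GetSalt() == salt) {
			return offset; // salt match: verify the full key (e.g. via RowMatcher)
		}
		offset = (offset + 1) & bitmask; // linear probing: advance, wrapping around
	}
}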