Skip to content

Commit

Permalink
test: really strict predication approach
Browse files Browse the repository at this point in the history
  • Loading branch information
gropaul committed Mar 6, 2024
1 parent 159a28d commit 752d8a5
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 46 deletions.
100 changes: 55 additions & 45 deletions src/execution/join_hashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ using ProbeSpillLocalState = JoinHashTable::ProbeSpillLocalAppendState;

JoinHashTable::ProbeState::ProbeState()
: hash_salts_v(LogicalType::UBIGINT), ht_offsets_v(LogicalType::UBIGINT), row_ptr_insert_to_v(LogicalType::POINTER),
key_no_match_sel(STANDARD_VECTOR_SIZE), salt_match_sel(STANDARD_VECTOR_SIZE) {
key_no_match_sel(STANDARD_VECTOR_SIZE), salt_match_sel(STANDARD_VECTOR_SIZE),
salt_no_match_sel(STANDARD_VECTOR_SIZE) {
}

JoinHashTable::InsertState::InsertState() : remaining_sel(STANDARD_VECTOR_SIZE) {
JoinHashTable::InsertState::InsertState() : key_remaining_sel(STANDARD_VECTOR_SIZE), empty_sel(STANDARD_VECTOR_SIZE) {
}

JoinHashTable::JoinHashTable(BufferManager &buffer_manager_p, const vector<JoinCondition> &conditions_p,
Expand Down Expand Up @@ -489,58 +490,65 @@ static void InsertHashesLoop(atomic<aggr_ht_entry_t> entries[], Vector row_locat
const SelectionVector *remaining_sel = FlatVector::IncrementalSelectionVector();
idx_t remaining_count = count;

while (true) {
while (remaining_count > 0) {

idx_t salt_match_count = 0;
idx_t salt_no_match_count = 0;

idx_t empty_count = 0;
idx_t key_no_match_count = 0;

// iterate over each entry to find out whether it belongs to an existing list or will start
// a new list
for (idx_t i = 0; i < remaining_count; i++) {

const auto row_index = remaining_sel->get_index(i);
auto &ht_offset = ht_offsets[row_index];
auto salt = hash_salts[row_index];

idx_t increment;
atomic<aggr_ht_entry_t> &atomic_entry = entries[ht_offset];
aggr_ht_entry_t entry = atomic_entry.load();

// increment the ht_offset of the entry as long as next entry is occupied and salt does not match
do {
atomic<aggr_ht_entry_t> &atomic_entry = entries[ht_offset];
aggr_ht_entry_t entry = atomic_entry.load();
bool occupied = entry.IsOccupied();
bool salt_match = entry.GetSalt() == hash_salts[row_index];

bool occupied = entry.IsOccupied();
bool empty = !occupied;
bool occupied_and_salt_match = occupied && salt_match;
bool occupied_and_salt_no_match = occupied && !salt_match;

// if the entry is empty, we can insert the row and stop the loop for this entry
if (!occupied) {
data_ptr_t row_ptr = row_ptrs_to_insert[row_index];
bool successful_insertion =
InsertRowToEntry<PARALLEL, true>(atomic_entry, row_ptr, salt, pointer_offset);

// if the insertion was successful, we can stop the loop for this entry
if (successful_insertion) {
break;
}
// if the insertion was not successful, the entry was occupied in the meantime
else {
occupied = true;
entry = atomic_entry.load();
}
}
state.empty_sel.set_index(empty_count, row_index);
empty_count += empty;

bool salt_match = entry.GetSalt() == hash_salts[row_index];
state.salt_match_sel.set_index(salt_match_count, row_index);
salt_match_count += salt_match;
state.salt_match_sel.set_index(salt_match_count, row_index);
salt_match_count += occupied_and_salt_match;

// condition for incrementing the ht_offset: occupied and salt does not match -> move to next entry
increment = !salt_match;
IncrementAndWrap(ht_offset, increment, bitmask);
} while (increment);
state.salt_no_match_sel.set_index(salt_no_match_count, row_index);
salt_no_match_count += occupied_and_salt_no_match;

// we can increment the ones with no match right away
IncrementAndWrap(ht_offset, occupied_and_salt_no_match, bitmask);
}

// for each empty entry, insert the row
for (idx_t i = 0; i < empty_count; i++) {
const auto row_index = state.empty_sel.get_index(i);

data_ptr_t row_ptr = row_ptrs_to_insert[row_index];
idx_t &ht_offset = ht_offsets[row_index];
hash_t salt = hash_salts[row_index];

atomic<aggr_ht_entry_t> &atomic_entry = entries[ht_offset];
bool successful_insertion = InsertRowToEntry<PARALLEL, true>(atomic_entry, row_ptr, salt, pointer_offset);

// if the insertion is not successful, the entry was occupied in the meantime (only in parallel mode)
if (PARALLEL) {
state.salt_match_sel.set_index(salt_match_count, row_index);
salt_match_count += !successful_insertion;
}
}

// at this step, for all the pointers_result_v we stepped either until we found an empty entry or an entry with
// a matching salt, we need to compare the keys for the ones that have a matching salt
if (salt_match_count == 0) {
break;
} else {
if (salt_match_count != 0) {

// Get the data for the rows that need to be compared
DataChunk lhs_data;
Expand Down Expand Up @@ -568,15 +576,12 @@ static void InsertHashesLoop(atomic<aggr_ht_entry_t> entries[], Vector row_locat
row_ptr_insert_to[need_compare_idx] = entry.load().GetPointer();
}

// todo: Gather only the columns that are needed for the comparison
// Perform row comparisons

SelectionVector match_sel(STANDARD_VECTOR_SIZE);
for (idx_t i = 0; i < salt_match_count; i++) {
match_sel.set_index(i, i);
}

idx_t key_no_match_count = 0;
key_no_match_count = 0;
idx_t key_match_count =
row_matcher_build.Match(lhs_data, lhs_formats, match_sel, salt_match_count, layout,
state.row_ptr_insert_to_v, &state.key_no_match_sel, key_no_match_count);
Expand Down Expand Up @@ -606,14 +611,19 @@ static void InsertHashesLoop(atomic<aggr_ht_entry_t> entries[], Vector row_locat
ht_offset = 0;
}

state.remaining_sel.set_index(i, entry_index);
state.key_remaining_sel.set_index(i, entry_index);
}
}

// update the overall selection vector to only point the entries that still need to be inserted
// as there was no match found for them yet
remaining_sel = &state.remaining_sel;
remaining_count = key_no_match_count;
// the remaining are the ones where the salt did not match or the ones that keys did not match
// add the salt no match to the key remaining sel
for (idx_t i = 0; i < salt_no_match_count; i++) {
const auto row_index = state.salt_no_match_sel.get_index(i);
state.key_remaining_sel.set_index(key_no_match_count + i, row_index);
}

remaining_count = salt_no_match_count + key_no_match_count;
remaining_sel = &state.key_remaining_sel;
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/include/duckdb/execution/join_hashtable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,15 @@ class JoinHashTable {
// 2. Entry is full and salt matches -> compare the keys
// 3. Entry is full and salt does not match -> continue probing
SelectionVector salt_match_sel;
SelectionVector salt_no_match_sel;
};

struct InsertState : ProbeState {
InsertState();

/// Because of the index hick up
SelectionVector remaining_sel;
SelectionVector key_remaining_sel;
SelectionVector empty_sel;
};

JoinHashTable(BufferManager &buffer_manager, const vector<JoinCondition> &conditions,
Expand Down

0 comments on commit 752d8a5

Please sign in to comment.