Introduction of collision bit
gropaul committed Oct 31, 2024
1 parent 4ba2e66 commit b3b1790
Showing 4 changed files with 114 additions and 20 deletions.
65 changes: 54 additions & 11 deletions src/execution/join_hashtable.cpp
@@ -222,7 +222,11 @@ static inline void GetRowPointersInternal(DataChunk &keys, TupleDataChunkState &
const auto row_index = remaining_sel->get_index(i);

idx_t &ht_offset = ht_offsets[row_index];

bool occupied;
+bool salt_match;
+bool entry_has_collision;

ht_entry_t entry;

if (USE_SALTS) {
@@ -231,14 +235,20 @@ static inline void GetRowPointersInternal(DataChunk &keys, TupleDataChunkState &
while (true) {
entry = entries[ht_offset];
occupied = entry.IsOccupied();
-bool salt_match = entry.GetSalt() == row_salt;
+salt_match = entry.GetSalt() == row_salt;

+entry_has_collision = entry.HasCollision();

-// condition for incrementing the ht_offset: occupied and row_salt does not match -> move to next
-// entry
+// condition for advancing the ht_offset: the entry is occupied, the salt does not match, and the entry
+// has the collision bit set; the checks below are the reverse of this condition, breaking out of the loop
if (!occupied || salt_match) {
break;
}

+if (!entry_has_collision) {
+break;
+}

IncrementAndWrap(ht_offset, ht->bitmask);
}
} else {
@@ -249,11 +259,26 @@ static inline void GetRowPointersInternal(DataChunk &keys, TupleDataChunkState &
// the entries we need to process in the next iteration are the ones that are occupied and the row_salt
// does not match, the ones that are empty need no further processing
state.salt_match_sel.set_index(salt_match_count, row_index);
-salt_match_count += occupied;

// entry might be empty, so the pointer in the entry is nullptr, but this does not matter as the row
-// will not be compared anyway as with an empty entry we are already done
-row_ptr_insert_to[row_index] = entry.GetPointerOrNull();
+// will not be compared anyway as with an empty entry we are already done. However, if we left the loop
+// because the entry has no collision bit set, we also have no result, therefore we set the pointer to
+// nullptr if entry_has_collision is false
+if (USE_SALTS) {
+if (occupied && !salt_match && !entry_has_collision) {
+// this is the case where we stopped because the entry has no collision
+row_ptr_insert_to[row_index] = nullptr;
+} else {
+// here we stopped because (a) we found an empty entry or (b) we found a matching salt
+row_ptr_insert_to[row_index] = entry.GetPointerOrNull();
+salt_match_count += occupied;
+}
+} else {
+row_ptr_insert_to[row_index] = entry.GetPointerOrNull();
+salt_match_count += occupied;
+}
}

if (salt_match_count != 0) {
@@ -288,6 +313,7 @@ static inline void GetRowPointersInternal(DataChunk &keys, TupleDataChunkState &
}
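The probe loop above is the heart of the change: a lookup may now stop at an occupied, salt-mismatching entry, as long as no insert ever probed past that entry during the build. A minimal standalone sketch of this lookup logic (simplified types and names of our own, not the exact DuckDB internals):

    #include <cstdint>
    #include <vector>

    // Simplified stand-in for ht_entry_t: bit 63 = collision, bits 48-62 = salt,
    // bits 0-47 = pointer.
    struct Entry {
        uint64_t value;
        bool IsOccupied() const { return value != 0; }
        bool HasCollision() const { return (value >> 63) != 0; }
        uint64_t Salt() const { return (value >> 48) & 0x7FFF; }
    };

    // Returns the slot index holding a salt match, or -1 if the key cannot be in
    // the table. Without the collision bit, the only safe negative exit was an
    // empty slot; with it, any entry that never saw a collision ends the chain.
    int64_t Probe(const std::vector<Entry> &entries, uint64_t bitmask, uint64_t offset, uint64_t salt) {
        while (true) {
            const Entry &entry = entries[offset];
            if (!entry.IsOccupied()) {
                return -1; // empty slot: key is definitely absent
            }
            if (entry.Salt() == salt) {
                return static_cast<int64_t>(offset); // candidate match, compare keys next
            }
            if (!entry.HasCollision()) {
                return -1; // occupied, salt mismatch, but no insert ever moved past this slot
            }
            offset = (offset + 1) & bitmask; // linear probing with wrap-around
        }
    }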

inline bool JoinHashTable::UseSalt() const {
+return true;
// only use salt for large hash tables and if there is only one equality condition as otherwise
// we potentially need to compare multiple keys
return this->capacity > USE_SALT_THRESHOLD && this->equality_predicate_columns.size() == 1;
@@ -456,7 +482,7 @@ static inline data_ptr_t InsertRowToEntry(atomic<ht_entry_t> &entry, const data_
// add nullptr to the end of the list to mark the end
StorePointer(nullptr, row_ptr_to_insert + pointer_offset);

-ht_entry_t new_empty_entry = ht_entry_t::GetDesiredEntry(row_ptr_to_insert, salt);
+ht_entry_t new_empty_entry = ht_entry_t::GetNewEntry(row_ptr_to_insert, salt);
ht_entry_t expected_empty_entry = ht_entry_t::GetEmptyEntry();
entry.compare_exchange_strong(expected_empty_entry, new_empty_entry, std::memory_order_acquire,
std::memory_order_relaxed);
@@ -469,13 +495,14 @@ static inline data_ptr_t InsertRowToEntry(atomic<ht_entry_t> &entry, const data_
// if we expect the entry to be full, we know that even if the insert fails the keys still match so we can
// just keep trying until we succeed
ht_entry_t expected_current_entry = entry.load(std::memory_order_relaxed);
-ht_entry_t desired_new_entry = ht_entry_t::GetDesiredEntry(row_ptr_to_insert, salt);
D_ASSERT(expected_current_entry.IsOccupied());

+ht_entry_t desired_updated_entry = ht_entry_t::UpdateWithPointer(expected_current_entry, row_ptr_to_insert);

do {
data_ptr_t current_row_pointer = expected_current_entry.GetPointer();
StorePointer(current_row_pointer, row_ptr_to_insert + pointer_offset);
-} while (!entry.compare_exchange_weak(expected_current_entry, desired_new_entry, std::memory_order_release,
+} while (!entry.compare_exchange_weak(expected_current_entry, desired_updated_entry, std::memory_order_release,
std::memory_order_relaxed));

return nullptr;
@@ -485,7 +512,12 @@ static inline data_ptr_t InsertRowToEntry(atomic<ht_entry_t> &entry, const data_
ht_entry_t current_entry = entry.load(std::memory_order_relaxed);
data_ptr_t current_row_pointer = current_entry.GetPointerOrNull();
StorePointer(current_row_pointer, row_ptr_to_insert + pointer_offset);
-entry = ht_entry_t::GetDesiredEntry(row_ptr_to_insert, salt);

+if (EXPECT_EMPTY) {
+entry = ht_entry_t::GetNewEntry(row_ptr_to_insert, salt);
+} else {
+entry = ht_entry_t::UpdateWithPointer(current_entry, row_ptr_to_insert);
+}
return nullptr;
}
}
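The point of switching the EXPECT_EMPTY == false path to UpdateWithPointer is that only the 48 pointer bits are swapped, so the salt and the collision bit of the published entry survive the list prepend; retrying on CAS failure is safe because the keys already matched. A rough sketch of that parallel prepend over a raw 64-bit word (our own simplified names; the real code stores the next-pointer inside the row at ht.pointer_offset):

    #include <atomic>
    #include <cstdint>

    constexpr uint64_t SALT_AND_COLLISION_MASK = 0xFFFF000000000000ULL; // bits 48-63
    constexpr uint64_t POINTER_MASK            = 0x0000FFFFFFFFFFFFULL; // bits 0-47

    // Prepend a row to an occupied slot's chain. The CAS loop keeps the upper
    // 16 bits (15-bit salt plus collision bit) and only swaps the pointer bits.
    inline void PrependRow(std::atomic<uint64_t> &slot, uint64_t new_row_ptr,
                           uint64_t *next_ptr_in_row) {
        uint64_t expected = slot.load(std::memory_order_relaxed);
        uint64_t desired;
        do {
            // link the current head of the list behind the new row
            *next_ptr_in_row = expected & POINTER_MASK;
            // salt and collision bit survive, pointer bits are replaced
            desired = (expected & SALT_AND_COLLISION_MASK) | new_row_ptr;
        } while (!slot.compare_exchange_weak(expected, desired, std::memory_order_release,
                                             std::memory_order_relaxed));
    }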
@@ -537,15 +569,25 @@ static inline void InsertMatchesAndIncrementMisses(atomic<ht_entry_t> entries[],
InsertRowToEntry<PARALLEL, false>(entry, row_ptr_to_insert, salt, ht.pointer_offset);
}

-// Linear probing: each of the entries that do not match move to the next entry in the HT
+// Linear probing: each of the entries that do not match moves to the next entry in the HT; we also mark
+// them with the collision bit
for (idx_t i = 0; i < key_no_match_count; i++) {
const auto need_compare_idx = state.key_no_match_sel.get_index(i);
const auto entry_index = state.salt_match_sel.get_index(need_compare_idx);

+// mark the entry as collided; we don't need to care about thread synchronisation as the mark is an OR
+// operation and the worst case is that we mark the same entry multiple times
+const auto &ht_offset = ht_offsets_and_salts[entry_index] & ht_entry_t::POINTER_MASK;
+auto &atomic_entry = entries[ht_offset];
+ht_entry_t::MarkAsCollided(atomic_entry);

// increment the ht_offset of the entry
idx_t &ht_offset_and_salt = ht_offsets_and_salts[entry_index];
IncrementAndWrap(ht_offset_and_salt, capacity_mask);

// add the entry to the remaining sel vector to get processed in the next loop iteration
state.remaining_sel.set_index(i, entry_index);
}
}
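The comment above relies on the mark being a plain OR: racing threads both drive the bit to the same value, so nothing is lost. std::atomic over a struct like ht_entry_t has no fetch_or, hence the CAS loop in the header below; over a raw integer the same idea collapses to one line. A sketch, assuming a uint64_t entry word:

    #include <atomic>
    #include <cstdint>

    constexpr uint64_t COLLISION_BIT_MASK = 0x8000000000000000ULL;

    // Idempotent: no matter how many threads race here, the final state is
    // "bit set", and no other bits of the entry are disturbed.
    inline void MarkAsCollided(std::atomic<uint64_t> &entry) {
        entry.fetch_or(COLLISION_BIT_MASK, std::memory_order_relaxed);
    }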

@@ -621,6 +663,7 @@ static void InsertHashesLoop(atomic<ht_entry_t> entries[], Vector &row_locations
break;
}

+ht_entry_t::MarkAsCollided(atomic_entry);
IncrementAndWrap(ht_offset_and_salt, capacity_mask);
}
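The build side of InsertHashesLoop maintains the mirror-image invariant: whenever an insert probes past an occupied, non-matching slot, that slot's collision bit is set, which is exactly what licenses the lookup's early exit. A condensed, single-threaded sketch (the real loop works on atomics; Entry is the same simplified stand-in as in the lookup sketch earlier):

    #include <cstdint>
    #include <vector>

    // Same layout as before: bit 63 = collision, bits 48-62 = salt, bits 0-47 = pointer.
    struct Entry {
        uint64_t value;
        bool IsOccupied() const { return value != 0; }
        uint64_t Salt() const { return (value >> 48) & 0x7FFF; }
    };

    // Find the slot where `salt` may be placed or chained, marking every slot
    // probed past so later lookups know the chain continues beyond it.
    uint64_t InsertProbe(std::vector<Entry> &entries, uint64_t bitmask,
                         uint64_t offset, uint64_t salt) {
        while (true) {
            Entry &entry = entries[offset];
            if (!entry.IsOccupied() || entry.Salt() == salt) {
                return offset; // claim the empty slot or chain into the match
            }
            entry.value |= 1ULL << 63; // MarkAsCollided, single-threaded version
            offset = (offset + 1) & bitmask;
        }
    }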

10 changes: 10 additions & 0 deletions src/include/duckdb/common/types/hash.hpp
@@ -20,7 +20,17 @@ struct interval_t; // NOLINT
// bias
// see: https://nullprogram.com/blog/2018/07/31/

+inline hash_t TempMod10(uint64_t x) {
+uint64_t modulo = x % 10;
+uint64_t hash = modulo;
+// add a salt as well: the same modulo, placed at bit 48
+uint64_t salt = modulo << 48;

+return hash + salt;
+}
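TempMod10 reads like a temporary debugging hash (note the commented-out call in MurmurHash64 just below): every key collapses into one of ten buckets, and the salt carries the same ten values, so salts stop discriminating keys and the collision-bit machinery is exercised on nearly every insert and probe. A standalone illustration of the collision pressure this creates (our own test program, not part of the commit):

    #include <cstdint>
    #include <cstdio>

    using hash_t = uint64_t;

    inline hash_t TempMod10(uint64_t x) {
        uint64_t modulo = x % 10;
        return modulo + (modulo << 48); // bucket and salt both carry x % 10
    }

    int main() {
        // 13, 23, and 33 produce identical hashes AND identical salts, so they
        // land in the same chain and force key comparisons plus collision marks.
        for (uint64_t x : {13ULL, 23ULL, 33ULL, 7ULL}) {
            printf("x=%2llu hash=%016llx\n", (unsigned long long)x,
                   (unsigned long long)TempMod10(x));
        }
        return 0;
    }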

inline hash_t MurmurHash64(uint64_t x) {
+// return TempMod10(x);
x ^= x >> 32;
x *= 0xd6e8feb86659fd93U;
x ^= x >> 32;
55 changes: 50 additions & 5 deletions src/include/duckdb/execution/ht_entry.hpp
@@ -21,8 +21,9 @@ namespace duckdb {
*/
struct ht_entry_t { // NOLINT
public:
+//! Highest bit marks entries that were probed past during insertion
+static constexpr const hash_t COLLISION_BIT_MASK = 0x8000000000000000;
//! The next 15 bits are the salt
-static constexpr const hash_t SALT_MASK = 0xFFFF000000000000;
+static constexpr const hash_t SALT_MASK = 0x7FFF000000000000;
//! Lower 48 bits are the pointer
static constexpr const hash_t POINTER_MASK = 0x0000FFFFFFFFFFFF;

@@ -37,6 +38,22 @@ struct ht_entry_t { // NOLINT
return value != 0;
}

+inline bool HasCollision() const {
+return (value & COLLISION_BIT_MASK) != 0;
+}

+inline static void MarkAsCollided(std::atomic<ht_entry_t> &entry) {
+auto current = entry.load(std::memory_order_relaxed);
+ht_entry_t desired_entry;

+do {
+auto desired_value = current.value | COLLISION_BIT_MASK;
+desired_entry = ht_entry_t(desired_value);
+} while (!entry.compare_exchange_weak(current, desired_entry, std::memory_order_relaxed));
+}


// Returns a pointer based on the stored value without checking cell occupancy.
// This can return a nullptr if the cell is not occupied.
inline data_ptr_t GetPointerOrNull() const {
Expand All @@ -58,12 +75,12 @@ struct ht_entry_t { // NOLINT
value &= cast_pointer_to_uint64(pointer) | SALT_MASK;
}

-// Returns the salt, leaves upper salt bits intact, sets lower bits to all 1's
+// Returns the salt, leaves upper salt bits intact, sets other bits to all 1's
static inline hash_t ExtractSalt(hash_t hash) {
-return hash | POINTER_MASK;
+return hash | ~SALT_MASK;
}

-// Returns the salt, leaves upper salt bits intact, sets lower bits to all 0's
+// Returns the salt, leaves upper salt bits intact, sets other bits to all 0's
static inline hash_t ExtractSaltWithNulls(hash_t hash) {
return hash & SALT_MASK;
}
@@ -81,11 +98,39 @@ struct ht_entry_t { // NOLINT
value = salt;
}

-static inline ht_entry_t GetDesiredEntry(const data_ptr_t &pointer, const hash_t &salt) {
+static inline ht_entry_t GetNewEntry(const data_ptr_t &pointer, const hash_t &salt) {
auto desired = cast_pointer_to_uint64(pointer) | (salt & SALT_MASK);
return ht_entry_t(desired);
}

+/// Keeps the salt and the collision bit intact, but updates the pointer
+static inline ht_entry_t UpdateWithPointer(const ht_entry_t &entry, const data_ptr_t &pointer) {
+// the slot must be occupied and hold both a pointer and a salt
+D_ASSERT(entry.IsOccupied());
+data_ptr_t current_pointer = entry.GetPointer();
+D_ASSERT(current_pointer != nullptr);
+hash_t salt = ExtractSaltWithNulls(entry.value);
+D_ASSERT(salt != 0);

+// set the pointer bits in the entry to zero
+auto value_without_pointer = entry.value & ~POINTER_MASK;

+// now update the bits with the new pointer
+auto desired_value = value_without_pointer | cast_pointer_to_uint64(pointer);
+auto desired = ht_entry_t(desired_value);

+// check that the collision bit is kept intact
+bool has_collision = entry.HasCollision();
+bool has_collision_desired = desired.HasCollision();
+D_ASSERT(has_collision == has_collision_desired);

+return desired;
+}

static inline ht_entry_t GetEmptyEntry() {
return ht_entry_t(0);
}
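Taken together, an entry now packs three independent fields into one 64-bit word, and every helper above is plain mask arithmetic over that layout. A small self-contained round-trip (mask values copied from this header; the test program itself is ours):

    #include <cassert>
    #include <cstdint>

    constexpr uint64_t COLLISION_BIT_MASK = 0x8000000000000000ULL; // bit 63
    constexpr uint64_t SALT_MASK          = 0x7FFF000000000000ULL; // bits 48-62
    constexpr uint64_t POINTER_MASK       = 0x0000FFFFFFFFFFFFULL; // bits 0-47

    int main() {
        uint64_t pointer = 0x00007F0012345678ULL; // a 48-bit row pointer
        uint64_t hash    = 0xABCD000000000042ULL; // upper bits feed the salt

        // GetNewEntry: pointer | salt, collision bit starts out cleared
        uint64_t entry = pointer | (hash & SALT_MASK);
        assert((entry & COLLISION_BIT_MASK) == 0);

        // MarkAsCollided: a probe moved past this slot
        entry |= COLLISION_BIT_MASK;

        // UpdateWithPointer: swap the pointer bits, everything else survives
        uint64_t new_pointer = 0x00007F00AABBCCDDULL;
        entry = (entry & ~POINTER_MASK) | new_pointer;

        assert((entry & POINTER_MASK) == new_pointer);
        assert((entry & SALT_MASK) == (hash & SALT_MASK));
        assert((entry & COLLISION_BIT_MASK) != 0);
        return 0;
    }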
4 changes: 0 additions & 4 deletions test/sql/copy/csv/auto/test_auto_cranlogs.test
@@ -23,10 +23,6 @@ SELECT * FROM cranlogs LIMIT 5;
2013-06-15 00:58:50 501915 2.15.3 x86_64 linux-gnu animation 2.2 IN 4
2013-06-15 00:14:52 254933 3.0.1 x86_64 linux-gnu foreign 0.8-54 HK 5


-statement ok
-PRAGMA verify_parallelism

statement ok
CREATE TABLE cranlogs2 AS SELECT * FROM read_csv_auto ('data/csv/real/tmp2013-06-15.csv.gz');
