diff --git a/.github/regression/join.csv b/.github/regression/join.csv
new file mode 100644
index 000000000000..ab0131209be1
--- /dev/null
+++ b/.github/regression/join.csv
@@ -0,0 +1,3 @@
+benchmark/micro/join/asof_join.benchmark
+benchmark/micro/join/blockwise_nl_join.benchmark
+benchmark/micro/join/range_join_small_rhs.benchmark
diff --git a/scripts/local_regression_test_runner.py b/scripts/local_regression_test_runner.py
new file mode 100644
index 000000000000..64466edb5bed
--- /dev/null
+++ b/scripts/local_regression_test_runner.py
@@ -0,0 +1,117 @@
+import subprocess
+import os
+import sys
+
+# Builds the benchmark runner of the current branch and of the main branch, then runs the benchmarks.
+
+DEFAULT_RUNNER_PATH = "build/release/benchmark/benchmark_runner"  # this is what gets built by default
+NEW_RUNNER_PATH = "build/release/benchmark/benchmark_runner_new"  # from the local branch
+OLD_RUNNER_PATH = "build/release/benchmark/benchmark_runner_old"  # from the main branch
+
+
+def build(stash_changes: bool = False):
+    original_branch = get_current_branch()
+
+    # Execute git status with the --porcelain option
+    output = subprocess.check_output(['git', 'status', '--porcelain']).decode('utf-8')
+
+    # Filter out lines that start with "??" (untracked files)
+    changes = [line for line in output.strip().split('\n') if not line.startswith('??')]
+    auto_stashed = False
+    if changes and stash_changes:
+        print("Stashing changes")
+        subprocess.check_output(['git', 'stash'])
+        auto_stashed = True
+    elif changes:
+        print("There are uncommitted changes. Please commit or stash them, or use --stash to stash them automatically")
+        exit(1)
+
+    # checkout the main branch and build the runner
+    subprocess.check_output(['git', 'checkout', 'main'])
+    print("Building runner on main branch...")
+    build_runner()
+    subprocess.check_output(['cp', DEFAULT_RUNNER_PATH, OLD_RUNNER_PATH])
+
+    # checkout the original branch and build the runner
+    subprocess.check_output(['git', 'checkout', original_branch])
+
+    if auto_stashed:
+        print("Unstashing changes")
+        subprocess.check_output(['git', 'stash', 'pop'])
+
+    print(f"Building runner on branch {original_branch}...")
+    build_runner()
+    subprocess.check_output(['cp', DEFAULT_RUNNER_PATH, NEW_RUNNER_PATH])
+
+
+def get_current_branch():
+    return subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).decode('utf-8').strip()
+
+
+def build_runner():
+    # set env variables to enable the benchmark runner build
+    env = {"BUILD_BENCHMARK": "1", "BUILD_TPCH": "1", "BUILD_HTTPFS": "1"}
+    # Add the current environment
+    env.update(os.environ)
+    subprocess.run(["make"], env=env)
+
+
+def run_benchmark(old_runner, new_runner, benchmark_file):
+    # Wraps scripts/regression_test_runner.py, which expects:
+    # python3 scripts/regression_test_runner.py --old=... --new=... --benchmarks=/benchmark/list.csv
+
+    if not os.path.isfile(old_runner):
+        print(f"Failed to find old runner {old_runner}")
+        exit(1)
+
+    if not os.path.isfile(new_runner):
+        print(f"Failed to find new runner {new_runner}")
+        exit(1)
+
+    command = [
+        'python3',
+        'scripts/regression_test_runner.py',
+        f'--old={old_runner}',
+        f'--new={new_runner}',
+        f'--benchmarks={benchmark_file}',
+        '--threads=4',
+    ]
+
+    print(f"Running command: {' '.join(command)}")
+
+    # start the existing runner, make sure to pipe the output to the console
+    subprocess.run(command, check=True)
+
+
+def main():
+    benchmark_file = None
+    stash_changes = False
+    for arg in sys.argv:
+        if arg.startswith("--benchmarks="):
+            benchmark_file = arg.replace("--benchmarks=", "")
+        elif arg == "--stash":
+            stash_changes = True
+        elif arg == "--help":
+            print("Expected usage: python3 scripts/local_regression_test_runner.py --benchmarks=/benchmark/list.csv")
+            print("Optional: --stash: Stash changes before running the benchmarks")
+            exit(0)
+
+    # make sure that we are in the root directory of the project
+    if not os.path.isfile("scripts/local_regression_test_runner.py"):
+        print("Please run this script from the root directory of the project")
+        exit(1)
+
+    if benchmark_file is None:
+        print("Expected usage: python3 scripts/local_regression_test_runner.py --benchmarks=.github/regression/imdb.csv")
+        exit(1)
+
+    build(stash_changes)
+    run_benchmark(OLD_RUNNER_PATH, NEW_RUNNER_PATH, benchmark_file)
+
+
+if __name__ == "__main__":
+    main()
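The script is meant to be invoked from the repository root, e.g. `python3 scripts/local_regression_test_runner.py --benchmarks=.github/regression/join.csv` (optionally with `--stash`); it builds both runners via `make` and then delegates to the existing scripts/regression_test_runner.py as shown above.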
diff --git a/src/common/row_operations/row_matcher.cpp b/src/common/row_operations/row_matcher.cpp
index 8d6770042d4e..53e6792661ef 100644
--- a/src/common/row_operations/row_matcher.cpp
+++ b/src/common/row_operations/row_matcher.cpp
@@ -197,12 +197,43 @@ void RowMatcher::Initialize(const bool no_match_sel, const TupleDataLayout &layo
 	}
 }
 
+void RowMatcher::Initialize(const bool no_match_sel, const TupleDataLayout &layout, const Predicates &predicates,
+                            vector<column_t> &column_ids_p) {
+	// The column_ids must have the same size as the predicates vector
+	D_ASSERT(column_ids_p.size() == predicates.size());
+
+	// The largest column_id must be smaller than the number of columns in the layout
+	D_ASSERT(*max_element(column_ids_p.begin(), column_ids_p.end()) < layout.ColumnCount());
+
+	column_ids = make_uniq<vector<column_t>>(column_ids_p);
+
+	match_functions.reserve(predicates.size());
+	for (idx_t idx = 0; idx < predicates.size(); idx++) {
+		column_t col_idx = (*column_ids)[idx];
+		match_functions.push_back(GetMatchFunction(no_match_sel, layout.GetTypes()[col_idx], predicates[idx]));
+	}
+}
+
 idx_t RowMatcher::Match(DataChunk &lhs, const vector<TupleDataVectorFormat> &lhs_formats, SelectionVector &sel,
                         idx_t count, const TupleDataLayout &rhs_layout, Vector &rhs_row_locations,
                         SelectionVector *no_match_sel, idx_t &no_match_count) {
 	D_ASSERT(!match_functions.empty());
-	for (idx_t col_idx = 0; col_idx < match_functions.size(); col_idx++) {
-		const auto &match_function = match_functions[col_idx];
+
+	if (column_ids) {
+		// The column_ids must have the same size as the match_functions vector
+		D_ASSERT(column_ids->size() == match_functions.size());
+
+		// The largest column_id must be smaller than the number of columns in the lhs
+		D_ASSERT(*max_element(column_ids->begin(), column_ids->end()) < lhs.ColumnCount());
+	}
+
+	for (idx_t fun_idx = 0; fun_idx < match_functions.size(); fun_idx++) {
+		// if we only care about specific columns, we need to use the column_ids to get the correct column index;
+		// otherwise, we just use the fun_idx
+		const auto col_idx = column_ids ? (*column_ids)[fun_idx] : fun_idx;
+
+		const auto &match_function = match_functions[fun_idx];
 		count = match_function.function(lhs.data[col_idx], lhs_formats[col_idx], sel, count, rhs_layout,
 		                                rhs_row_locations, col_idx, match_function.child_functions, no_match_sel,
 		                                no_match_count);
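For intuition, here is a minimal self-contained sketch of the mapping this overload introduces (hypothetical stand-in types, not DuckDB's actual API): a dense matcher compares predicate i against column i, while a matcher initialized with column_ids compares predicate i against column column_ids[i], so it can cover just a subset of the layout's columns.

    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <vector>

    // Hypothetical stand-in: one comparison function per predicate.
    using MatchFunction = std::function<void(size_t col_idx)>;

    struct MiniRowMatcher {
        std::vector<MatchFunction> match_functions;
        std::unique_ptr<std::vector<size_t>> column_ids; // null -> dense layout

        void Match() {
            for (size_t fun_idx = 0; fun_idx < match_functions.size(); fun_idx++) {
                // same mapping as above: a restricted matcher translates the
                // predicate index into the layout's column index
                const size_t col_idx = column_ids ? (*column_ids)[fun_idx] : fun_idx;
                match_functions[fun_idx](col_idx);
            }
        }
    };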
diff --git a/src/common/types/row/tuple_data_collection.cpp b/src/common/types/row/tuple_data_collection.cpp
index a15a5b93a2e0..acbe9274e2d1 100644
--- a/src/common/types/row/tuple_data_collection.cpp
+++ b/src/common/types/row/tuple_data_collection.cpp
@@ -405,6 +405,17 @@ void TupleDataCollection::InitializeChunk(DataChunk &chunk) const {
 	chunk.Initialize(allocator->GetAllocator(), layout.GetTypes());
 }
 
+void TupleDataCollection::InitializeChunk(DataChunk &chunk, const vector<column_t> &column_ids) const {
+	vector<LogicalType> chunk_types(column_ids.size());
+	// keep the order of the columns
+	for (idx_t i = 0; i < column_ids.size(); i++) {
+		auto column_idx = column_ids[i];
+		D_ASSERT(column_idx < layout.ColumnCount());
+		chunk_types[i] = layout.GetTypes()[column_idx];
+	}
+	chunk.Initialize(allocator->GetAllocator(), chunk_types);
+}
+
 void TupleDataCollection::InitializeScanChunk(TupleDataScanState &state, DataChunk &chunk) const {
 	auto &column_ids = state.chunk_state.column_ids;
 	D_ASSERT(!column_ids.empty());
diff --git a/src/execution/join_hashtable.cpp b/src/execution/join_hashtable.cpp
index 2b74b5bd7e4e..da4f762722e4 100644
--- a/src/execution/join_hashtable.cpp
+++ b/src/execution/join_hashtable.cpp
@@ -8,19 +8,28 @@
 #include "duckdb/storage/buffer_manager.hpp"
 
 namespace duckdb {
-
 using ValidityBytes = JoinHashTable::ValidityBytes;
 using ScanStructure = JoinHashTable::ScanStructure;
 using ProbeSpill = JoinHashTable::ProbeSpill;
 using ProbeSpillLocalState = JoinHashTable::ProbeSpillLocalAppendState;
 
+JoinHashTable::ProbeState::ProbeState()
+    : hash_salts_v(LogicalType::UBIGINT), ht_offsets_v(LogicalType::UBIGINT), row_ptr_insert_to_v(LogicalType::POINTER),
+      key_no_match_sel(STANDARD_VECTOR_SIZE), salt_match_sel(STANDARD_VECTOR_SIZE),
+      salt_no_match_sel(STANDARD_VECTOR_SIZE) {
+}
+
+JoinHashTable::InsertState::InsertState() : key_remaining_sel(STANDARD_VECTOR_SIZE), empty_sel(STANDARD_VECTOR_SIZE) {
+}
+
 JoinHashTable::JoinHashTable(BufferManager &buffer_manager_p, const vector<JoinCondition> &conditions_p,
                              vector<LogicalType> btypes, JoinType type_p, const vector<idx_t> &output_columns_p)
     : buffer_manager(buffer_manager_p), conditions(conditions_p), build_types(std::move(btypes)),
       output_columns(output_columns_p), entry_size(0), tuple_size(0), vfound(Value::BOOLEAN(false)),
       join_type(type_p), finalized(false), has_null(false), radix_bits(INITIAL_RADIX_BITS), partition_start(0),
       partition_end(0) {
-	for (auto &condition : conditions) {
+	for (idx_t i = 0; i < conditions.size(); ++i) {
+		auto &condition = conditions[i];
 		D_ASSERT(condition.left->return_type == condition.right->return_type);
 		auto type = condition.left->return_type;
 		if (condition.comparison == ExpressionType::COMPARE_EQUAL ||
@@ -30,9 +39,15 @@ JoinHashTable::JoinHashTable(BufferManager &buffer_manager_p, const vector<JoinCondition> &conditions_p,
+	if (!non_equality_predicates.empty()) {
+		row_matcher_probe = unique_ptr<RowMatcher>(new RowMatcher());
+		row_matcher_probe_no_match_sel = unique_ptr<RowMatcher>(new RowMatcher());
+
+		row_matcher_probe->Initialize(false, layout, non_equality_predicates, non_equality_predicate_columns);
+		row_matcher_probe_no_match_sel->Initialize(true, layout, non_equality_predicates,
+		                                           non_equality_predicate_columns);
+
+		needs_chain_matcher = true;
+	} else {
+		needs_chain_matcher = false;
+	}
+
+	row_matcher_build.Initialize(true, layout, equality_predicates);
 
 	const auto &offsets = layout.GetOffsets();
 	tuple_size = offsets[condition_types.size() + build_types.size()];
@@ -86,32 +116,183 @@ void JoinHashTable::Merge(JoinHashTable &other) {
 	sink_collection->Combine(*other.sink_collection);
 }
 
-void JoinHashTable::ApplyBitmask(Vector &hashes, idx_t count) {
-	if (hashes.GetVectorType() == VectorType::CONSTANT_VECTOR) {
-		D_ASSERT(!ConstantVector::IsNull(hashes));
-		auto indices = ConstantVector::GetData<hash_t>(hashes);
-		*indices = *indices & bitmask;
+static void ApplyBitmaskAndGetSalt(Vector &hashes_v, const idx_t &count, const idx_t &bitmask, Vector &ht_offsets_v,
+                                   Vector &hash_salts_v) {
+	auto hash_salts = FlatVector::GetData<hash_t>(hash_salts_v);
+	auto ht_offsets = FlatVector::GetData<idx_t>(ht_offsets_v);
+
+	if (hashes_v.GetVectorType() == VectorType::CONSTANT_VECTOR) {
+		D_ASSERT(!ConstantVector::IsNull(hashes_v));
+
+		auto indices = ConstantVector::GetData<hash_t>(hashes_v);
+		hash_t salt = aggr_ht_entry_t::ExtractSalt(*indices);
+		idx_t offset = *indices & bitmask;
+		hashes_v.Flatten(count);
+
+		for (idx_t i = 0; i < count; i++) {
+			hash_salts[i] = salt;
+			ht_offsets[i] = offset;
+		}
 	} else {
-		hashes.Flatten(count);
-		auto indices = FlatVector::GetData<hash_t>(hashes);
+		UnifiedVectorFormat hashes_v_unified;
+		hashes_v.ToUnifiedFormat(count, hashes_v_unified);
+
+		auto hash_data = UnifiedVectorFormat::GetData<hash_t>(hashes_v_unified);
+
 		for (idx_t i = 0; i < count; i++) {
-			indices[i] &= bitmask;
+			auto hash_index = hashes_v_unified.sel->get_index(i);
+			auto hash = hash_data[hash_index];
+
+			auto ht_offset = hash & bitmask;
+			ht_offsets[i] = ht_offset;
+			hash_salts[i] = aggr_ht_entry_t::ExtractSalt(hash);
 		}
 	}
 }
 
-void JoinHashTable::ApplyBitmask(Vector &hashes, const SelectionVector &sel, idx_t count, Vector &pointers) {
-	UnifiedVectorFormat hdata;
-	hashes.ToUnifiedFormat(count, hdata);
+static void ApplyBitmaskAndGetSalt(Vector &hashes_v, const idx_t &count, const idx_t &bitmask, Vector &ht_offsets_v,
+                                   Vector &hash_salts_v, const SelectionVector &sel) {
+	auto hash_salts = FlatVector::GetData<hash_t>(hash_salts_v);
+	auto ht_offsets = FlatVector::GetData<idx_t>(ht_offsets_v);
 
-	auto hash_data = UnifiedVectorFormat::GetData<hash_t>(hdata);
-	auto result_data = FlatVector::GetData<data_ptr_t>(pointers);
-	auto main_ht = reinterpret_cast<data_ptr_t *>(hash_map.get());
-	for (idx_t i = 0; i < count; i++) {
-		auto rindex = sel.get_index(i);
-		auto hindex = hdata.sel->get_index(rindex);
-		auto hash = hash_data[hindex];
-		result_data[rindex] = main_ht + (hash & bitmask);
+	if (hashes_v.GetVectorType() == VectorType::CONSTANT_VECTOR) {
+		D_ASSERT(!ConstantVector::IsNull(hashes_v));
+		auto indices = ConstantVector::GetData<hash_t>(hashes_v);
+		hash_t salt = aggr_ht_entry_t::ExtractSalt(*indices);
+		idx_t offset = *indices & bitmask;
+		hashes_v.Flatten(count);
+
+		for (idx_t i = 0; i < count; i++) {
+			auto row_index = sel.get_index(i);
+			hash_salts[row_index] = salt;
+			ht_offsets[row_index] = offset;
+		}
+	} else {
+		UnifiedVectorFormat hashes_v_unified;
+		hashes_v.ToUnifiedFormat(count, hashes_v_unified);
+
+		auto hash_data = UnifiedVectorFormat::GetData<hash_t>(hashes_v_unified);
+
+		for (idx_t i = 0; i < count; i++) {
+			auto row_index = sel.get_index(i);
+			auto hash_index = hashes_v_unified.sel->get_index(row_index);
+			auto hash = hash_data[hash_index];
+
+			auto ht_offset = hash & bitmask;
+			ht_offsets[row_index] = ht_offset;
+			hash_salts[row_index] = aggr_ht_entry_t::ExtractSalt(hash);
+		}
 	}
 }
 
+// uses an AND operation to apply the bitmask instead of an if condition to wrap around
+inline void IncrementAndWrap(idx_t &value, idx_t increment, uint64_t bitmask) {
+	value += increment;
+	value &= bitmask;
+}
+
+inline void IncrementAndWrap(idx_t &value, uint64_t bitmask) {
+	IncrementAndWrap(value, 1, bitmask);
+}
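As a sanity check on the arithmetic, this small standalone sketch (assuming a 16-bit salt in the upper hash bits, mirroring aggr_ht_entry_t::ExtractSalt) shows how one 64-bit hash yields both the power-of-two table offset and the salt, and why the AND-based wraparound above is equivalent to an if-based bounds check:

    #include <cassert>
    #include <cstdint>

    using hash_t = uint64_t;
    using idx_t = uint64_t;

    // assumed salt layout: the upper 16 bits of the hash
    static inline hash_t ExtractSalt(hash_t hash) {
        return hash & 0xFFFF000000000000ULL;
    }

    int main() {
        const idx_t capacity = 1ULL << 10; // must be a power of two
        const idx_t bitmask = capacity - 1;

        const hash_t hash = 0xDEADBEEFCAFEBABEULL;
        idx_t ht_offset = hash & bitmask;      // position in the pointer table
        const hash_t salt = ExtractSalt(hash); // cheap pre-filter before key comparison

        // branch-free probing step: adding 0 or 1 and masking wraps around the
        // table without an if (ht_offset >= capacity) check
        const idx_t increment = 1;
        ht_offset = (ht_offset + increment) & bitmask;

        assert(ht_offset < capacity);
        (void)salt;
        return 0;
    }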
+void JoinHashTable::GetRowPointers(DataChunk &keys, TupleDataChunkState &key_state, ProbeState &state, Vector &hashes_v,
+                                   const SelectionVector &sel, idx_t &count, Vector &pointers_result_v,
+                                   SelectionVector &match_sel) {
+	ApplyBitmaskAndGetSalt(hashes_v, count, bitmask, state.ht_offsets_v, state.hash_salts_v, sel);
+
+	auto ht_offsets = FlatVector::GetData<idx_t>(state.ht_offsets_v);
+	auto hash_salts = FlatVector::GetData<hash_t>(state.hash_salts_v);
+	auto pointers_result = FlatVector::GetData<data_ptr_t>(pointers_result_v);
+	auto row_ptr_insert_to = FlatVector::GetData<data_ptr_t>(state.row_ptr_insert_to_v);
+
+	const SelectionVector *remaining_sel = &sel;
+	idx_t remaining_count = count;
+
+	idx_t &match_count = count;
+	match_count = 0;
+
+	while (true) {
+		idx_t salt_match_count = 0;
+
+		// for each entry, linear probing until
+		// a) an empty entry is found -> return nullptr (do nothing, as the vector is zeroed)
+		// b) an entry is found where the salt matches -> need to compare the keys
+		for (idx_t i = 0; i < remaining_count; i++) {
+			const auto row_index = remaining_sel->get_index(i);
+			auto &ht_offset = ht_offsets[row_index];
+
+			idx_t increment;
+
+			// increment the ht_offset of the entry as long as the next entry is occupied and the salt does not match
+			do {
+				auto &entry = entries[ht_offset];
+				bool occupied = entry.IsOccupied();
+
+				// no need to do anything, as the vector is zeroed
+				if (!occupied) {
+					break;
+				}
+
+				bool salt_match = entry.GetSalt() == hash_salts[row_index];
+
+				// the entries we need to process in the next iteration are the ones that are occupied and whose salt
+				// matches; the ones that are empty need no further processing
+				state.salt_match_sel.set_index(salt_match_count, row_index);
+				salt_match_count += salt_match;
+
+				// condition for incrementing the ht_offset: occupied and salt does not match -> move to next entry
+				increment = !salt_match;
+				IncrementAndWrap(ht_offset, increment, bitmask);
+			} while (increment);
+		}
+
+		// at this point every remaining row stepped either to an empty entry or to an entry with a matching salt;
+		// we need to compare the keys for the ones that have a matching salt
+		if (salt_match_count == 0) {
+			break;
+		} else {
+			// Get the pointers to the rows that need to be compared
+			for (idx_t need_compare_idx = 0; need_compare_idx < salt_match_count; need_compare_idx++) {
+				const auto row_index = state.salt_match_sel.get_index(need_compare_idx);
+				const auto &entry = entries[ht_offsets[row_index]];
+				row_ptr_insert_to[row_index] = entry.GetPointer();
+			}
+
+			// Perform row comparisons; after the function call, salt_match_sel will point to the keys that match
+			idx_t key_no_match_count = 0;
+			idx_t key_match_count =
+			    row_matcher_build.Match(keys, key_state.vector_data, state.salt_match_sel, salt_match_count, layout,
+			                            state.row_ptr_insert_to_v, &state.key_no_match_sel, key_no_match_count);
+
+			D_ASSERT(key_match_count + key_no_match_count == salt_match_count);
+
+			// Set a pointer to the matching row
+			for (idx_t i = 0; i < key_match_count; i++) {
+				const auto row_index = state.salt_match_sel.get_index(i);
+				pointers_result[row_index] = row_ptr_insert_to[row_index];
+
+				match_sel.set_index(match_count, row_index);
+				match_count++;
+			}
+
+			// update the ht_offset to point to the next entry for the ones that did not match
+			for (idx_t i = 0; i < key_no_match_count; i++) {
+				const auto row_index = state.key_no_match_sel.get_index(i);
+				auto &ht_offset = ht_offsets[row_index];
+
+				IncrementAndWrap(ht_offset, 1, bitmask);
+			}
+
+			remaining_sel = &state.key_no_match_sel;
+			remaining_count = key_no_match_count;
+		}
+	}
+}
@@ -243,70 +424,253 @@ idx_t JoinHashTable::PrepareKeys(DataChunk &keys, vector<TupleDataVectorFormat>
 	return added_count;
 }
 
-template <bool PARALLEL>
-static inline void InsertHashesLoop(atomic<data_ptr_t> pointers[], const hash_t indices[], const idx_t count,
-                                    const data_ptr_t key_locations[], const idx_t pointer_offset) {
-	for (idx_t i = 0; i < count; i++) {
-		const auto index = indices[i];
-		if (PARALLEL) {
-			data_ptr_t head;
+template <bool PARALLEL, bool EXPECT_EMPTY>
+static inline bool InsertRowToEntry(atomic<aggr_ht_entry_t> &entry, data_ptr_t row_ptr_to_insert, const hash_t salt,
+                                    const idx_t pointer_offset) {
+	if (PARALLEL) {
+		// if we expect the entry to be empty and the operation fails, we need to cancel the whole operation, as
+		// another key might have been inserted in the meantime that does not match the current key
+		if (EXPECT_EMPTY) {
+			// add a nullptr to the end of the list to mark the end
+			Store<data_ptr_t>(nullptr, row_ptr_to_insert + pointer_offset);
+
+			aggr_ht_entry_t new_empty_entry = aggr_ht_entry_t::GetDesiredEntry(row_ptr_to_insert, salt);
+			aggr_ht_entry_t expected_empty_entry = aggr_ht_entry_t::GetEmptyEntry();
+			bool successful_swap = std::atomic_compare_exchange_weak(&entry, &expected_empty_entry, new_empty_entry);
+			return successful_swap;
+		}
+
+		// if we expect the entry to be full, we know that even if the insert fails the keys still match, so we can
+		// just keep trying until we succeed
+		else {
+			aggr_ht_entry_t expected_current_entry = entry.load();
+			aggr_ht_entry_t desired_new_entry;
+			D_ASSERT(expected_current_entry.IsOccupied());
+
 			do {
-				head = pointers[index];
-				Store<data_ptr_t>(head, key_locations[i] + pointer_offset);
-			} while (!std::atomic_compare_exchange_weak(&pointers[index], &head, key_locations[i]));
-		} else {
-			// set prev in current key to the value (NOTE: this will be nullptr if there is none)
-			Store<data_ptr_t>(pointers[index], key_locations[i] + pointer_offset);
+				data_ptr_t current_row_pointer = expected_current_entry.GetPointer();
+				Store<data_ptr_t>(current_row_pointer, row_ptr_to_insert + pointer_offset);
+				desired_new_entry = aggr_ht_entry_t::GetDesiredEntry(row_ptr_to_insert, salt);
+			} while (!std::atomic_compare_exchange_weak(&entry, &expected_current_entry, desired_new_entry));
 
-			// set pointer to current tuple
-			pointers[index] = key_locations[i];
+			return true;
 		}
 	}
+	// if we are not in parallel mode, we can just do the operation without any checks
+	else {
+		aggr_ht_entry_t current_entry = entry.load();
+		data_ptr_t current_row_pointer = current_entry.GetPointerOrNull();
+		Store<data_ptr_t>(current_row_pointer, row_ptr_to_insert + pointer_offset);
+		entry = aggr_ht_entry_t::GetDesiredEntry(row_ptr_to_insert, salt);
+		return true;
+	}
+}
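The two compare-and-swap paths above are easiest to see in isolation. Here is a minimal standalone model (assuming a 64-bit entry that packs a 48-bit pointer with a 16-bit salt; these are not the actual DuckDB types): inserting into an empty slot must be allowed to fail, because a racing thread may have claimed the slot for a different key, whereas prepending to an existing chain whose keys are known to match can simply retry until it succeeds.

    #include <atomic>
    #include <cstdint>

    using entry_t = uint64_t; // assumed: upper 16 bits salt, lower 48 bits pointer
    constexpr entry_t SALT_MASK = 0xFFFF000000000000ULL;

    static entry_t MakeEntry(uint64_t pointer_bits, entry_t salt) {
        return pointer_bits | (salt & SALT_MASK);
    }

    // EXPECT_EMPTY path: publish the row only if the slot is still empty;
    // on failure the caller must re-examine the slot, since the winning
    // row's key may differ from ours.
    static bool TryInsertIntoEmpty(std::atomic<entry_t> &slot, uint64_t row, entry_t salt) {
        entry_t expected = 0; // the empty entry
        return slot.compare_exchange_strong(expected, MakeEntry(row, salt));
    }

    // Occupied path: the keys already matched, so keep retrying; each failed
    // swap refreshes 'expected' with the current chain head (a real
    // implementation would also re-store the head into the new row's next
    // pointer before retrying).
    static void PrependToChain(std::atomic<entry_t> &slot, uint64_t row, entry_t salt) {
        entry_t expected = slot.load();
        while (!slot.compare_exchange_weak(expected, MakeEntry(row, salt))) {
        }
    }

    int main() {
        std::atomic<entry_t> slot {0};
        TryInsertIntoEmpty(slot, 0x1000, 0xABCD000000000000ULL);
        PrependToChain(slot, 0x2000, 0xABCD000000000000ULL);
        return 0;
    }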
+template <bool PARALLEL>
+static void InsertHashesLoop(atomic<aggr_ht_entry_t> entries[], Vector row_locations, Vector &hashes_v,
+                             const idx_t &count, JoinHashTable::InsertState &state, const idx_t &pointer_offset,
+                             const idx_t &capacity, unique_ptr<TupleDataCollection> &data_collection,
+                             RowMatcher &row_matcher_build, const vector<column_t> &equality_predicate_columns,
+                             const TupleDataLayout &layout, const idx_t &bitmask) {
+	D_ASSERT(hashes_v.GetType().id() == LogicalType::HASH);
+
+	ApplyBitmaskAndGetSalt(hashes_v, count, bitmask, state.ht_offsets_v, state.hash_salts_v);
+
+	// the offset for each row to insert
+	auto ht_offsets = FlatVector::GetData<idx_t>(state.ht_offsets_v);
+	auto hash_salts = FlatVector::GetData<hash_t>(state.hash_salts_v);
+	auto row_ptr_insert_to = FlatVector::GetData<data_ptr_t>(state.row_ptr_insert_to_v);
+	auto row_ptrs_to_insert = FlatVector::GetData<data_ptr_t>(row_locations);
+
+	const SelectionVector *remaining_sel = FlatVector::IncrementalSelectionVector();
+	idx_t remaining_count = count;
+
+	while (remaining_count > 0) {
+		idx_t salt_match_count = 0;
+		idx_t salt_no_match_count = 0;
+
+		idx_t empty_count = 0;
+		idx_t key_no_match_count = 0;
+
+		// iterate over each entry to find out whether it belongs to an existing list or will start a new list
+		for (idx_t i = 0; i < remaining_count; i++) {
+			const auto row_index = remaining_sel->get_index(i);
+			auto &ht_offset = ht_offsets[row_index];
+
+			atomic<aggr_ht_entry_t> &atomic_entry = entries[ht_offset];
+			aggr_ht_entry_t entry = atomic_entry.load();
+
+			bool occupied = entry.IsOccupied();
+			bool salt_match = entry.GetSalt() == hash_salts[row_index];
+
+			bool empty = !occupied;
+			bool occupied_and_salt_match = occupied && salt_match;
+			bool occupied_and_salt_no_match = occupied && !salt_match;
+
+			state.empty_sel.set_index(empty_count, row_index);
+			empty_count += empty;
+
+			state.salt_match_sel.set_index(salt_match_count, row_index);
+			salt_match_count += occupied_and_salt_match;
+
+			state.salt_no_match_sel.set_index(salt_no_match_count, row_index);
+			salt_no_match_count += occupied_and_salt_no_match;
+
+			// we can increment the ones with no match right away
+			IncrementAndWrap(ht_offset, occupied_and_salt_no_match, bitmask);
+		}
+
+		// for each empty entry, insert the row
+		for (idx_t i = 0; i < empty_count; i++) {
+			const auto row_index = state.empty_sel.get_index(i);
+
+			data_ptr_t row_ptr = row_ptrs_to_insert[row_index];
+			idx_t &ht_offset = ht_offsets[row_index];
+			hash_t salt = hash_salts[row_index];
+
+			atomic<aggr_ht_entry_t> &atomic_entry = entries[ht_offset];
+			bool successful_insertion = InsertRowToEntry<PARALLEL, true>(atomic_entry, row_ptr, salt, pointer_offset);
+
+			// if the insertion is not successful, the entry was occupied in the meantime (only in parallel mode)
+			if (PARALLEL) {
+				state.salt_match_sel.set_index(salt_match_count, row_index);
+				salt_match_count += !successful_insertion;
+			}
+		}
+		// at this point every remaining row stepped either to an empty entry or to an entry with a matching salt;
+		// we need to compare the keys for the ones that have a matching salt
+		if (salt_match_count != 0) {
+			// Get the data for the rows that need to be compared
+			DataChunk lhs_data;
+			data_collection->InitializeChunk(lhs_data, equality_predicate_columns); // right format
+			lhs_data.SetCardinality(count);                                         // and the right size
+
+			TupleDataChunkState chunk_state;
+			data_collection->InitializeChunkState(chunk_state, equality_predicate_columns);
+
+			// The target selection vector says where to write the results into the lhs_data; we just want to write
+			// sequentially, as otherwise we trigger a bug in the Gather function
+			data_collection->Gather(row_locations, state.salt_match_sel, salt_match_count, equality_predicate_columns,
+			                        lhs_data, *FlatVector::IncrementalSelectionVector(),
+			                        chunk_state.cached_cast_vectors);
+
+			TupleDataCollection::ToUnifiedFormat(chunk_state, lhs_data);
+
+			vector<TupleDataVectorFormat> &lhs_formats = chunk_state.vector_data;
+
+			// Get the pointers to the rows that need to be compared
+			for (idx_t need_compare_idx = 0; need_compare_idx < salt_match_count; need_compare_idx++) {
+				const auto entry_index = state.salt_match_sel.get_index(need_compare_idx);
+				const auto &entry = entries[ht_offsets[entry_index]];
+				row_ptr_insert_to[need_compare_idx] = entry.load().GetPointer();
+			}
+
+			SelectionVector match_sel(STANDARD_VECTOR_SIZE);
+			for (idx_t i = 0; i < salt_match_count; i++) {
+				match_sel.set_index(i, i);
+			}
+
+			key_no_match_count = 0;
+			idx_t key_match_count =
+			    row_matcher_build.Match(lhs_data, lhs_formats, match_sel, salt_match_count, layout,
+			                            state.row_ptr_insert_to_v, &state.key_no_match_sel, key_no_match_count);
+
+			D_ASSERT(key_match_count + key_no_match_count == salt_match_count);
+
+			// Insert the rows that match
+			for (idx_t i = 0; i < key_match_count; i++) {
+				const auto need_compare_idx = match_sel.get_index(i);
+				const auto entry_index = state.salt_match_sel.get_index(need_compare_idx);
+
+				auto &entry = entries[ht_offsets[entry_index]];
+				data_ptr_t row_ptr = row_ptrs_to_insert[entry_index];
+				auto salt = hash_salts[entry_index];
+				InsertRowToEntry<PARALLEL, false>(entry, row_ptr, salt, pointer_offset);
+			}
+
+			// Linear probing: each of the entries that do not match moves to the next entry in the HT
+			for (idx_t i = 0; i < key_no_match_count; i++) {
+				const auto need_compare_idx = state.key_no_match_sel.get_index(i);
+				const auto entry_index = state.salt_match_sel.get_index(need_compare_idx);
+
+				auto &ht_offset = ht_offsets[entry_index];
+
+				ht_offset++;
+				if (ht_offset >= capacity) {
+					ht_offset = 0;
+				}
+
+				state.key_remaining_sel.set_index(i, entry_index);
+			}
+		}
+
+		// the remaining rows are the ones whose salt did not match or whose keys did not match;
+		// append the salt-no-match rows to key_remaining_sel
+		for (idx_t i = 0; i < salt_no_match_count; i++) {
+			const auto row_index = state.salt_no_match_sel.get_index(i);
+			state.key_remaining_sel.set_index(key_no_match_count + i, row_index);
+		}
+
+		remaining_count = salt_no_match_count + key_no_match_count;
+		remaining_sel = &state.key_remaining_sel;
+	}
+}
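The categorization loop above never branches per row: every row index is written into all candidate selection vectors, and only the counter of the category the row belongs to advances. A stripped-down sketch of that pattern (plain arrays instead of DuckDB's SelectionVector; illustrative only):

    #include <cstddef>
    #include <cstdint>

    struct MiniSlot {
        bool occupied;
        uint16_t salt;
    };

    // Partitions 'count' rows into empty / salt-match / salt-mismatch, exactly
    // one category per row, without a per-row branch.
    static void Partition(const MiniSlot *slots, const uint64_t *ht_offsets, const uint16_t *salts, size_t count,
                          uint32_t *empty_sel, size_t &empty_count, uint32_t *match_sel, size_t &match_count,
                          uint32_t *no_match_sel, size_t &no_match_count) {
        empty_count = match_count = no_match_count = 0;
        for (size_t i = 0; i < count; i++) {
            const MiniSlot &slot = slots[ht_offsets[i]];
            const bool empty = !slot.occupied;
            const bool match = slot.occupied && slot.salt == salts[i];
            const bool no_match = slot.occupied && !match;

            // unconditional writes, conditional counter increments
            empty_sel[empty_count] = static_cast<uint32_t>(i);
            empty_count += empty;
            match_sel[match_count] = static_cast<uint32_t>(i);
            match_count += match;
            no_match_sel[no_match_count] = static_cast<uint32_t>(i);
            no_match_count += no_match;
        }
    }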
 
+void JoinHashTable::InsertHashes(Vector &hashes_v, idx_t count, TupleDataChunkState &chunk_state,
+                                 InsertState &insert_state, bool parallel) {
+	auto atomic_entries = reinterpret_cast<atomic<aggr_ht_entry_t> *>(this->entries);
+	auto row_locations = chunk_state.row_locations;
-void JoinHashTable::InsertHashes(Vector &hashes, idx_t count, data_ptr_t key_locations[], bool parallel) {
-	D_ASSERT(hashes.GetType().id() == LogicalType::HASH);
-
-	// use bitmask to get position in array
-	ApplyBitmask(hashes, count);
-
-	hashes.Flatten(count);
-	D_ASSERT(hashes.GetVectorType() == VectorType::FLAT_VECTOR);
-
-	auto pointers = reinterpret_cast<atomic<data_ptr_t> *>(hash_map.get());
-	auto indices = FlatVector::GetData<hash_t>(hashes);
 	if (parallel) {
-		InsertHashesLoop<true>(pointers, indices, count, key_locations, pointer_offset);
+		InsertHashesLoop<true>(atomic_entries, row_locations, hashes_v, count, insert_state, pointer_offset, capacity,
+		                       data_collection, row_matcher_build, equality_predicate_columns, layout, bitmask);
 	} else {
-		InsertHashesLoop<false>(pointers, indices, count, key_locations, pointer_offset);
+		InsertHashesLoop<false>(atomic_entries, row_locations, hashes_v, count, insert_state, pointer_offset, capacity,
+		                        data_collection, row_matcher_build, equality_predicate_columns, layout, bitmask);
 	}
 }
 
 void JoinHashTable::InitializePointerTable() {
-	idx_t capacity = PointerTableCapacity(Count());
+	capacity = PointerTableCapacity(Count());
 	D_ASSERT(IsPowerOfTwo(capacity));
 
 	if (hash_map.get()) {
 		// There is already a hash map
-		auto current_capacity = hash_map.GetSize() / sizeof(data_ptr_t);
-		if (capacity != current_capacity) {
-			// Different size, re-allocate
+		auto current_capacity = hash_map.GetSize() / sizeof(aggr_ht_entry_t);
+		if (capacity > current_capacity) {
+			// Need more space
+			hash_map = buffer_manager.GetBufferAllocator().Allocate(capacity * sizeof(aggr_ht_entry_t));
+			entries = reinterpret_cast<aggr_ht_entry_t *>(hash_map.get());
+		} else {
+			// Just use the current hash map
+			capacity = current_capacity;
 		}
 	} else {
 		// Allocate a hash map
-		hash_map = buffer_manager.GetBufferAllocator().Allocate(capacity * sizeof(data_ptr_t));
+		hash_map = buffer_manager.GetBufferAllocator().Allocate(capacity * sizeof(aggr_ht_entry_t));
+		entries = reinterpret_cast<aggr_ht_entry_t *>(hash_map.get());
 	}
-	D_ASSERT(hash_map.GetSize() == capacity * sizeof(data_ptr_t));
+	D_ASSERT(hash_map.GetSize() == capacity * sizeof(aggr_ht_entry_t));
 
 	// initialize HT with all-zero entries
-	std::fill_n(reinterpret_cast<data_ptr_t *>(hash_map.get()), capacity, nullptr);
+	std::fill_n(entries, capacity, aggr_ht_entry_t(0));
 
 	bitmask = capacity - 1;
 }
 
 void JoinHashTable::Finalize(idx_t chunk_idx_from, idx_t chunk_idx_to, bool parallel) {
+	// Pointer table should be allocated
 	D_ASSERT(hash_map.get());
 
@@ -316,12 +680,17 @@ void JoinHashTable::Finalize(idx_t chunk_idx_from, idx_t chunk_idx_to, bool para
 	TupleDataChunkIterator iterator(*data_collection, TupleDataPinProperties::KEEP_EVERYTHING_PINNED, chunk_idx_from,
 	                                chunk_idx_to, false);
 	const auto row_locations = iterator.GetRowLocations();
+
+	InsertState insert_state;
 	do {
 		const auto count = iterator.GetCurrentChunkCount();
 		for (idx_t i = 0; i < count; i++) {
 			hash_data[i] = Load<hash_t>(row_locations[i] + pointer_offset);
 		}
-		InsertHashes(hashes, count, row_locations, parallel);
+		TupleDataChunkState &chunk_state = iterator.GetChunkState();
+
+		InsertHashes(hashes, count, chunk_state, insert_state, parallel);
 	} while (iterator.Next());
 }
@@ -344,7 +713,7 @@ unique_ptr<ScanStructure> JoinHashTable::InitializeScanStructure(DataChunk &keys
 	return ss;
 }
 
-unique_ptr<ScanStructure> JoinHashTable::Probe(DataChunk &keys, TupleDataChunkState &key_state,
+unique_ptr<ScanStructure> JoinHashTable::Probe(DataChunk &keys, TupleDataChunkState &key_state, ProbeState &probe_state,
                                                Vector *precomputed_hashes) {
 	const SelectionVector *current_sel;
 	auto ss = InitializeScanStructure(keys, key_state, current_sel);
@@ -353,24 +722,23 @@ unique_ptr<ScanStructure> JoinHashTable::Probe(DataChunk &keys, TupleDataChunkSt
 	}
 
 	if (precomputed_hashes) {
-		ApplyBitmask(*precomputed_hashes, *current_sel, ss->count, ss->pointers);
+		GetRowPointers(keys, key_state, probe_state, *precomputed_hashes, *current_sel, ss->count, ss->pointers,
+		               ss->sel_vector);
 	} else {
 		// hash all the keys
 		Vector hashes(LogicalType::HASH);
 		Hash(keys, *current_sel, ss->count, hashes);
 
 		// now initialize the pointers of the scan structure based on the hashes
-		ApplyBitmask(hashes, *current_sel, ss->count, ss->pointers);
+		GetRowPointers(keys, key_state, probe_state, hashes, *current_sel, ss->count, ss->pointers, ss->sel_vector);
 	}
 
-	// create the selection vector linking to only non-empty entries
-	ss->InitializeSelectionVector(current_sel);
-
 	return ss;
 }
 
 ScanStructure::ScanStructure(JoinHashTable &ht_p, TupleDataChunkState &key_state_p)
-    : key_state(key_state_p), pointers(LogicalType::POINTER), sel_vector(STANDARD_VECTOR_SIZE), ht(ht_p),
+    : key_state(key_state_p), pointers(LogicalType::POINTER), sel_vector(STANDARD_VECTOR_SIZE),
+      chain_match_sel_vector(STANDARD_VECTOR_SIZE), chain_no_match_sel_vector(STANDARD_VECTOR_SIZE), ht(ht_p),
       finished(false) {
 }
@@ -406,7 +774,7 @@ void ScanStructure::Next(DataChunk &keys, DataChunk &left, DataChunk &result) {
 	}
 }
 
-bool ScanStructure::PointersExhausted() {
+bool ScanStructure::PointersExhausted() const {
 	// AdvancePointers creates a "new_count" for every pointer advanced during the
 	// previous AdvancePointers call. If no pointers are advanced, new_count = 0.
 	// count is then set to new_count.
@@ -414,20 +782,36 @@ bool ScanStructure::PointersExhausted() {
 }
 
 idx_t ScanStructure::ResolvePredicates(DataChunk &keys, SelectionVector &match_sel, SelectionVector *no_match_sel) {
-	// Start with the scan selection
+	// Initialize match_sel to the current sel_vector
 	for (idx_t i = 0; i < this->count; ++i) {
 		match_sel.set_index(i, this->sel_vector.get_index(i));
 	}
-	idx_t no_match_count = 0;
 
-	auto &matcher = no_match_sel ? ht.row_matcher_no_match_sel : ht.row_matcher;
-	return matcher.Match(keys, key_state.vector_data, match_sel, this->count, ht.layout, pointers, no_match_sel,
-	                     no_match_count);
+	// If there is a matcher for the probing side because of non-equality predicates, use it
+	if (ht.needs_chain_matcher) {
+		idx_t no_match_count = 0;
+		auto &matcher = no_match_sel ? ht.row_matcher_probe_no_match_sel : ht.row_matcher_probe;
+		D_ASSERT(matcher);
+
+		// the matcher only uses the vectors with the indices of the columns that still need to be checked during
+		// the probe phase, namely the non-equality columns
+		return matcher->Match(keys, key_state.vector_data, match_sel, this->count, ht.layout, pointers, no_match_sel,
+		                      no_match_count);
+	} else {
+		// no non-equality predicates to check: all rows in the current chains match by construction,
+		// since the equality keys were already compared in GetRowPointers
+		return this->count;
+	}
 }
 
 idx_t ScanStructure::ScanInnerJoin(DataChunk &keys, SelectionVector &result_vector) {
 	while (true) {
-		// resolve the predicates for this set of keys
+		// resolve the remaining (non-equality) predicates for this set of keys
 		idx_t result_count = ResolvePredicates(keys, result_vector, nullptr);
 
 		// after doing all the comparisons set the found_match vector
@@ -462,20 +846,6 @@ void ScanStructure::AdvancePointers(const SelectionVector &sel, idx_t sel_count)
 	this->count = new_count;
 }
 
-void ScanStructure::InitializeSelectionVector(const SelectionVector *&current_sel) {
-	idx_t non_empty_count = 0;
-	auto ptrs = FlatVector::GetData<data_ptr_t>(pointers);
-	auto cnt = count;
-	for (idx_t i = 0; i < cnt; i++) {
-		const auto idx = current_sel->get_index(i);
-		ptrs[idx] = Load<data_ptr_t>(ptrs[idx]);
-		if (ptrs[idx]) {
-			sel_vector.set_index(non_empty_count++, idx);
-		}
-	}
-	count = non_empty_count;
-}
-
 void ScanStructure::AdvancePointers() {
 	AdvancePointers(this->sel_vector, this->count);
 }
@@ -499,17 +869,17 @@ void ScanStructure::NextInnerJoin(DataChunk &keys, DataChunk &left, DataChunk &r
 		return;
 	}
 
-	SelectionVector result_vector(STANDARD_VECTOR_SIZE);
+	idx_t result_count = ScanInnerJoin(keys, chain_match_sel_vector);
 
-	idx_t result_count = ScanInnerJoin(keys, result_vector);
 	if (result_count > 0) {
 		if (PropagatesBuildSide(ht.join_type)) {
 			// full/right outer join: mark join matches as FOUND in the HT
 			auto ptrs = FlatVector::GetData<data_ptr_t>(pointers);
 			for (idx_t i = 0; i < result_count; i++) {
-				auto idx = result_vector.get_index(i);
-				// NOTE: threadsan reports this as a data race because this can be set concurrently by separate threads
-				// Technically it is, but it does not matter, since the only value that can be written is "true"
+				auto idx = chain_match_sel_vector.get_index(i);
+				// NOTE: threadsan reports this as a data race because this can be set concurrently by separate
+				// threads. Technically it is, but it does not matter, since the only value that can be written is
+				// "true"
 				Store<bool>(true, ptrs[idx] + ht.tuple_size);
 			}
 		}
@@ -518,14 +888,14 @@ void ScanStructure::NextInnerJoin(DataChunk &keys, DataChunk &left, DataChunk &r
 		// matches were found
 		// construct the result
 		// on the LHS, we create a slice using the result vector
-		result.Slice(left, result_vector, result_count);
+		result.Slice(left, chain_match_sel_vector, result_count);
 
 		// on the RHS, we need to fetch the data from the hash table
 		for (idx_t i = 0; i < ht.output_columns.size(); i++) {
 			auto &vector = result.data[left.ColumnCount() + i];
 			const auto output_col_idx = ht.output_columns[i];
 			D_ASSERT(vector.GetType() == ht.layout.GetTypes()[output_col_idx]);
-			GatherResult(vector, result_vector, result_count, output_col_idx);
+			GatherResult(vector, chain_match_sel_vector, result_count, output_col_idx);
 		}
 	}
 	AdvancePointers();
@@ -538,18 +908,20 @@ void ScanStructure::ScanKeyMatches(DataChunk &keys) {
 	// we handle the entire chunk in one call to Next().
 	// for every pointer, we keep chasing pointers and doing comparisons.
 	// this results in a boolean array indicating whether or not the tuple has a match
-	SelectionVector match_sel(STANDARD_VECTOR_SIZE), no_match_sel(STANDARD_VECTOR_SIZE);
 	while (this->count > 0) {
-		// resolve the predicates for the current set of pointers
-		idx_t match_count = ResolvePredicates(keys, match_sel, &no_match_sel);
+		// resolve the remaining predicates for the current set of pointers
+		idx_t match_count = ResolvePredicates(keys, chain_match_sel_vector, &chain_no_match_sel_vector);
 		idx_t no_match_count = this->count - match_count;
 
 		// mark each of the matches as found
 		for (idx_t i = 0; i < match_count; i++) {
-			found_match[match_sel.get_index(i)] = true;
+			found_match[chain_match_sel_vector.get_index(i)] = true;
 		}
 		// continue searching for the ones where we did not find a match yet
-		AdvancePointers(no_match_sel, no_match_count);
+		AdvancePointers(chain_no_match_sel_vector, no_match_count);
 	}
 }
@@ -749,21 +1121,22 @@ void ScanStructure::NextSingleJoin(DataChunk &keys, DataChunk &input, DataChunk
 	// (2) we return NULL for that data if there is no match
 	idx_t result_count = 0;
 	SelectionVector result_sel(STANDARD_VECTOR_SIZE);
-	SelectionVector match_sel(STANDARD_VECTOR_SIZE), no_match_sel(STANDARD_VECTOR_SIZE);
 	while (this->count > 0) {
-		// resolve the predicates for the current set of pointers
-		idx_t match_count = ResolvePredicates(keys, match_sel, &no_match_sel);
+		// resolve the remaining predicates for the current set of pointers
+		idx_t match_count = ResolvePredicates(keys, chain_match_sel_vector, &chain_no_match_sel_vector);
 		idx_t no_match_count = this->count - match_count;
 
 		// mark each of the matches as found
 		for (idx_t i = 0; i < match_count; i++) {
 			// found a match for this index
-			auto index = match_sel.get_index(i);
+			auto index = chain_match_sel_vector.get_index(i);
 			found_match[index] = true;
 			result_sel.set_index(result_count++, index);
 		}
 		// continue searching for the ones where we did not find a match yet
-		AdvancePointers(no_match_sel, no_match_count);
+		AdvancePointers(chain_no_match_sel_vector, no_match_count);
 	}
 	// reference the columns of the left side from the result
 	D_ASSERT(input.ColumnCount() > 0);
@@ -1019,8 +1392,8 @@ static void CreateSpillChunk(DataChunk &spill_chunk, DataChunk &keys, DataChunk
 }
 
 unique_ptr<ScanStructure> JoinHashTable::ProbeAndSpill(DataChunk &keys, TupleDataChunkState &key_state,
-                                                       DataChunk &payload, ProbeSpill &probe_spill,
-                                                       ProbeSpillLocalAppendState &spill_state,
+                                                       ProbeState &probe_state, DataChunk &payload,
+                                                       ProbeSpill &probe_spill, ProbeSpillLocalAppendState &spill_state,
                                                        DataChunk &spill_chunk) {
 	// hash all the keys
 	Vector hashes(LogicalType::HASH);
@@ -1054,10 +1427,7 @@ unique_ptr<ScanStructure> JoinHashTable::ProbeAndSpill(DataChunk &keys, TupleDat
 	}
 
 	// now initialize the pointers of the scan structure based on the hashes
-	ApplyBitmask(hashes, *current_sel, ss->count, ss->pointers);
-
-	// create the selection vector linking to only non-empty entries
-	ss->InitializeSelectionVector(current_sel);
+	GetRowPointers(keys, key_state, probe_state, hashes, *current_sel, ss->count, ss->pointers, ss->sel_vector);
 
 	return ss;
 }
diff --git a/src/execution/operator/join/physical_hash_join.cpp b/src/execution/operator/join/physical_hash_join.cpp
index f3cd8d9a641b..f4405793e21a 100644
--- a/src/execution/operator/join/physical_hash_join.cpp
+++ b/src/execution/operator/join/physical_hash_join.cpp
@@ -535,6 +535,7 @@ class HashJoinOperatorState : public CachingOperatorState {
 	bool initialized;
 	JoinHashTable::ProbeSpillLocalAppendState spill_state;
+	JoinHashTable::ProbeState probe_state;
 	//! Chunk to sink data into for external join
 	DataChunk spill_chunk;
@@ -612,10 +613,11 @@ OperatorResultType PhysicalHashJoin::ExecuteInternal(ExecutionContext &context,
 	// perform the actual probe
 	if (sink.external) {
-		state.scan_structure = sink.hash_table->ProbeAndSpill(state.join_keys, state.join_key_state, input,
-		                                                      *sink.probe_spill, state.spill_state, state.spill_chunk);
+		state.scan_structure =
+		    sink.hash_table->ProbeAndSpill(state.join_keys, state.join_key_state, state.probe_state, input,
+		                                   *sink.probe_spill, state.spill_state, state.spill_chunk);
 	} else {
-		state.scan_structure = sink.hash_table->Probe(state.join_keys, state.join_key_state);
+		state.scan_structure = sink.hash_table->Probe(state.join_keys, state.join_key_state, state.probe_state);
 	}
 	state.scan_structure->Next(state.join_keys, input, chunk);
 	return OperatorResultType::HAVE_MORE_OUTPUT;
@@ -718,11 +720,13 @@ class HashJoinLocalSourceState : public LocalSourceState {
 	DataChunk join_keys;
 	DataChunk payload;
 	TupleDataChunkState join_key_state;
+
 	//! Column indices to easily reference the join keys/payload columns in probe_chunk
 	vector<idx_t> join_key_indices;
 	vector<idx_t> payload_indices;
 	//! Scan structure for the external probe
 	unique_ptr<JoinHashTable::ScanStructure> scan_structure;
+	JoinHashTable::ProbeState probe_state;
 	bool empty_ht_probe_in_progress;
 
 	//! Chunks assigned to this thread for a full/outer scan
@@ -993,7 +997,7 @@ void HashJoinLocalSourceState::ExternalProbe(HashJoinGlobalSinkState &sink, Hash
 	}
 
 	// Perform the probe
-	scan_structure = sink.hash_table->Probe(join_keys, join_key_state, precomputed_hashes);
+	scan_structure = sink.hash_table->Probe(join_keys, join_key_state, probe_state, precomputed_hashes);
 	scan_structure->Next(join_keys, payload, chunk);
 }
diff --git a/src/include/duckdb/common/row_operations/row_matcher.hpp b/src/include/duckdb/common/row_operations/row_matcher.hpp
index 006e2f6dcb23..77f0b3dca59e 100644
--- a/src/include/duckdb/common/row_operations/row_matcher.hpp
+++ b/src/include/duckdb/common/row_operations/row_matcher.hpp
@@ -34,8 +34,13 @@ struct RowMatcher {
 public:
 	using Predicates = vector<ExpressionType>;
 
 	//! Initializes the RowMatcher, filling match_functions using layout and predicates
-	void Initialize(const bool no_match_sel, const TupleDataLayout &layout, const Predicates &predicates);
+	void Initialize(bool no_match_sel, const TupleDataLayout &layout, const Predicates &predicates);
+
+	//! Initializes the RowMatcher, filling match_functions using layout and predicates, but only for the given
+	//! column_ids
+	void Initialize(bool no_match_sel, const TupleDataLayout &layout, const Predicates &predicates,
+	                vector<column_t> &column_ids);
 	//! Given a DataChunk on the LHS, on which we've called TupleDataCollection::ToUnifiedFormat,
 	//! we match it with rows on the RHS, according to the given layout and locations.
 	//! Initially, 'sel' has 'count' entries which point to what needs to be compared.
@@ -58,6 +63,8 @@ struct RowMatcher {
 
 private:
 	vector<MatchFunction> match_functions;
+
+	unique_ptr<vector<column_t>> column_ids;
 };
 
 } // namespace duckdb
diff --git a/src/include/duckdb/common/types/row/tuple_data_collection.hpp b/src/include/duckdb/common/types/row/tuple_data_collection.hpp
index 01fc20ec33d9..a633c6d32e4f 100644
--- a/src/include/duckdb/common/types/row/tuple_data_collection.hpp
+++ b/src/include/duckdb/common/types/row/tuple_data_collection.hpp
@@ -145,6 +145,8 @@ class TupleDataCollection {
 	//! Initializes a chunk with the correct types that can be used to call Append/Scan
 	void InitializeChunk(DataChunk &chunk) const;
+	//! Initializes a chunk with the correct types for the given column_ids
+	void InitializeChunk(DataChunk &chunk, const vector<column_t> &column_ids) const;
 	//! Initializes a chunk with the correct types for a given scan state
 	void InitializeScanChunk(TupleDataScanState &state, DataChunk &chunk) const;
 	//! Initializes a Scan state for scanning all columns
diff --git a/src/include/duckdb/execution/aggregate_hashtable.hpp b/src/include/duckdb/execution/aggregate_hashtable.hpp
index d05b3576386d..f8b602e56bd6 100644
--- a/src/include/duckdb/execution/aggregate_hashtable.hpp
+++ b/src/include/duckdb/execution/aggregate_hashtable.hpp
@@ -31,17 +31,29 @@ struct FlushMoveState;
 
 struct aggr_ht_entry_t {
 public:
-	explicit aggr_ht_entry_t(hash_t value_p) : value(value_p) {
+	explicit inline aggr_ht_entry_t(hash_t value_p) : value(value_p) {
+	}
+
+	// Default constructor, needed for the 32-bit Linux test case
+	aggr_ht_entry_t() : value(0) {
 	}
 
 	inline bool IsOccupied() const {
 		return value != 0;
 	}
 
+	// Returns a pointer based on the stored value, without checking whether the cell is occupied;
+	// can therefore return a nullptr if the cell is not occupied
+	inline data_ptr_t GetPointerOrNull() const {
+		return reinterpret_cast<data_ptr_t>(value & POINTER_MASK);
+	}
+
+	// May only be called if the cell is occupied
 	inline data_ptr_t GetPointer() const {
 		D_ASSERT(IsOccupied());
 		return reinterpret_cast<data_ptr_t>(value & POINTER_MASK);
 	}
+
 	inline void SetPointer(const data_ptr_t &pointer) {
 		// Pointer shouldn't use upper bits
 		D_ASSERT((reinterpret_cast<hash_t>(pointer) & SALT_MASK) == 0);
@@ -67,6 +79,15 @@ struct aggr_ht_entry_t {
 		value = salt;
 	}
 
+	static inline aggr_ht_entry_t GetDesiredEntry(const data_ptr_t &pointer, const hash_t &salt) {
+		auto desired = reinterpret_cast<hash_t>(pointer) | (salt & SALT_MASK);
+		return aggr_ht_entry_t(desired);
+	}
+
+	static inline aggr_ht_entry_t GetEmptyEntry() {
+		return aggr_ht_entry_t(0);
+	}
+
 private:
 	//! Upper 16 bits are salt
 	static constexpr const hash_t SALT_MASK = 0xFFFF000000000000;
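A quick round-trip check of the packing used by GetDesiredEntry (SALT_MASK is taken from the header above; the 48-bit POINTER_MASK value is assumed to be its complement):

    #include <cassert>
    #include <cstdint>

    int main() {
        constexpr uint64_t SALT_MASK = 0xFFFF000000000000ULL;
        constexpr uint64_t POINTER_MASK = 0x0000FFFFFFFFFFFFULL; // assumed complement

        const uint64_t pointer = 0x00007F00DEADBEEFULL; // lower 48 bits only
        const uint64_t salt = 0xABCD000000000000ULL;    // upper 16 bits only

        const uint64_t entry = pointer | (salt & SALT_MASK); // GetDesiredEntry
        assert((entry & POINTER_MASK) == pointer);           // GetPointer / GetPointerOrNull
        assert((entry & SALT_MASK) == salt);                 // the salt survives untouched
        assert(entry != 0);                                  // IsOccupied
        return 0;
    }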
diff --git a/src/include/duckdb/execution/join_hashtable.hpp b/src/include/duckdb/execution/join_hashtable.hpp
index b4910a554229..11f5df9196e3 100644
--- a/src/include/duckdb/execution/join_hashtable.hpp
+++ b/src/include/duckdb/execution/join_hashtable.hpp
@@ -54,8 +54,9 @@ struct JoinHTScanState {
    [POINTER]
    [POINTER]
    [POINTER]
-   The pointers are either NULL
+   The pointers are either NULL, if the cell is free, or point to the start of the linked list.
 */
+
 class JoinHashTable {
 public:
 	using ValidityBytes = TemplatedValidityMask<uint8_t>;
@@ -66,9 +67,13 @@ class JoinHashTable {
 	//! probe.
 	struct ScanStructure {
 		TupleDataChunkState &key_state;
+		//! Directly points to the entries in the hash table
 		Vector pointers;
 		idx_t count;
 		SelectionVector sel_vector;
+		SelectionVector chain_match_sel_vector;
+		SelectionVector chain_no_match_sel_vector;
+
 		// whether or not the given tuple has found a match
 		unsafe_unique_array<bool> found_match;
 		JoinHashTable &ht;
@@ -78,7 +83,7 @@ class JoinHashTable {
 		//! Get the next batch of data from the scan structure
 		void Next(DataChunk &keys, DataChunk &left, DataChunk &result);
 		//! Are pointer chains all pointing to NULL?
-		bool PointersExhausted();
+		bool PointersExhausted() const;
 
 	private:
 		//! Next operator for the inner join
@@ -105,7 +110,6 @@ class JoinHashTable {
 		idx_t ScanInnerJoin(DataChunk &keys, SelectionVector &result_vector);
 
 	public:
-		void InitializeSelectionVector(const SelectionVector *&current_sel);
 		void AdvancePointers();
 		void AdvancePointers(const SelectionVector &sel, idx_t sel_count);
 		void GatherResult(Vector &result, const SelectionVector &result_vector, const SelectionVector &sel_vector,
@@ -115,6 +119,32 @@ class JoinHashTable {
 	};
 
 public:
+	struct ProbeState {
+		ProbeState();
+
+		Vector hash_salts_v;
+		Vector ht_offsets_v;
+		Vector row_ptr_insert_to_v;
+
+		SelectionVector key_no_match_sel;
+
+		// Selection vectors for the find-entries loop. There are three options:
+		// 1. Entry is empty -> return null (do nothing, vector is already null)
+		// 2. Entry is full and salt matches -> compare the keys
+		// 3. Entry is full and salt does not match -> continue probing
+		SelectionVector salt_match_sel;
+		SelectionVector salt_no_match_sel;
+	};
+
+	struct InsertState : ProbeState {
+		InsertState();
+
+		//! Rows that still need to be inserted because their salt or key did not match in this round
+		SelectionVector key_remaining_sel;
+		SelectionVector empty_sel;
+	};
+
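Note that ProbeState and InsertState are held in the operator state (see physical_hash_join.cpp above) or created once per Finalize call, and are then reused across chunks, so the probe-side vectors and selection vectors are not reallocated for every Probe or InsertHashes call.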
 	JoinHashTable(BufferManager &buffer_manager, const vector<JoinCondition> &conditions,
 	              vector<LogicalType> build_types, JoinType type, const vector<idx_t> &output_columns);
 	~JoinHashTable();
@@ -132,7 +162,7 @@ class JoinHashTable {
 	//! ever called.
 	void Finalize(idx_t chunk_idx_from, idx_t chunk_idx_to, bool parallel);
 	//! Probe the HT with the given input chunk, resulting in the given result
-	unique_ptr<ScanStructure> Probe(DataChunk &keys, TupleDataChunkState &key_state,
+	unique_ptr<ScanStructure> Probe(DataChunk &keys, TupleDataChunkState &key_state, ProbeState &probe_state,
 	                                Vector *precomputed_hashes = nullptr);
 	//! Scan the HT to construct the full outer join result
 	void ScanFullOuter(JoinHTScanState &state, Vector &addresses, DataChunk &result);
@@ -167,18 +197,33 @@ class JoinHashTable {
 	vector<LogicalType> build_types;
 	//! Positions of the columns that need to output
 	const vector<idx_t> &output_columns;
-	//! The comparison predicates
-	vector<ExpressionType> predicates;
+	//! The equality comparison predicates
+	vector<ExpressionType> equality_predicates;
+	//! The non-equality comparison predicates
+	vector<ExpressionType> non_equality_predicates;
+
+	//! The column indices of the equality predicates, used to compare the rows
+	vector<column_t> equality_predicate_columns;
+	//! The column indices of the non-equality predicates, used to compare the rows
+	vector<column_t> non_equality_predicate_columns;
 	//! Data column layout
 	TupleDataLayout layout;
-	//! Efficiently matches rows
-	RowMatcher row_matcher;
-	RowMatcher row_matcher_no_match_sel;
+	//! Matches the equality-condition rows during the build phase of the hash join, to prevent
+	//! duplicates in a list caused by hash collisions
+	RowMatcher row_matcher_build;
+	//! Efficiently matches the non-equality rows during the probing phase; only present if non_equality_predicates
+	//! is not empty
+	unique_ptr<RowMatcher> row_matcher_probe;
+	//! Matches the same rows as row_matcher_probe, but also returns a vector for the non-matches
+	unique_ptr<RowMatcher> row_matcher_probe_no_match_sel;
+	//! True if there are predicates that are not equality predicates, so we need to use the matchers during probing
+	bool needs_chain_matcher;
+
 	//! The size of an entry as stored in the HashTable
 	idx_t entry_size;
 	//! The total tuple size
 	idx_t tuple_size;
-	//! Next pointer offset in tuple
+	//! Next-pointer offset in the tuple; also used for the position of the hash, which later gets overwritten by the
+	//! pointer
 	idx_t pointer_offset;
 	//! A constant false column for initialising right outer joins
 	Vector vfound;
@@ -213,13 +258,16 @@ class JoinHashTable {
 	                 const SelectionVector *&current_sel);
 	void Hash(DataChunk &keys, const SelectionVector &sel, idx_t count, Vector &hashes);
 
-	//! Apply a bitmask to the hashes
-	void ApplyBitmask(Vector &hashes, idx_t count);
-	void ApplyBitmask(Vector &hashes, const SelectionVector &sel, idx_t count, Vector &pointers);
+	//! Gets a pointer to the entry in the HT for each of the hashes_v using linear probing. Will update the match_sel
+	//! vector and the count argument to the number and position of the matches
+	void GetRowPointers(DataChunk &keys, TupleDataChunkState &key_state, ProbeState &state, Vector &hashes_v,
+	                    const SelectionVector &sel, idx_t &count, Vector &pointers_result_v,
+	                    SelectionVector &match_sel);
 
 private:
-	//! Insert the given set of locations into the HT with the given set of hashes
-	void InsertHashes(Vector &hashes, idx_t count, data_ptr_t key_locations[], bool parallel);
+	//! Insert the given set of locations into the HT with the given set of hashes_v
+	void InsertHashes(Vector &hashes_v, idx_t count, TupleDataChunkState &chunk_state, InsertState &insert_state,
+	                  bool parallel);
 
 	idx_t PrepareKeys(DataChunk &keys, vector<TupleDataVectorFormat> &vector_data, const SelectionVector *&current_sel,
 	                  SelectionVector &sel, bool build_side);
@@ -230,8 +278,13 @@ class JoinHashTable {
 	unique_ptr<PartitionedTupleData> sink_collection;
 	//! The DataCollection holding the main data of the hash table
 	unique_ptr<TupleDataCollection> data_collection;
+
+	//! The capacity of the HT. Is the same as hash_map.GetSize() / sizeof(aggr_ht_entry_t)
+	idx_t capacity;
+	//! The hash map of the HT, created after finalization
 	AllocatedData hash_map;
+	aggr_ht_entry_t *entries; // TODO: maybe rename to ht_entry_t and put into a separate file
 	//! Whether or not NULL values are considered equal in each of the comparisons
 	vector<bool> null_values_are_equal;
@@ -329,9 +382,9 @@ class JoinHashTable {
 	//! Build HT for the next partitioned probe round
 	bool PrepareExternalFinalize(const idx_t max_ht_size);
 	//! Probe whatever we can, sink the rest into a thread-local HT
-	unique_ptr<ScanStructure> ProbeAndSpill(DataChunk &keys, TupleDataChunkState &key_state, DataChunk &payload,
-	                                        ProbeSpill &probe_spill, ProbeSpillLocalAppendState &spill_state,
-	                                        DataChunk &spill_chunk);
+	unique_ptr<ScanStructure> ProbeAndSpill(DataChunk &keys, TupleDataChunkState &key_state, ProbeState &probe_state,
+	                                        DataChunk &payload, ProbeSpill &probe_spill,
+	                                        ProbeSpillLocalAppendState &spill_state, DataChunk &spill_chunk);
 
 private:
 	//! The current number of radix bits used to partition
diff --git a/test/playground/bug_fix_left.test b/test/playground/bug_fix_left.test
new file mode 100644
index 000000000000..dad2643fb041
--- /dev/null
+++ b/test/playground/bug_fix_left.test
@@ -0,0 +1,44 @@
+# name: test/playground/bug_fix_left.test
+# group: [playground]
+
+statement ok
+CREATE TABLE departments (
+    dept_id INTEGER PRIMARY KEY,
+    dept_name VARCHAR(50)
+)
+
+statement ok
+CREATE TABLE employees (
+    emp_id INTEGER PRIMARY KEY,
+    emp_name VARCHAR(50),
+    dept_id INTEGER,
+    FOREIGN KEY (dept_id) REFERENCES departments(dept_id)
+)
+
+statement ok
+INSERT INTO departments (dept_id, dept_name) VALUES
+(1, 'Marketing'),
+(2, 'Engineering')
+
+statement ok
+INSERT INTO employees (emp_id, emp_name, dept_id) VALUES
+(1, 'Alice', 1),
+(2, 'Bob', 2),
+(3, 'Charlie', NULL),
+(4, 'Tom', 1)
+
+query TT
+SELECT
+    e.emp_name AS EmployeeName,
+    d.dept_name AS DepartmentName
+FROM
+    employees e
+LEFT JOIN
+    departments d ON e.dept_id = d.dept_id
+ORDER BY
+    e.emp_id
+----
+Alice	Marketing
+Bob	Engineering
+Charlie	NULL
+Tom	Marketing
\ No newline at end of file