predication test #3

Draft
wants to merge 37 commits into base: main

37 commits
52ef66d
found way to get row data as chunk during build
gropaul Feb 13, 2024
1662a5a
linear insertion now working
gropaul Feb 13, 2024
c78531b
linear probing is now also done during the probe phase
gropaul Feb 16, 2024
f0f43d1
fixed bug when probe side contains null values
gropaul Feb 19, 2024
dd2073b
added parallel insertion
gropaul Feb 19, 2024
5e68c55
bug fix: properly handle null values during probing
gropaul Feb 20, 2024
ada1356
bug fix: Only chain based on equi conditions
gropaul Feb 20, 2024
cff2c19
bug fix: column types misaligned in row matcher for column filter
gropaul Feb 20, 2024
cfc12dd
bug fix: parallel insertion fixed by properly using atomics
gropaul Feb 21, 2024
9ef8fc1
Merge branch 'main' into adapt-hash-join
gropaul Feb 21, 2024
f029c97
bug fix: added default constructor for aggr_ht_entry_t for atomic usage
gropaul Feb 21, 2024
4f643fb
added thread local state for reduced allocations
gropaul Feb 22, 2024
ff2f9d5
added thread local state for reduced allocations
gropaul Feb 22, 2024
4e5faa9
first try
gropaul Feb 27, 2024
fba2ab1
improvement: increasing until match in the end
gropaul Feb 27, 2024
a16f4d9
improvement: only process key-comparison rows in the loop
gropaul Feb 27, 2024
58736cd
improvement: cache ht entries
gropaul Feb 27, 2024
8182be7
removed caching but kept loop optimization
gropaul Feb 28, 2024
746982d
optimization: simplified probing logic
gropaul Feb 28, 2024
0da5369
optimization: simplified probing logic
gropaul Feb 28, 2024
c4bfd72
Merge remote-tracking branch 'origin/adapt-hash-join-predication' int…
gropaul Feb 28, 2024
1b192a8
improvement: selecting non empty rows during probing
gropaul Feb 29, 2024
457d8ef
improvement: hacky allocation mitigations
gropaul Feb 29, 2024
f742018
added some debug prints
gropaul Feb 29, 2024
040ecc0
Merge remote-tracking branch 'origin/adapt-hash-join-predication' int…
gropaul Feb 29, 2024
25fb23c
bugfix: validity mask drag
gropaul Mar 4, 2024
9507dd5
bugfix: row index neglected for constant hash vectors
gropaul Mar 4, 2024
d281127
bugfix: different keys in same chain because of simultaneous insertion
gropaul Mar 4, 2024
f7c9b47
formatting
gropaul Mar 4, 2024
dd56278
Merge branch 'main' into adapt-hash-join-predication
gropaul Mar 4, 2024
ed9f6a0
tmp commit
gropaul Mar 5, 2024
2eaf0dc
Merge branch 'main' into adapt-hash-join-predication
gropaul Mar 5, 2024
6abb903
all tests from make unittest run locally
gropaul Mar 5, 2024
4a2fe1d
bugfix: insert into empty
gropaul Mar 5, 2024
08aaa6a
improvements: smaller improvements during probing
gropaul Mar 6, 2024
159a28d
improvements: insertion now similar to probing
gropaul Mar 6, 2024
752d8a5
test: really strict predication approach
gropaul Mar 6, 2024
3 changes: 3 additions & 0 deletions .github/regression/join.csv
@@ -0,0 +1,3 @@
benchmark/micro/join/asof_join.benchmark
benchmark/micro/join/blockwise_nl_join.benchmark
benchmark/micro/join/range_join_small_rhs.benchmark
117 changes: 117 additions & 0 deletions scripts/local_regression_test_runner.py
@@ -0,0 +1,117 @@
import subprocess
import os
import sys

# Builds the benchmark runner of the current and the main branch and runs the benchmarks.


DEFAULT_RUNNER_PATH = "build/release/benchmark/benchmark_runner"  # this is what gets built by default
NEW_RUNNER_PATH = "build/release/benchmark/benchmark_runner_new"  # from local branch
OLD_RUNNER_PATH = "build/release/benchmark/benchmark_runner_old"  # from main branch


def build(stash_changes: bool = False):
    original_branch = get_current_branch()

    # Execute git status with the --porcelain option
    output = subprocess.check_output(['git', 'status', '--porcelain']).decode('utf-8')

    # Filter out empty lines and lines that start with "??" (untracked files)
    changes = [line for line in output.strip().split('\n') if line and not line.startswith('??')]
    auto_stashed = False
    if changes and stash_changes:
        print("Stashing changes")
        subprocess.check_output(['git', 'stash'])
        auto_stashed = True
    elif changes:
        print("There are uncommitted changes. Please commit or stash them, or use --stash to stash them automatically")
        exit(1)

    # checkout the main branch and build the runner
    subprocess.check_output(['git', 'checkout', 'main'])
    print("Building runner on main branch...")
    build_runner()
    subprocess.check_output(['cp', DEFAULT_RUNNER_PATH, OLD_RUNNER_PATH])

    # checkout the original branch and build the runner
    subprocess.check_output(['git', 'checkout', original_branch])

    if auto_stashed:
        print("Unstashing changes")
        subprocess.check_output(['git', 'stash', 'pop'])

    print(f"Building runner on branch {original_branch}...")
    build_runner()
    subprocess.check_output(['cp', DEFAULT_RUNNER_PATH, NEW_RUNNER_PATH])


def get_current_branch():
    return subprocess.check_output(['git', 'rev-parse', '--abbrev-ref', 'HEAD']).decode('utf-8').strip()


def build_runner():
    # set the env variables that enable the benchmark, TPC-H, and httpfs builds
    env = {"BUILD_BENCHMARK": "1", "BUILD_TPCH": "1", "BUILD_HTTPFS": "1"}
    # Add the current environment
    env.update(os.environ)
    subprocess.run(["make"], env=env, check=True)


def run_benchmark(old_runner, new_runner, benchmark_file):
    """Delegates to scripts/regression_test_runner.py with the given runners and benchmark list."""

    if not os.path.isfile(old_runner):
        print(f"Failed to find old runner {old_runner}")
        exit(1)

    if not os.path.isfile(new_runner):
        print(f"Failed to find new runner {new_runner}")
        exit(1)

    command = [
        'python3',
        'scripts/regression_test_runner.py',
        f'--old={old_runner}',
        f'--new={new_runner}',
        f'--benchmarks={benchmark_file}',
        '--threads=4',
    ]

    print(f"Running command: {' '.join(command)}")

    # start the existing runner, making sure its output goes to the console
    subprocess.run(command, check=True)


def main():
    benchmark_file = None
    stash_changes = False
    for arg in sys.argv[1:]:
        if arg.startswith("--benchmarks="):
            benchmark_file = arg.replace("--benchmarks=", "")
        elif arg == "--stash":
            stash_changes = True
        elif arg == "--help":
            print("Expected usage: python3 scripts/local_regression_test_runner.py --benchmarks=/benchmark/list.csv")
            print("Optional: --stash: Stash changes before running the benchmarks")
            exit(0)

    # make sure that we are in the root directory of the project
    if not os.path.isfile("scripts/local_regression_test_runner.py"):
        print("Please run this script from the root directory of the project")
        exit(1)

    if benchmark_file is None:
        print("Expected usage: python3 scripts/local_regression_test_runner.py --benchmarks=.github/regression/imdb.csv")
        exit(1)

    build(stash_changes)
    run_benchmark(OLD_RUNNER_PATH, NEW_RUNNER_PATH, benchmark_file)


if __name__ == "__main__":
    main()
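Example invocation, assuming a checkout at the repository root and the benchmark list added in this PR:

    python3 scripts/local_regression_test_runner.py --benchmarks=.github/regression/join.csv --stash

The script builds the benchmark runner twice, once on main and once on the current branch (stashing local changes first), and then hands both binaries to scripts/regression_test_runner.py for the actual comparison.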
35 changes: 33 additions & 2 deletions src/common/row_operations/row_matcher.cpp
@@ -197,12 +197,43 @@ void RowMatcher::Initialize(const bool no_match_sel, const TupleDataLayout &layout
    }
}

void RowMatcher::Initialize(const bool no_match_sel, const TupleDataLayout &layout, const Predicates &predicates,
                            vector<column_t> &column_ids_p) {

    // The column_ids must have the same size as the predicates vector
    D_ASSERT(column_ids_p.size() == predicates.size());

    // The largest column_id must be smaller than the number of columns in the layout
    D_ASSERT(*max_element(column_ids_p.begin(), column_ids_p.end()) < layout.ColumnCount());

    column_ids = make_uniq<vector<column_t>>(column_ids_p);

    match_functions.reserve(predicates.size());
    for (idx_t idx = 0; idx < predicates.size(); idx++) {
        column_t col_idx = (*column_ids)[idx];
        match_functions.push_back(GetMatchFunction(no_match_sel, layout.GetTypes()[col_idx], predicates[idx]));
    }
}

idx_t RowMatcher::Match(DataChunk &lhs, const vector<TupleDataVectorFormat> &lhs_formats, SelectionVector &sel,
                        idx_t count, const TupleDataLayout &rhs_layout, Vector &rhs_row_locations,
                        SelectionVector *no_match_sel, idx_t &no_match_count) {
    D_ASSERT(!match_functions.empty());

    if (column_ids) {
        // The column_ids must have the same size as the match_functions vector
        D_ASSERT(column_ids->size() == match_functions.size());

        // The largest column_id must be smaller than the number of columns in the lhs
        D_ASSERT(*max_element(column_ids->begin(), column_ids->end()) < lhs.ColumnCount());
    }

    for (idx_t fun_idx = 0; fun_idx < match_functions.size(); fun_idx++) {
        // if we only care about specific columns, we need to use the column_ids to get the correct column index;
        // otherwise, we just use fun_idx
        const auto col_idx = column_ids ? (*column_ids)[fun_idx] : fun_idx;

        const auto &match_function = match_functions[fun_idx];
        count =
            match_function.function(lhs.data[col_idx], lhs_formats[col_idx], sel, count, rhs_layout, rhs_row_locations,
                                    col_idx, match_function.child_functions, no_match_sel, no_match_count);
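For context, here is a minimal usage sketch of the new column-aware overloads. It is a sketch only: layout, lhs, lhs_formats, sel, count, rhs_row_locations, and no_match_sel are assumed to be set up as in the existing join code, and Predicates is assumed to be a vector of comparison ExpressionTypes.

    // Compare only layout columns 0 and 2 (e.g. the equi-join keys),
    // leaving the remaining payload columns out of the match.
    vector<column_t> column_ids {0, 2};
    Predicates predicates {ExpressionType::COMPARE_EQUAL, ExpressionType::COMPARE_EQUAL};

    RowMatcher matcher;
    matcher.Initialize(true, layout, predicates, column_ids);

    // lhs.data[0] and lhs.data[2] are compared against columns 0 and 2 of the
    // rows pointed to by rhs_row_locations; rows that fail are collected in no_match_sel.
    idx_t no_match_count = 0;
    count = matcher.Match(lhs, lhs_formats, sel, count, layout, rhs_row_locations, &no_match_sel, no_match_count);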
11 changes: 11 additions & 0 deletions src/common/types/row/tuple_data_collection.cpp
@@ -405,6 +405,17 @@ void TupleDataCollection::InitializeChunk(DataChunk &chunk) const {
    chunk.Initialize(allocator->GetAllocator(), layout.GetTypes());
}

void TupleDataCollection::InitializeChunk(DataChunk &chunk, const vector<column_t> &column_ids) const {
    vector<LogicalType> chunk_types(column_ids.size());
    // keep the order of the columns
    for (idx_t i = 0; i < column_ids.size(); i++) {
        auto column_idx = column_ids[i];
        D_ASSERT(column_idx < layout.ColumnCount());
        chunk_types[i] = layout.GetTypes()[column_idx];
    }
    chunk.Initialize(allocator->GetAllocator(), chunk_types);
}

void TupleDataCollection::InitializeScanChunk(TupleDataScanState &state, DataChunk &chunk) const {
    auto &column_ids = state.chunk_state.column_ids;
    D_ASSERT(!column_ids.empty());
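A minimal sketch of how the new overload can be used (collection here is a hypothetical TupleDataCollection whose layout has at least three columns):

    // Initialize a chunk that holds only layout columns 2 and 0, in that order.
    vector<column_t> column_ids {2, 0};
    DataChunk chunk;
    collection.InitializeChunk(chunk, column_ids);
    // chunk.data[0] now has the type of layout column 2, chunk.data[1] that of layout column 0.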