
Commit

Parallelization works correctly
Learus committed Jan 17, 2019
1 parent 1823b6a commit a492265
Showing 10 changed files with 267 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .config.json
@@ -1,6 +1,6 @@
{
"CC": "g++",
"CCFLAGS": ["-Wall", "-Wextra", "-std=c++17", "-g3"],
"CCFLAGS": ["-Wall", "-Wextra", "-std=c++17", "-O3"],
"LIBS": ["-lpthread"],
"PATH_INC": "./inc",
"PATH_SRC": "./src",
1 change: 1 addition & 0 deletions .gitignore
@@ -2,3 +2,4 @@
bin/
data/
/Makefile
workload/input.txt
2 changes: 2 additions & 0 deletions inc/list.hpp
@@ -81,6 +81,8 @@ namespace utility
void push_front(T&&);
void pop_front();

void merge(list&&);

template <typename ...Args>
void emplace_back(Args&&...);

22 changes: 22 additions & 0 deletions inc/list.ipp
@@ -188,6 +188,28 @@ inline void utility::list<T>::pop_front()
s--;
}

template <typename T>
inline void utility::list<T>::merge(list&& new_list)
{
if (!head)
{
if (new_list.head) {
head = new_list.head;
tail = new_list.tail;
}
}
else
{
if (new_list.head) {
tail->next = new_list.head;
new_list.head->prev = tail;
tail = new_list.tail;
}
}

s += new_list.size();

// Leave the moved-from list empty so its destructor cannot free the spliced nodes
new_list.head = new_list.tail = nullptr;
new_list.s = 0;
}
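
A minimal usage sketch of the splice-style merge above — hedged, assuming the headers are reachable as configured in .config.json (PATH_INC ./inc):

    #include <list.hpp>
    #include <utility>   // std::move
    #include <iostream>

    int main() {
        utility::list<int> a, b;
        a.emplace_back(1); a.emplace_back(2);
        b.emplace_back(3); b.emplace_back(4);

        a.merge(std::move(b));           // O(1): b's nodes are spliced onto a's tail

        std::cout << a.size() << '\n';   // 4; b is now empty and safe to destroy
    }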

template <typename T>
template <typename ...Args>
inline void utility::list<T>::emplace_back(Args&&... args)
3 changes: 2 additions & 1 deletion inc/result.hpp
@@ -5,7 +5,7 @@
#include <list.hpp>
#include <iosfwd> // std::ostream

#define CAPACITY (1024UL * 1024UL / sizeof(RHJ::Result))
#define CAPACITY (128UL * 1024UL / sizeof(RHJ::Result))

namespace RHJ
{
@@ -55,6 +55,7 @@ namespace RHJ
friend std::ostream& operator<<(std::ostream&, const Results&);

void push_back(tuple_key_t, tuple_key_t);
// void merge(Results&&);
};

std::ostream& operator<<(std::ostream&, const Results&);
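
With every partition now producing its own Results list (see src/relation.cpp below), shrinking each Buffer from 1 MiB to 128 KiB keeps the many short per-partition lists cheap. Assuming sizeof(RHJ::Result) is 16 bytes (two 8-byte tuple keys), CAPACITY works out to 128 * 1024 / 16 = 8192 results per buffer.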
173 changes: 163 additions & 10 deletions src/histhash.cpp
@@ -1,7 +1,10 @@

#include <histhash.hpp>
#include <thread_pool.hpp>
#include <cmath> // std::pow
#include <utility> // std::move
#include <vector>

#if defined (__PSUM_DEBUG__)
#include <fstream>
Expand Down Expand Up @@ -46,6 +49,60 @@ RHJ::PsumTable::Bucket& RHJ::PsumTable::Bucket::operator=(Bucket&& other) noexce
return *this;
}

// Per-thread slice of the relation plus private outputs for the histogram phase
struct HistogramJobContainer {
RHJ::Relation::Tuple * tuples; // first tuple of this thread's chunk
std::size_t size; // number of tuples in the chunk

radix_t radix;
std::size_t * histogram; // private histogram, merged after block()
std::size_t * hashes; // cached hash of every tuple in the chunk
};

void HistogramJob(void * data) {

HistogramJobContainer *container = (HistogramJobContainer *)data;

// Hash each tuple once, cache the hash for the scatter phase,
// and count it in this thread's private histogram
for (std::size_t i = 0; i < container->size; i++) {
container->hashes[i] = HASH(container->tuples[i].payload, container->radix);
container->histogram[container->hashes[i]]++;
}
}
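
HASH comes from histhash.hpp; the sketch below is a hypothetical stand-in just to make the radix idea concrete, not the repo's actual macro:

    // Keep the low `radix` bits of the payload, giving 2^radix partitions
    #define HASH(value, radix) ((value) & ((1UL << (radix)) - 1UL))

    // e.g. radix = 2 -> 4 partitions; payload 13 (0b1101) lands in partition 0b01 = 1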

// Per-thread inputs for the scatter phase: the same chunk plus a private,
// pre-shifted copy of the prefix sums
struct PartitionJobContainer {
RHJ::Relation::Tuple * tuples;
std::size_t relation_size;

RHJ::Relation::Tuple * reordered_tuples; // shared output table
std::size_t size;

std::size_t psum_size;
std::size_t * histogram;
std::size_t * psum; // this thread's shifted prefix sums
std::size_t * hashes;
};

void PartitionJob(void * data) {
PartitionJobContainer *container = (PartitionJobContainer *)data;

// Scatter each tuple to its partition's next free slot; psum is private
// to this thread, so no synchronization is needed
for (std::size_t i = 0UL; i < container->size; i++) {

std::size_t hash = container->hashes[i];

std::size_t index = container->psum[hash];
container->psum[hash]++;

container->reordered_tuples[index] = container->tuples[i];

container->histogram[hash]--;
}
}
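
To make the scatter concrete, a worked example with invented numbers:

    // Chunk hashes: [1, 0, 1, 2]; this thread's psum: {p0: 0, p1: 3, p2: 6}
    //   tuple 0 (hash 1) -> reordered_tuples[3], psum[1] becomes 4
    //   tuple 1 (hash 0) -> reordered_tuples[0], psum[0] becomes 1
    //   tuple 2 (hash 1) -> reordered_tuples[4], psum[1] becomes 5
    //   tuple 3 (hash 2) -> reordered_tuples[6], psum[2] becomes 7
    // Every partition ends up contiguous in the shared output table.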

RHJ::PsumTable::PsumTable(const Relation& rel, radix_t _radix, std::size_t _psum_size)
:
#if defined (__VERBOSE__)
@@ -56,37 +56,113 @@ RHJ::PsumTable::PsumTable(const Relation& rel, radix_t _radix, std::size_t _psum_size)
{
std::size_t *histogram = new std::size_t[psum_size]{0UL};

// <SINGLE THREAD IMPLEMENTATION> //
// Creating a table which contains hashes of each tuple
// std::size_t *hashes = new std::size_t[rel.size];
// </SINGLE THREAD IMPLEMENTATION> //

// Fixed worker count; each worker histograms one contiguous chunk of the relation
std::size_t num_threads = 16;
thread_pool::create(num_threads);

std::size_t curOffset = 0;
std::size_t offSet = rel.size / num_threads;

std::vector<HistogramJobContainer *> histogram_containers;

std::size_t actual_threads = 0;

for (std::size_t i = 0; i < num_threads; i++) {

HistogramJobContainer *data = new HistogramJobContainer;

// The last chunk (or a relation smaller than the thread count) absorbs the remainder
if (offSet == 0)
data->size = rel.size;
else if (i == num_threads - 1)
data->size = rel.size - curOffset;
else
data->size = offSet;

data->radix = this->radix;
data->histogram = new std::size_t[this->psum_size]{0UL};
data->tuples = &(rel.tuples[curOffset]);
data->hashes = new std::size_t[data->size];

histogram_containers.push_back(data);

curOffset += (offSet == 0 ? rel.size : offSet);

thread_pool::schedule(HistogramJob, (void *)(data) );

actual_threads++;

if (curOffset >= rel.size) break;
}

thread_pool::block();

this->psum = new std::size_t[psum_size];

std::size_t sum = 0UL;

// Merge the per-thread histograms, then build the global prefix sums
for (std::size_t i = 0UL; i < this->psum_size; i++) {
for (std::size_t j = 0UL; j < histogram_containers.size(); j++) {
histogram[i] += histogram_containers[j]->histogram[i];
}
this->psum[i] = sum;
sum += histogram[i];
}


// <SINGLE THREAD IMPLEMENTATION> //
// for (std::size_t i = 0UL; i < rel.size; i++) {
// histogram[ hashes[i] = HASH(rel.tuples[i].payload, radix) ]++;
// }
// </SINGLE THREAD IMPLEMENTATION> //

std::vector<PartitionJobContainer *> partition_containers;

// Running copy of the prefix sums; each thread receives its own snapshot,
// shifted past the slots claimed by the threads scheduled before it
std::size_t * next_sum = new std::size_t[this->psum_size];
for (std::size_t j = 0; j < psum_size; j++) {
next_sum[j] = psum[j];
}

for (std::size_t i = 0; i < actual_threads; i++) {
PartitionJobContainer *data = new PartitionJobContainer;

data->hashes = histogram_containers[i]->hashes;
data->histogram = histogram_containers[i]->histogram;
data->psum = next_sum;

std::size_t *temp_sum = new std::size_t[this->psum_size];
for (std::size_t j = 0; j < psum_size; j++) {
temp_sum[j] = next_sum[j] + histogram_containers[i]->histogram[j];
}

next_sum = temp_sum;

data->psum_size = psum_size;
data->tuples = histogram_containers[i]->tuples;
data->reordered_tuples = this->table.tuples;
data->size = histogram_containers[i]->size;
data->relation_size = rel.size;

thread_pool::schedule(PartitionJob, (void *)(data) );
partition_containers.push_back(data);
}

thread_pool::block();

// <SINGLE THREAD IMPLEMENTATION> //
// for (std::size_t i = 0UL; i < rel.size; i++) {

// std::size_t hash = HASH(rel.tuples[i].payload, this->radix);

// std::size_t index = (hash < this->psum_size - 1UL ? this->psum[hash + 1UL] : rel.size) - histogram[hash];

// this->table.tuples[index] = rel.tuples[i];

// histogram[hash]--;

// }

// delete[] hashes
// </SINGLE THREAD IMPLEMENTATION> //


delete[] histogram;
delete[] next_sum;

for (std::size_t i = 0; i < actual_threads; i++) {
delete[] partition_containers[i]->hashes;
delete[] partition_containers[i]->histogram;
delete[] partition_containers[i]->psum;
delete partition_containers[i];
delete histogram_containers[i];
}

thread_pool::destroy();
}
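
The next_sum/temp_sum handoff above is what keeps the threads from colliding: each thread's private psum starts where the previous threads' counts end. With invented numbers:

    // Global histogram {p0: 4, p1: 2}  ->  global psum {p0: 0, p1: 4}
    // Thread 0 counted {p0: 3, p1: 1}; thread 1 counted {p0: 1, p1: 1}
    //
    //   thread 0 psum = {0, 4}              (the global psum itself)
    //   thread 1 psum = {0 + 3, 4 + 1} = {3, 5}
    //
    // Thread 0 fills p0 slots 0..2 and p1 slot 4; thread 1 fills p0 slot 3
    // and p1 slot 5 -- no two threads ever write the same index.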

RHJ::PsumTable::~PsumTable() {
Expand Down
1 change: 1 addition & 0 deletions src/index.cpp
@@ -89,6 +89,7 @@ bool isPrime(std::size_t candidate)
return true;
}


void RHJ::Index::join(const RHJ::PsumTable::Bucket& bucket, RHJ::Results& results, const RHJ::PsumTable& lpsum) const
{
std::uintptr_t min = reinterpret_cast<std::uintptr_t>(lpsum.table.tuples);
83 changes: 69 additions & 14 deletions src/relation.cpp
@@ -4,6 +4,9 @@
#include <cmath> // std::ceil, std::log
#include <fstream> // std::ostream
#include <iostream>
#include <vector>
#include <thread_pool.hpp>
#include <list.hpp>

#if defined (__VERBOSE__)
#include <iomanip> // std::setw, std::setfill, std::left
@@ -57,6 +60,32 @@ std::ostream& RHJ::operator<<(std::ostream& os, const RHJ::Relation& relation)
return os;
}

// One join task per radix partition: the r- and s-buckets that share `hash`
struct JoinJobContainer {
RHJ::PsumTable * hashTableR;
RHJ::PsumTable * hashTableS;
std::size_t hash;
RHJ::Results * results; // private result list, merged after block()
};

void JoinJob(void * data) {
JoinJobContainer *container = (JoinJobContainer *)data;

RHJ::PsumTable::Bucket r( (*(container->hashTableR))[container->hash]);
RHJ::PsumTable::Bucket s( (*(container->hashTableS))[container->hash]);

if (!r.size || !s.size)
return;

// Index the smaller bucket, probe with the larger one
if (r.size < s.size)
{
RHJ::Index index(r); index.join(s, *(container->results), *(container->hashTableR));
}
else
{
RHJ::Index index(s); index.join(r, *(container->results), *(container->hashTableR));
}
}
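
Building the Index over the smaller of the two buckets keeps the hash index compact while the larger bucket streams through it; and since every JoinJob owns one partition and one private Results list, the jobs share no mutable state and need no locking.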

RHJ::Results RHJ::Relation::RadixHashJoin(const RHJ::Relation& relR, const RHJ::Relation& relS) {

radix_t radix; std::size_t range;
@@ -73,23 +102,49 @@ RHJ::Results RHJ::Relation::RadixHashJoin(const RHJ::Relation& relR, const RHJ::Relation& relS) {
Results results;
#endif

// One worker per radix partition; partitions are disjoint, so the joins need no locking
std::size_t num_threads = range;
thread_pool::create(num_threads);

std::vector<JoinJobContainer *> join_containers;


for (std::size_t hash = 0UL; hash < range; hash++)
{
// Each partition gets a private Results list, so workers never contend
// on the shared output
Results *partial = new Results();
JoinJobContainer *data = new JoinJobContainer;
data->hash = hash;
data->hashTableR = &hashTableR;
data->hashTableS = &hashTableS;
data->results = partial;

thread_pool::schedule(JoinJob, (void *)data);
join_containers.push_back(data);

// <SINGLE_THREAD_IMPLEMENTATION> //
// PsumTable::Bucket r(hashTableR[hash]);
// PsumTable::Bucket s(hashTableS[hash]);

// if (!r.size || !s.size)
// continue;

// if (r.size < s.size)
// {
// Index index(r); index.join(s, results, hashTableR);
// }
// else
// {
// Index index(s); index.join(r, results, hashTableR);
// }
// </SINGLE_THREAD_IMPLEMENTATION> //
}

thread_pool::block();

for (std::size_t i = 0; i < join_containers.size(); i++) {
results.merge(static_cast<utility::list<Buffer>&&>(*(join_containers[i]->results) ));
delete join_containers[i]->results; // safe: merge leaves the source list empty
delete join_containers[i];
}

thread_pool::destroy();

return results;
}
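
For context, a hedged end-to-end sketch of driving this entry point; the header name, load_relation, and its filename are illustrative stand-ins, not part of this commit:

    #include <relation.hpp>  // assumed header for RHJ::Relation
    #include <iostream>

    int main() {
        // Hypothetical loader returning an RHJ::Relation (a tuples array + size, as used above)
        RHJ::Relation R = load_relation("workload/input.txt");
        RHJ::Relation S = load_relation("workload/input.txt");

        RHJ::Results joined = RHJ::Relation::RadixHashJoin(R, S);
        std::cout << joined;  // Results provides operator<< (see inc/result.hpp)
    }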