support lock-free hashmap backend #436

Open
wants to merge 1 commit into base: main
66 changes: 64 additions & 2 deletions HugeCTR/include/hps/hash_map_backend.hpp
@@ -27,6 +27,7 @@
#include <thread_pool.hpp>
#include <unordered_map>
#include <vector>
#include <atomic>

namespace HugeCTR {

@@ -115,6 +116,35 @@ class HashMapBackend final : public VolatileBackend<Key, HashMapBackendParams> {
uint64_t access_count;
};
ValuePtr value;
std::atomic<int8_t> lck;
Collaborator
Technically speaking, I have nothing against this change in general. But we need to think carefully about whether this will break the MultiProcessHashmap. Maybe it is better to split the code.

Collaborator
What is more relevant is that, despite you using int8_t here, the memory usage should still increase by 8 bytes due to alignment. Can we un-align the data structure without loss of performance?
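
For illustration, here is a minimal, hypothetical sketch (not part of this PR) of the padding effect being described: a payload that holds only an 8-byte pointer grows to 16 bytes once a 1-byte atomic flag is added, because the struct's alignment stays at alignof(void*) on a typical LP64 target. The real Payload also carries an access-time/count union, but the padding mechanism is the same.

#include <atomic>
#include <cstdint>
#include <iostream>

// Simplified stand-ins for the real Payload.
struct PayloadWithoutLock {
  float* value;
};

struct PayloadWithLock {
  float* value;
  std::atomic<int8_t> lck;  // 1 byte of data, but typically 7 bytes of padding follow
};

int main() {
  std::cout << "without lck: " << sizeof(PayloadWithoutLock) << " bytes\n";  // typically 8
  std::cout << "with lck:    " << sizeof(PayloadWithLock) << " bytes\n";     // typically 16
}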


Payload() { lck = -1; }

explicit Payload(const Payload& p) {
value = p.value;
lck.store(p.lck.load());
}

Payload(Payload&& p) {
value = std::exchange(p.value, nullptr);
lck.store(p.lck.load());
}

Payload& operator=(const Payload& p) {
if (this != &p) {
value = p.value;
lck.store(p.lck.load());
}
return *this;
}

Payload& operator=(Payload&& p) {
if (this != &p) {
value = std::exchange(p.value, nullptr);
lck.store(p.lck.load());
}
return *this;
}
};
using Entry = std::pair<const Key, Payload>;

@@ -126,21 +156,53 @@ class HashMapBackend final : public VolatileBackend<Key, HashMapBackendParams> {
std::vector<ValuePage> value_pages;
std::vector<ValuePtr> value_slots;

mutable std::shared_mutex read_write_lck;

// Key -> Payload map.
phmap::flat_hash_map<Key, Payload> entries;
phmap::parallel_flat_hash_map<Key, Payload, phmap::priv::hash_default_hash<Key>,
phmap::priv::hash_default_eq<Key>,
phmap::priv::Allocator<phmap::priv::Pair<const Key, Payload>>,
4, std::shared_mutex> entries;
Collaborator
Yes, it will avoid breaking the updates, but partly for reasons that I wanted to avoid.

What parallel_flat_hash_map does with this configuration is split the hash table into 4 independent pieces.

The submap is this thing in their code:

struct Inner : public Lockable

Upon query, each call ends up with a structure like this,

typename Lockable::UniqueLock m;
auto res = this->find_or_prepare_insert(k, m);

which in turn executes:

Inner& inner = sets_[subidx(hashval)];
auto&  set   = inner.set_;
mutexlock    = std::move(typename Lockable::UniqueLock(inner));

So, in other words, it will select one of the 4 independent submaps and FULLY lock it. So there is no parallelism possible on that submap. In the very BEST case, when you have 4 keys that subidx hashes to 4 different sub-hashmaps, you can accommodate up to 4 threads. Arguably, that is better than now. But you need to be aware that the worst case still exists. That is the reason why we have not used their parallelized hashmap implementation yet.
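
For reference, a self-contained sketch (my own illustration, not code from this PR or from phmap internals) of the map type the diff declares, assuming the usual <parallel_hashmap/phmap.h> include. It shows that concurrent try_emplace calls are safe because each submap takes its own std::shared_mutex internally, while threads whose keys land in the same submap still serialize on that lock.

#include <cstdint>
#include <shared_mutex>
#include <thread>
#include <vector>

#include <parallel_hashmap/phmap.h>

struct Slot {
  float* value = nullptr;  // dummy payload for illustration
};

using Key = int64_t;
using ParallelMap = phmap::parallel_flat_hash_map<
    Key, Slot, phmap::priv::hash_default_hash<Key>, phmap::priv::hash_default_eq<Key>,
    phmap::priv::Allocator<phmap::priv::Pair<const Key, Slot>>, 4, std::shared_mutex>;

int main() {
  ParallelMap entries;
  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&entries, t] {
      for (Key k = t; k < 4000; k += 4) {
        entries.try_emplace(k);  // locks only the submap selected from the key's hash
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }
  return entries.size() == 4000 ? 0 : 1;
}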

Collaborator
@yingcanw We can take over this part of the change without much issue. In my new streamlined hashmap, we will use a similar method to speed things up.


Partition() = delete;

Partition(const uint32_t value_size, const HashMapBackendParams& params)
: value_size{value_size}, allocation_rate{params.allocation_rate} {}

explicit Partition(const Partition& p) {
value_size = p.value_size;
allocation_rate = p.allocation_rate;

value_pages = p.value_pages;
value_slots = p.value_slots;

entries = p.entries;
}

Partition& operator=(Partition&& p) {
if (this != &p) {
// TODO(robertzhu)
// std::scoped_lock lock(read_write_lck, p.read_write_lck);

value_size = p.value_size;
allocation_rate = p.allocation_rate;

value_pages = std::move(p.value_pages);
value_slots = std::move(p.value_slots);

entries = std::move(p.entries);
}

return *this;
}
};

// Actual data.
CharAllocator char_allocator_;
std::unordered_map<std::string, std::vector<Partition>> tables_;

// Access control.
mutable std::shared_mutex read_write_guard_;
// mutable std::shared_mutex read_write_guard_;

// Overflow resolution.
size_t resolve_overflow_(const std::string& table_name, size_t part_index, Partition& part);
16 changes: 15 additions & 1 deletion HugeCTR/include/hps/hash_map_backend_detail.hpp
@@ -19,6 +19,7 @@
#include <hps/inference_utils.hpp>
#include <thread_pool.hpp>
#include <type_traits>
#include <atomic>

namespace HugeCTR {

@@ -55,7 +56,9 @@ namespace HugeCTR {
const Payload& payload{it->second}; \
\
/* Stash pointer and reference in map. */ \
std::unique_lock lock(part.read_write_lck); \
Collaborator
@yingcanw What worries me about this is mostly the strain under heavy load. We would need to measure whether it kills insert speed. The new hashmap will fix this one.

part.value_slots.emplace_back(payload.value); \
lock.unlock(); \
part.entries.erase(it); \
++num_deletions; \
} \
@@ -91,6 +94,7 @@ namespace HugeCTR {
\
/* Race-conditions here are deliberately ignored because insignificant in practice. */ \
__VA_ARGS__; \
while (payload.lck.load(std::memory_order_relaxed) != 0); \
std::copy_n(payload.value, part.value_size, &values[(k - keys) * value_stride]); \
Collaborator
I think there is a logical error here.

What if payload.value changes rapidly? The lck does not guarantee against that: during the copy_n, payload.value might change or become invalid.
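
To make the concern concrete: the reader only waits for lck to become 0 before copy_n, so a writer that re-acquires the payload right after that check can overwrite (or free and replace) the value mid-copy. One conventional way to detect such torn reads is a seqlock-style version counter. The sketch below is purely illustrative and is not what this PR implements; it also assumes writers are already serialized (e.g. by the submap lock) and that value points at valid storage.

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>

struct VersionedPayload {
  std::atomic<uint32_t> version{0};  // even: stable, odd: write in progress
  float* value = nullptr;
};

// Writer: assumed serialized against other writers.
void write_value(VersionedPayload& p, const float* src, std::size_t n) {
  p.version.fetch_add(1, std::memory_order_acquire);  // becomes odd
  std::copy_n(src, n, p.value);
  p.version.fetch_add(1, std::memory_order_release);  // even again
}

// Reader: retries if a writer was active during the copy.
bool read_value(const VersionedPayload& p, float* dst, std::size_t n) {
  for (int attempt = 0; attempt < 64; ++attempt) {
    const uint32_t v0 = p.version.load(std::memory_order_acquire);
    if (v0 & 1u) {
      continue;  // write in progress
    }
    std::copy_n(p.value, n, dst);
    std::atomic_thread_fence(std::memory_order_acquire);
    if (p.version.load(std::memory_order_relaxed) == v0) {
      return true;  // value did not change while we copied it
    }
  }
  return false;  // give up; caller can fall back to a locked path
}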

} else { \
on_miss(k - keys); \
@@ -135,13 +139,15 @@ namespace HugeCTR {
std::is_same_v<decltype(k), const Key* const>); \
static_assert(std::is_same_v<decltype(values), const char* const>); \
\
/* TODO(robertzhu): use thread safe api */ \
const auto& res{part.entries.try_emplace(*k)}; \
Payload& payload{res.first->second}; \
\
__VA_ARGS__; \
\
/* If new insertion. */ \
if (res.second) { \
std::unique_lock<std::shared_mutex> lock(part.read_write_lck); \
/* If no free space, allocate another buffer, and fill pointer queue. */ \
if (part.value_slots.empty()) { \
const size_t stride{(value_size + value_page_alignment - 1) / value_page_alignment * \
@@ -152,6 +158,7 @@
/* Get more memory. */ \
part.value_pages.emplace_back(num_values* stride, char_allocator_); \
ValuePage& value_page{part.value_pages.back()}; \
/*HCTR_LOG_C(DEBUG, WORLD, "insert value_page: num_values ", num_values, "; stride ", stride, "; value_page ", value_page.capacity(), ".\n"); \*/
\
/* Stock up slot references. */ \
part.value_slots.reserve(part.value_slots.size() + num_values); \
@@ -165,9 +172,16 @@
payload.value = part.value_slots.back(); \
part.value_slots.pop_back(); \
++num_inserts; \
lock.unlock(); \
} \
\
if (payload.lck.load(std::memory_order_relaxed) != -1) { \
Collaborator
This looks like it could work, but it is a little bit fuzzy. Can you clarify the intended lifecycle of lck? Which value should follow which?

int8_t expected = 0; \
while (!payload.lck.compare_exchange_weak(expected, 1, std::memory_order_release, \
std::memory_order_relaxed)); \
} \
std::copy_n(&values[(k - keys) * value_stride], value_size, payload.value); \
payload.lck.store(0, std::memory_order_relaxed); \
} while (0)
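
As far as I can reconstruct it from the diff, the intended lifecycle of lck is: -1 for a freshly constructed payload whose value slot has not been assigned yet, 1 while a writer is copying bytes into the value, and 0 once the value is published and readable (fetch spins until lck is 0 before its copy_n). The following is a hedged, standalone restatement of that protocol, not the PR code itself.

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>

// lck states (my reading of the diff, not an authoritative spec):
//   -1  just constructed; value slot not assigned/filled yet
//    1  a writer is currently copying into value
//    0  value fully written; readers may copy it out
struct LockedPayload {
  float* value = nullptr;  // assumed to point at valid storage before writing
  std::atomic<int8_t> lck{-1};
};

void insert_or_update(LockedPayload& p, const float* src, std::size_t n) {
  if (p.lck.load(std::memory_order_relaxed) != -1) {
    // Existing entry: claim it for writing (0 -> 1). Note that `expected`
    // must be reset after a failed CAS, otherwise a failure could leave it at 1.
    int8_t expected = 0;
    while (!p.lck.compare_exchange_weak(expected, 1, std::memory_order_acquire,
                                        std::memory_order_relaxed)) {
      expected = 0;
    }
  }
  std::copy_n(src, n, p.value);
  p.lck.store(0, std::memory_order_release);  // publish: readable from now on
}

void fetch(const LockedPayload& p, float* dst, std::size_t n) {
  while (p.lck.load(std::memory_order_acquire) != 0) {
    // spin until the writer publishes the value
  }
  std::copy_n(p.value, n, dst);  // see the review note above: the value may still change mid-copy
}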

/**
@@ -198,4 +212,4 @@ namespace HugeCTR {
// TODO: Remove me!
#pragma GCC diagnostic pop

} // namespace HugeCTR
} // namespace HugeCTR
20 changes: 10 additions & 10 deletions HugeCTR/src/hps/hash_map_backend.cpp
@@ -36,7 +36,7 @@ HashMapBackend<Key>::HashMapBackend(const HashMapBackendParams& params) : Base(p

template <typename Key>
size_t HashMapBackend<Key>::size(const std::string& table_name) const {
const std::shared_lock lock(read_write_guard_);
Collaborator
The changes below are fine under the assumption that we take over the above changes.

// const std::shared_lock lock(read_write_guard_);

// Locate the partitions.
const auto& tables_it{tables_.find(table_name)};
@@ -54,7 +54,7 @@ size_t HashMapBackend<Key>::contains(const std::string& table_name, const size_t
const Key* const keys,
const std::chrono::nanoseconds& time_budget) const {
const auto begin{std::chrono::high_resolution_clock::now()};
const std::shared_lock lock(read_write_guard_);
// const std::shared_lock lock(read_write_guard_);

// Locate partitions.
const auto& tables_it{tables_.find(table_name)};
@@ -132,7 +132,7 @@ size_t HashMapBackend<Key>::insert(const std::string& table_name, const size_t n
const uint32_t value_size, const size_t value_stride) {
HCTR_CHECK(value_size <= value_stride);

const std::unique_lock lock(read_write_guard_);
// const std::unique_lock lock(read_write_guard_);

// Locate the partitions, or create them, if they do not exist yet.
const auto& tables_it{tables_.try_emplace(table_name).first};
@@ -222,7 +222,7 @@ size_t HashMapBackend<Key>::fetch(const std::string& table_name, const size_t nu
const size_t value_stride, const DatabaseMissCallback& on_miss,
const std::chrono::nanoseconds& time_budget) {
const auto begin{std::chrono::high_resolution_clock::now()};
const std::shared_lock lock(read_write_guard_);
// const std::shared_lock lock(read_write_guard_);

// Locate the partitions.
const auto& tables_it{tables_.find(table_name)};
@@ -306,7 +306,7 @@ size_t HashMapBackend<Key>::fetch(const std::string& table_name, const size_t nu
const DatabaseMissCallback& on_miss,
const std::chrono::nanoseconds& time_budget) {
const auto begin{std::chrono::high_resolution_clock::now()};
const std::shared_lock lock(read_write_guard_);
// const std::shared_lock lock(read_write_guard_);

// Locate the partitions.
const auto& tables_it{tables_.find(table_name)};
@@ -386,7 +386,7 @@ size_t HashMapBackend<Key>::fetch(const std::string& table_name, const size_t nu

template <typename Key>
size_t HashMapBackend<Key>::evict(const std::string& table_name) {
const std::unique_lock lock(read_write_guard_);
// const std::unique_lock lock(read_write_guard_);

// Locate the partitions.
const auto& tables_it{tables_.find(table_name)};
@@ -410,7 +410,7 @@ size_t HashMapBackend<Key>::evict(const std::string& table_name) {
template <typename Key>
size_t HashMapBackend<Key>::evict(const std::string& table_name, const size_t num_keys,
const Key* const keys) {
const std::unique_lock lock(read_write_guard_);
// const std::unique_lock lock(read_write_guard_);

// Locate the partitions.
const auto& tables_it{tables_.find(table_name)};
@@ -476,7 +476,7 @@ template <typename Key>
std::vector<std::string> HashMapBackend<Key>::find_tables(const std::string& model_name) {
const std::string& tag_prefix{HierParameterServerBase::make_tag_name(model_name, "", false)};

const std::shared_lock lock(read_write_guard_);
// const std::shared_lock lock(read_write_guard_);

std::vector<std::string> matches;
for (const auto& pair : tables_) {
@@ -489,7 +489,7 @@ std::vector<std::string> HashMapBackend<Key>::find_tables(const std::string& mod

template <typename Key>
size_t HashMapBackend<Key>::dump_bin(const std::string& table_name, std::ofstream& file) {
const std::shared_lock lock(read_write_guard_);
// const std::shared_lock lock(read_write_guard_);

// Locate the partitions.
const auto& tables_it{tables_.find(table_name)};
@@ -519,7 +519,7 @@ size_t HashMapBackend<Key>::dump_bin(const std::string& table_name, std::ofstrea
#ifdef HCTR_USE_ROCKS_DB
template <typename Key>
size_t HashMapBackend<Key>::dump_sst(const std::string& table_name, rocksdb::SstFileWriter& file) {
const std::shared_lock lock(read_write_guard_);
// const std::shared_lock lock(read_write_guard_);

// Locate the partitions.
const auto& tables_it{tables_.find(table_name)};
2 changes: 1 addition & 1 deletion HugeCTR/src/hps/hier_parameter_server.cpp
@@ -371,7 +371,7 @@ void HierParameterServer<TypeHashKey>::update_database_per_model(
};

char host_name[HOST_NAME_MAX + 1];
HCTR_CHECK_HINT(!gethostname(host_name, sizeof(host_name)), "Unable to determine hostname.\n");
HCTR_CHECK_HINT(!::gethostname(host_name, sizeof(host_name)), "Unable to determine hostname.\n");

switch (inference_params.update_source.type) {
case UpdateSourceType_t::Null: