support lock-free hashmap backend #436
base: main
@@ -27,6 +27,7 @@
#include <thread_pool.hpp>
#include <unordered_map>
#include <vector>
#include <atomic>

namespace HugeCTR {
@@ -115,6 +116,35 @@ class HashMapBackend final : public VolatileBackend<Key, HashMapBackendParams> {
    uint64_t access_count;
  };
  ValuePtr value;
  std::atomic<int8_t> lck;

  Payload() { lck = -1; }

  explicit Payload(const Payload& p) {
    value = p.value;
    lck.store(p.lck.load());
  }

  Payload(Payload&& p) {
    value = std::exchange(p.value, nullptr);
    lck.store(p.lck.load());
  }

  Payload& operator=(const Payload& p) {
    if (this != &p) {
      value = p.value;
      lck.store(p.lck.load());
    }
    return *this;
  }

  Payload& operator=(Payload&& p) {
    if (this != &p) {
      value = std::exchange(p.value, nullptr);
      lck.store(p.lck.load());
    }
    return *this;
  }
};
using Entry = std::pair<const Key, Payload>;
@@ -126,21 +156,53 @@ class HashMapBackend final : public VolatileBackend<Key, HashMapBackendParams> {
  std::vector<ValuePage> value_pages;
  std::vector<ValuePtr> value_slots;

  mutable std::shared_mutex read_write_lck;

  // Key -> Payload map.
  phmap::flat_hash_map<Key, Payload> entries;
  phmap::parallel_flat_hash_map<Key, Payload, phmap::priv::hash_default_hash<Key>,
                                phmap::priv::hash_default_eq<Key>,
                                phmap::priv::Allocator<phmap::priv::Pair<const Key, Payload>>,
                                4, std::shared_mutex> entries;
Review comment: Yes, it will avoid breaking the updates, but partly for reasons that I wanted to avoid. Upon each query, the call selects one of the 4 independent submaps based on the key's hash and then FULLY locks that submap before executing the operation. So there is no parallelism possible on that submap; in the very best case, when you have 4 keys that each hash to a different submap, they can proceed in parallel, while keys that land in the same submap are serialized.

Review comment: @yingcanw We can take over this part of the change without much issues. In my new streamlined hashmap we will use a similar method to speed things up.
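To make the serialization concern concrete, here is a minimal conceptual sketch of the locking scheme described above. It is a stand-in written against the standard library only, not phmap's actual internals; the names (ShardedMap, kNumShards) and the shard count of 4 are illustrative assumptions. Each operation hashes the key to a shard and holds that shard's lock for the whole operation, so two operations on the same shard always serialize.

```cpp
// Conceptual sketch only (not phmap's real implementation): a sharded map where
// every operation takes the lock of the whole submap that the key hashes to.
#include <array>
#include <cstddef>
#include <mutex>
#include <optional>
#include <unordered_map>

template <typename K, typename V, std::size_t kNumShards = 4>
class ShardedMap {
  struct Shard {
    std::mutex mtx;                  // one lock guards the entire submap
    std::unordered_map<K, V> map;
  };
  std::array<Shard, kNumShards> shards_;

  Shard& shard_for(const K& key) {
    return shards_[std::hash<K>{}(key) % kNumShards];
  }

 public:
  void insert_or_assign(const K& key, V value) {
    Shard& s = shard_for(key);
    std::lock_guard<std::mutex> lock(s.mtx);  // blocks every other op on this shard
    s.map.insert_or_assign(key, std::move(value));
  }

  std::optional<V> find(const K& key) {
    Shard& s = shard_for(key);
    std::lock_guard<std::mutex> lock(s.mtx);  // even lookups exclude each other here
    auto it = s.map.find(key);
    if (it != s.map.end()) return it->second;
    return std::nullopt;
  }
};
```

Parallelism in this scheme is bounded by the shard count, which is the trade-off the comment above points at.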
  Partition() = delete;

  Partition(const uint32_t value_size, const HashMapBackendParams& params)
      : value_size{value_size}, allocation_rate{params.allocation_rate} {}

  explicit Partition(const Partition& p) {
    value_size = p.value_size;
    allocation_rate = p.allocation_rate;

    value_pages = p.value_pages;
    value_slots = p.value_slots;

    entries = p.entries;  // copy the source partition's entries
  }

  Partition& operator=(Partition&& p) {
    if (this != &p) {
      // TODO(robertzhu)
      // std::scoped_lock lock(read_write_lck, p.read_write_lck);

      value_size = p.value_size;
      allocation_rate = p.allocation_rate;

      value_pages = std::move(p.value_pages);
      value_slots = std::move(p.value_slots);

      entries = std::move(p.entries);  // take ownership of the source partition's entries
    }

    return *this;
  }
};

  // Actual data.
  CharAllocator char_allocator_;
  std::unordered_map<std::string, std::vector<Partition>> tables_;

  // Access control.
  mutable std::shared_mutex read_write_guard_;
  // mutable std::shared_mutex read_write_guard_;

  // Overflow resolution.
  size_t resolve_overflow_(const std::string& table_name, size_t part_index, Partition& part);
@@ -19,6 +19,7 @@
#include <hps/inference_utils.hpp>
#include <thread_pool.hpp>
#include <type_traits>
#include <atomic>

namespace HugeCTR {
@@ -55,7 +56,9 @@ namespace HugeCTR {
      const Payload& payload{it->second}; \
      \
      /* Stash pointer and reference in map. */ \
      std::unique_lock lock(part.read_write_lck); \
Review comment: @yingcanw What worries me about this is mostly the strain under heavy load. We would need to measure whether it kills insert speed. The new hashmap will fix this one.
      part.value_slots.emplace_back(payload.value); \
      lock.unlock(); \
      part.entries.erase(it); \
      ++num_deletions; \
    } \
@@ -91,6 +94,7 @@ namespace HugeCTR {
      \
      /* Race-conditions here are deliberately ignored because insignificant in practice. */ \
      __VA_ARGS__; \
      while (payload.lck.load(std::memory_order_relaxed) != 0); \
      std::copy_n(payload.value, part.value_size, &values[(k - keys) * value_stride]); \
Review comment: I think there is a logical error here. What if …
    } else { \
      on_miss(k - keys); \
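As an aside on the spin-wait above: below is a minimal, self-contained sketch of the reader/writer handoff I believe is intended, not code from the PR. Unlike the hunk, it uses release on the writer's final store and acquire on the reader's spin load, which is what guarantees the copied bytes are visible once lck reads 0; the hunk uses relaxed on both sides.

```cpp
// Standalone illustration of the intended handoff (an assumption, not PR code):
// the writer fills the slot, then publishes it by storing 0 to lck; the reader
// spins until lck == 0, then copies the bytes.
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

int main() {
  char value[16] = {};
  std::atomic<int8_t> lck{1};  // 1: write in progress, 0: value readable

  std::thread writer([&] {
    std::fill_n(value, sizeof(value), 'x');   // fill the slot
    lck.store(0, std::memory_order_release);  // publish the finished value
  });

  char out[16];
  while (lck.load(std::memory_order_acquire) != 0) {  // wait until published
  }
  std::copy_n(value, sizeof(value), out);  // now guaranteed to see the writer's bytes
  writer.join();
  assert(out[0] == 'x');
}
```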
@@ -135,13 +139,15 @@ namespace HugeCTR {
                  std::is_same_v<decltype(k), const Key* const>); \
    static_assert(std::is_same_v<decltype(values), const char* const>); \
    \
    /* TODO(robertzhu): use thread safe api */ \
    const auto& res{part.entries.try_emplace(*k)}; \
    Payload& payload{res.first->second}; \
    \
    __VA_ARGS__; \
    \
    /* If new insertion. */ \
    if (res.second) { \
      std::unique_lock<std::shared_mutex> lock(part.read_write_lck); \
      /* If no free space, allocate another buffer, and fill pointer queue. */ \
      if (part.value_slots.empty()) { \
        const size_t stride{(value_size + value_page_alignment - 1) / value_page_alignment * \
@@ -152,6 +158,7 @@ namespace HugeCTR {
        /* Get more memory. */ \
        part.value_pages.emplace_back(num_values * stride, char_allocator_); \
        ValuePage& value_page{part.value_pages.back()}; \
        /* HCTR_LOG_C(DEBUG, WORLD, "insert value_page: num_values ", num_values, "; stride ", stride, "; value_page ", value_page.capacity(), ".\n"); */ \
        \
        /* Stock up slot references. */ \
        part.value_slots.reserve(part.value_slots.size() + num_values); \
@@ -165,9 +172,16 @@ namespace HugeCTR {
      payload.value = part.value_slots.back(); \
      part.value_slots.pop_back(); \
      ++num_inserts; \
      lock.unlock(); \
    } \
    \
    if (payload.lck.load(std::memory_order_relaxed) != -1) { \
Review comment: This looks like it could work, but it is a little bit fuzzy. Can you clarify the intended lifecycle of payload.lck?
      int8_t expected = 0; \
      while (!payload.lck.compare_exchange_weak(expected, 1, std::memory_order_release, \
                                                std::memory_order_relaxed)); \
    } \
    std::copy_n(&values[(k - keys) * value_stride], value_size, payload.value); \
    payload.lck.store(0, std::memory_order_relaxed); \
  } while (0)
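For discussion's sake, this is how I read the intended lifecycle of lck from the two hunks above; it is an interpretation, not something confirmed in the PR: -1 marks a freshly constructed entry whose value has never been written, 1 marks a writer currently copying bytes into value, and 0 marks a fully written, readable value. The sketch below spells that out; note that it deliberately resets expected on every CAS retry and uses acquire/release ordering, both of which differ from the hunk as posted.

```cpp
// Speculative reconstruction of the per-entry lck protocol; state meanings and
// names are assumptions drawn from the diff, not confirmed by the author.
#include <atomic>
#include <cstdint>

enum : int8_t {
  kFresh = -1,    // entry just constructed, value never written
  kReadable = 0,  // value fully written; readers may copy it
  kWriting = 1    // a writer currently owns the slot
};

struct PayloadSketch {
  char* value{nullptr};
  std::atomic<int8_t> lck{kFresh};
};

// Writer side: the first writer skips the CAS (lck is still kFresh); later
// writers spin until they can flip kReadable -> kWriting.
inline void begin_update(PayloadSketch& p) {
  if (p.lck.load(std::memory_order_relaxed) != kFresh) {
    int8_t expected = kReadable;
    while (!p.lck.compare_exchange_weak(expected, kWriting,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed)) {
      expected = kReadable;  // CAS may fail spuriously or observe kWriting
    }
  }
}

// Writer side: publish the finished value.
inline void end_update(PayloadSketch& p) {
  p.lck.store(kReadable, std::memory_order_release);
}

// Reader side: wait until the slot is readable before copying.
inline void wait_readable(const PayloadSketch& p) {
  while (p.lck.load(std::memory_order_acquire) != kReadable) {
  }
}

int main() {
  PayloadSketch p;
  begin_update(p);        // first write: lck is kFresh, so the CAS is skipped
  p.value = new char[8];
  end_update(p);

  begin_update(p);        // any later update must win the CAS first
  p.value[0] = 'x';
  end_update(p);

  wait_readable(p);       // readers block until the value is published
  delete[] p.value;
}
```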

/**
@@ -198,4 +212,4 @@ namespace HugeCTR {
// TODO: Remove me!
#pragma GCC diagnostic pop

} // namespace HugeCTR
} // namespace HugeCTR
@@ -36,7 +36,7 @@ HashMapBackend<Key>::HashMapBackend(const HashMapBackendParams& params) : Base(p

template <typename Key>
size_t HashMapBackend<Key>::size(const std::string& table_name) const {
  const std::shared_lock lock(read_write_guard_);
Review comment: The changes below are fine under the assumption that we take over the above changes.
  // const std::shared_lock lock(read_write_guard_);

  // Locate the partitions.
  const auto& tables_it{tables_.find(table_name)};
@@ -54,7 +54,7 @@ size_t HashMapBackend<Key>::contains(const std::string& table_name, const size_t
                                     const Key* const keys,
                                     const std::chrono::nanoseconds& time_budget) const {
  const auto begin{std::chrono::high_resolution_clock::now()};
  const std::shared_lock lock(read_write_guard_);
  // const std::shared_lock lock(read_write_guard_);

  // Locate partitions.
  const auto& tables_it{tables_.find(table_name)};
@@ -132,7 +132,7 @@ size_t HashMapBackend<Key>::insert(const std::string& table_name, const size_t n
                                   const uint32_t value_size, const size_t value_stride) {
  HCTR_CHECK(value_size <= value_stride);

  const std::unique_lock lock(read_write_guard_);
  // const std::unique_lock lock(read_write_guard_);

  // Locate the partitions, or create them, if they do not exist yet.
  const auto& tables_it{tables_.try_emplace(table_name).first};
@@ -222,7 +222,7 @@ size_t HashMapBackend<Key>::fetch(const std::string& table_name, const size_t nu
                                  const size_t value_stride, const DatabaseMissCallback& on_miss,
                                  const std::chrono::nanoseconds& time_budget) {
  const auto begin{std::chrono::high_resolution_clock::now()};
  const std::shared_lock lock(read_write_guard_);
  // const std::shared_lock lock(read_write_guard_);

  // Locate the partitions.
  const auto& tables_it{tables_.find(table_name)};
@@ -306,7 +306,7 @@ size_t HashMapBackend<Key>::fetch(const std::string& table_name, const size_t nu
                                  const DatabaseMissCallback& on_miss,
                                  const std::chrono::nanoseconds& time_budget) {
  const auto begin{std::chrono::high_resolution_clock::now()};
  const std::shared_lock lock(read_write_guard_);
  // const std::shared_lock lock(read_write_guard_);

  // Locate the partitions.
  const auto& tables_it{tables_.find(table_name)};
@@ -386,7 +386,7 @@ size_t HashMapBackend<Key>::fetch(const std::string& table_name, const size_t nu

template <typename Key>
size_t HashMapBackend<Key>::evict(const std::string& table_name) {
  const std::unique_lock lock(read_write_guard_);
  // const std::unique_lock lock(read_write_guard_);

  // Locate the partitions.
  const auto& tables_it{tables_.find(table_name)};
@@ -410,7 +410,7 @@ size_t HashMapBackend<Key>::evict(const std::string& table_name) {
template <typename Key>
size_t HashMapBackend<Key>::evict(const std::string& table_name, const size_t num_keys,
                                  const Key* const keys) {
  const std::unique_lock lock(read_write_guard_);
  // const std::unique_lock lock(read_write_guard_);

  // Locate the partitions.
  const auto& tables_it{tables_.find(table_name)};
@@ -476,7 +476,7 @@ template <typename Key>
std::vector<std::string> HashMapBackend<Key>::find_tables(const std::string& model_name) {
  const std::string& tag_prefix{HierParameterServerBase::make_tag_name(model_name, "", false)};

  const std::shared_lock lock(read_write_guard_);
  // const std::shared_lock lock(read_write_guard_);

  std::vector<std::string> matches;
  for (const auto& pair : tables_) {
@@ -489,7 +489,7 @@ std::vector<std::string> HashMapBackend<Key>::find_tables(const std::string& mod

template <typename Key>
size_t HashMapBackend<Key>::dump_bin(const std::string& table_name, std::ofstream& file) {
  const std::shared_lock lock(read_write_guard_);
  // const std::shared_lock lock(read_write_guard_);

  // Locate the partitions.
  const auto& tables_it{tables_.find(table_name)};
@@ -519,7 +519,7 @@ size_t HashMapBackend<Key>::dump_bin(const std::string& table_name, std::ofstrea
#ifdef HCTR_USE_ROCKS_DB
template <typename Key>
size_t HashMapBackend<Key>::dump_sst(const std::string& table_name, rocksdb::SstFileWriter& file) {
  const std::shared_lock lock(read_write_guard_);
  // const std::shared_lock lock(read_write_guard_);

  // Locate the partitions.
  const auto& tables_it{tables_.find(table_name)};
Review comment: Technically speaking, I have nothing against this change in general. But we need to think carefully about whether it will break the MultiProcessHashmap. Maybe it is better to splice the code.
Review comment: What is more relevant is that despite you using int8_t here, the memory usage should still increase by 8 bytes here due to alignment. Can we un-align the data structure without loss of performance?
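To illustrate the padding point above (my own example, not code from the PR): on a typical 64-bit ABI, the one-byte atomic grows a pointer-holding payload struct by a full pointer width, because the struct's size is rounded up to a multiple of its alignment.

```cpp
// Illustration of the alignment concern; sizes assume a typical 64-bit ABI
// where pointers are 8 bytes and 8-byte aligned.
#include <atomic>
#include <cstdint>
#include <cstdio>

struct PayloadWithoutLock {
  char* value;               // 8 bytes
};                           // sizeof == 8

struct PayloadWithLock {
  char* value;               // 8 bytes
  std::atomic<int8_t> lck;   // 1 byte, but the struct is padded to a multiple of 8
};                           // sizeof == 16 on most 64-bit ABIs

int main() {
  std::printf("without lck: %zu bytes\n", sizeof(PayloadWithoutLock));
  std::printf("with lck:    %zu bytes\n", sizeof(PayloadWithLock));
}
```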