Skip to content

Commit

Permalink
make internal environment utilities more streamlined and consistent
Browse files Browse the repository at this point in the history
accommodate clang-12 quirk
  • Loading branch information
ericniebler committed Jan 20, 2024
1 parent 96eb561 commit c3e1d76
Show file tree
Hide file tree
Showing 30 changed files with 487 additions and 411 deletions.
5 changes: 4 additions & 1 deletion examples/benchmark/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ statistics_all compute_perf(
struct numa_deleter {
std::size_t size_;
exec::numa_allocator<char> allocator_;

// Deleter call operator: returns `ptr` to `allocator_` via `deallocate`,
// passing the stored `size_` as the second argument.
void operator()(char* ptr) noexcept {
allocator_.deallocate(ptr, size_);
}
Expand Down Expand Up @@ -120,7 +121,9 @@ void my_main(int argc, char** argv, exec::numa_policy* policy = exec::get_numa_p
std::size_t buffer_size = 2000 << 20;
for (std::size_t i = 0; i < static_cast<std::size_t>(nthreads); ++i) {
exec::numa_allocator<char> alloc(policy->thread_index_to_node(i));
buffers.push_back(std::unique_ptr<char, numa_deleter>{alloc.allocate(buffer_size), numa_deleter{buffer_size, alloc}});
buffers.push_back(std::unique_ptr<char, numa_deleter>{
alloc.allocate(buffer_size), numa_deleter{buffer_size, alloc}
});
}
#endif
for (std::size_t i = 0; i < static_cast<std::size_t>(nthreads); ++i) {
Expand Down
8 changes: 4 additions & 4 deletions examples/benchmark/fibonacci.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,24 @@ struct fib_s {

// start customization for `operation`.
// - When `self.n < self.cutoff`, completes `self.rcvr_` directly with
//   `serial_fib(self.n)`.
// - Otherwise builds two child senders (for `n - 1` and `n - 2`), joins them
//   with `when_all`, and starts the result detached; the `then` callback
//   completes the receiver captured from `self.rcvr_` with `a + b`.
// NOTE(review): several statements below appear twice, differing only in the
// spacing of the `(Receiver&&)` cast -- presumably the before/after sides of a
// formatting diff; confirm only one of each pair belongs in the real file.
friend void tag_invoke(stdexec::start_t, operation& self) noexcept {
if (self.n < self.cutoff) {
stdexec::set_value((Receiver &&) self.rcvr_, serial_fib(self.n));
stdexec::set_value((Receiver&&) self.rcvr_, serial_fib(self.n));
} else {
// Builds a child fib sender for `n` wrapped in `stdexec::on(self.sched, ...)`.
auto mkchild = [&](long n) {
return stdexec::on(self.sched, fib_sender(fib_s{self.cutoff, n, self.sched}));
};

stdexec::start_detached(
stdexec::when_all(mkchild(self.n - 1), mkchild(self.n - 2))
| stdexec::then([rcvr = (Receiver &&) self.rcvr_](long a, long b) {
stdexec::set_value((Receiver &&) rcvr, a + b);
| stdexec::then([rcvr = (Receiver&&) self.rcvr_](long a, long b) {
stdexec::set_value((Receiver&&) rcvr, a + b);
}));
}
}
};

// connect customization for `fib_s`: brace-initializes an `operation<Receiver>`
// from the receiver and this sender's `cutoff`, `n`, and `sched` members.
// NOTE(review): the `return` appears twice, differing only in cast spacing --
// presumably the before/after sides of a formatting diff; confirm which one is
// current.
template <stdexec::receiver_of<completion_signatures> Receiver>
friend operation<Receiver> tag_invoke(stdexec::connect_t, fib_s self, Receiver rcvr) {
return {(Receiver &&) rcvr, self.cutoff, self.n, self.sched};
return {(Receiver&&) rcvr, self.cutoff, self.n, self.sched};
}
};

Expand Down
20 changes: 9 additions & 11 deletions examples/benchmark/static_thread_pool_old.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ namespace exec_old {

// Names the `bulk_sender` specialization for a sender/shape/function triple:
// the sender type is decayed (`__decay_t`) and mapped through `stdexec::__id`;
// `Shape` and `Fun` are passed through unchanged.
// NOTE(review): the template argument list appears twice (multi-line form and
// single-line form) -- presumably the before/after sides of a formatting diff;
// confirm only one belongs in the real file.
template <stdexec::sender Sender, std::integral Shape, class Fun>
using bulk_sender_t = //
bulk_sender<
stdexec::__id<stdexec::__decay_t<Sender>>,
Shape,
Fun>;
bulk_sender< stdexec::__id<stdexec::__decay_t<Sender>>, Shape, Fun>;

#if STDEXEC_MSVC()
// MSVCBUG https://developercommunity.visualstudio.com/t/Alias-template-with-pack-expansion-in-no/10437850
Expand Down Expand Up @@ -472,16 +469,12 @@ namespace exec_old {

// Names the `bulk_op_state` specialization used when connecting: the sender is
// passed as `stdexec::__cvref_id<Self, Sender>`, the receiver as
// `stdexec::__id<Receiver>`, followed by `Shape` and `Fun`.
// NOTE(review): the template argument list appears twice (multi-line form and
// single-line form) -- presumably the before/after sides of a formatting diff;
// confirm only one belongs in the real file.
template <class Self, class Receiver>
using bulk_op_state_t = //
bulk_op_state<
stdexec::__cvref_id<Self, Sender>,
stdexec::__id<Receiver>,
Shape,
Fun>;
bulk_op_state< stdexec::__cvref_id<Self, Sender>, stdexec::__id<Receiver>, Shape, Fun>;

template <stdexec::__decays_to<bulk_sender> Self, stdexec::receiver Receiver>
requires stdexec::
receiver_of<Receiver, completion_signatures<Self, stdexec::env_of_t<Receiver>>>
friend bulk_op_state_t<Self, Receiver> //
friend bulk_op_state_t<Self, Receiver> //
tag_invoke(stdexec::connect_t, Self&& self, Receiver rcvr) //
noexcept(stdexec::__nothrow_constructible_from<
bulk_op_state_t<Self, Receiver>,
Expand Down Expand Up @@ -685,7 +678,12 @@ namespace exec_old {
stdexec::start(op.inner_op_);
}

// Constructor: initializes `shared_state_` from the pool, the receiver, the
// shape and the function, and initializes `inner_op_` by connecting `sender`
// to a `bulk_rcvr` built from `shared_state_`. The body itself is empty.
// NOTE(review): the parameter list appears twice (one-line and one-per-line
// forms) -- presumably the before/after sides of a formatting diff; confirm
// only one belongs in the real file.
bulk_op_state(static_thread_pool& pool, Shape shape, Fun fn, CvrefSender&& sender, Receiver receiver)
bulk_op_state(
static_thread_pool& pool,
Shape shape,
Fun fn,
CvrefSender&& sender,
Receiver receiver)
: shared_state_(pool, (Receiver&&) receiver, shape, fn)
, inner_op_{stdexec::connect((CvrefSender&&) sender, bulk_rcvr{shared_state_})} {
}
Expand Down
2 changes: 1 addition & 1 deletion include/exec/__detail/__bwos_lifo_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ namespace exec::bwos {

template <class Tp, class Allocator>
lifo_queue<Tp, Allocator>::block_type::block_type(const block_type &other)
: ring_buffer_(other.ring_buffer_) {
: ring_buffer_(other.ring_buffer_) {
head_.store(other.head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
tail_.store(other.tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_tail_.store(other.steal_tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
Expand Down
71 changes: 48 additions & 23 deletions include/exec/__detail/__numa.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,32 @@ namespace exec {
// A `numa_policy` that models a single node: it reports one node, reports
// `std::thread::hardware_concurrency()` CPUs for any node argument, and maps
// every thread index to node 0.
// NOTE(review): each override appears twice below (a one-line form followed by
// a multi-line form with the same body) -- presumably the before/after sides
// of a formatting diff; confirm only one set belongs in the real file.
class no_numa_policy : public numa_policy {
public:
no_numa_policy() noexcept = default;
std::size_t num_nodes() override { return 1; }
std::size_t num_cpus(int node) override { return std::thread::hardware_concurrency(); }
int bind_to_node(int node) override { return 0; }
int thread_index_to_node(std::size_t index) override { return 0; }

// Always reports exactly one node.
std::size_t num_nodes() override {
return 1;
}

// Returns the hardware concurrency; `node` is not consulted.
std::size_t num_cpus(int node) override {
return std::thread::hardware_concurrency();
}

// Performs no binding; always returns 0.
int bind_to_node(int node) override {
return 0;
}

// Every thread index maps to node 0.
int thread_index_to_node(std::size_t index) override {
return 0;
}
};
}

#if STDEXEC_ENABLE_NUMA
#include <numa.h>

namespace exec {
struct default_numa_policy : numa_policy {
default_numa_policy() : node_to_thread_index_(::numa_num_task_nodes()) {
default_numa_policy()
: node_to_thread_index_(::numa_num_task_nodes()) {
std::size_t total_cpus = 0;
std::size_t n_nodes = num_nodes();
for (std::size_t node = 0; node < n_nodes; ++node) {
Expand All @@ -55,14 +69,18 @@ namespace exec {
}
}

std::size_t num_nodes() override { return node_to_thread_index_.size(); }

std::size_t num_cpus(int node) override {
std::size_t num_nodes() override {
return node_to_thread_index_.size();
}

std::size_t num_cpus(int node) override {
struct ::bitmask* cpus = ::numa_allocate_cpumask();
if (!cpus) {
return 0;
}
scope_guard sg{[&]() noexcept { ::numa_free_cpumask(cpus); }};
scope_guard sg{[&]() noexcept {
::numa_free_cpumask(cpus);
}};
int rc = ::numa_node_to_cpus(node, cpus);
if (rc < 0) {
return 0;
Expand All @@ -71,12 +89,14 @@ namespace exec {
return num_cpus;
}

int bind_to_node(int node) override {
int bind_to_node(int node) override {
struct ::bitmask* nodes = ::numa_allocate_nodemask();
if (!nodes) {
return -1;
}
scope_guard sg{[&]() noexcept { ::numa_free_nodemask(nodes); }};
scope_guard sg{[&]() noexcept {
::numa_free_nodemask(nodes);
}};
::numa_bitmask_setbit(nodes, node);
::numa_bind(nodes);
return 0;
Expand Down Expand Up @@ -107,10 +127,14 @@ namespace exec {
using const_pointer = const T*;
using value_type = T;

// Constructs an allocator that stores `node` in `node_`.
// NOTE(review): the constructor appears in a one-line and a multi-line form --
// presumably the before/after sides of a formatting diff; confirm only one
// belongs in the real file.
explicit numa_allocator(int node) noexcept : node_(node) {}
explicit numa_allocator(int node) noexcept
: node_(node) {
}

// Converting constructor from an allocator of another value type: copies the
// other allocator's `node_`.
// NOTE(review): the constructor appears in a one-line and a multi-line form --
// presumably the before/after sides of a formatting diff; confirm only one
// belongs in the real file.
template <class U>
explicit numa_allocator(const numa_allocator<U>& other) noexcept : node_(other.node_) {}
explicit numa_allocator(const numa_allocator<U>& other) noexcept
: node_(other.node_) {
}

int node_;

Expand Down Expand Up @@ -139,12 +163,11 @@ namespace exec {
::copy_bitmask_to_nodemask(::numa_all_nodes_ptr, &mask.mask_);
return mask;
}


// Default constructor: value-initializes `mask_`, then fills it by copying
// `::numa_no_nodes_ptr` into it with `::copy_bitmask_to_nodemask`.
// NOTE(review): the access specifier and constructor header appear twice with
// different brace placement -- presumably the before/after sides of a
// formatting diff; confirm only one belongs in the real file.
public:
nodemask() noexcept
: mask_{}
{

public:
nodemask() noexcept
: mask_{} {
::copy_bitmask_to_nodemask(::numa_no_nodes_ptr, &mask_);
}

Expand Down Expand Up @@ -184,7 +207,7 @@ namespace exec {
return ::numa_bitmask_equal(&lhs_mask, &rhs_mask);
}

private:
private:
::nodemask_t mask_;
};
}
Expand All @@ -203,10 +226,12 @@ namespace exec {
using const_pointer = const T*;
using value_type = T;

// Constructor taking a node id: the argument is unnamed and unused, and the
// body is empty.
// NOTE(review): the constructor appears in a one-line and a multi-line form --
// presumably the before/after sides of a formatting diff; confirm only one
// belongs in the real file.
explicit numa_allocator(int) noexcept {}
explicit numa_allocator(int) noexcept {
}

// Converting constructor from an allocator of another value type: the
// argument is unnamed and unused, and the body is empty.
// NOTE(review): the constructor appears in a one-line and a multi-line form --
// presumably the before/after sides of a formatting diff; confirm only one
// belongs in the real file.
template <class U>
explicit numa_allocator(const numa_allocator<U>&) noexcept {}
explicit numa_allocator(const numa_allocator<U>&) noexcept {
}

T* allocate(std::size_t n) {
std::allocator<T> alloc{};
Expand All @@ -228,7 +253,7 @@ namespace exec {
return mask;
}

public:
public:
nodemask() noexcept = default;

static const nodemask& any() noexcept {
Expand All @@ -248,7 +273,7 @@ namespace exec {
return lhs.mask_ == rhs.mask_;
}

private:
private:
bool mask_{false};
};
}
Expand Down
8 changes: 4 additions & 4 deletions include/exec/any_sender_of.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ namespace exec {
};

// Environment type for the stoppable receiver: combines `_Env` with an
// `in_place_stop_token` entry associated with `get_stop_token_t`.
// NOTE(review): two different definitions of this alias appear back to back
// (`__make_env_t` + `__with<query, value>` and `__env::__join_t` +
// `__env::__with<value, query>`, with the argument order swapped) --
// presumably the before/after sides of the diff; confirm which one is current.
template <class _Env>
using __env_t = __make_env_t<_Env, __with<get_stop_token_t, in_place_stop_token>>;
using __env_t = __env::__join_t<__env::__with<in_place_stop_token, get_stop_token_t>, _Env>;

template <class _ReceiverId>
struct __stoppable_receiver {
Expand Down Expand Up @@ -828,9 +828,9 @@ namespace exec {

// get_env customization: returns an environment built from the wrapped
// receiver's environment (`get_env(__self.__op_->__rcvr_)`) together with the
// token from the operation's `__stop_source_`, associated with
// `get_stop_token`.
// NOTE(review): two `return` statements appear back to back (`__make_env` +
// `__mkprop` with the base environment first, and `__env::__join` +
// `__env::__with` with the base environment last) -- presumably the
// before/after sides of the diff; confirm which one is current.
template <same_as<get_env_t> _GetEnv, same_as<__t> _Self>
friend __env_t<env_of_t<_Receiver>> tag_invoke(_GetEnv, const _Self& __self) noexcept {
return __make_env(
get_env(__self.__op_->__rcvr_),
__mkprop(__self.__op_->__stop_source_.get_token(), get_stop_token));
return __env::__join(
__env::__with(__self.__op_->__stop_source_.get_token(), get_stop_token),
get_env(__self.__op_->__rcvr_));
}
};
};
Expand Down
Loading

0 comments on commit c3e1d76

Please sign in to comment.