-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadpool.h
72 lines (65 loc) · 1.86 KB
/
threadpool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
#include "types.h"
class ThreadPool
{
public:
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
ThreadPool(unsigned size) :
stop { false } {
for(size_t ii = 0; ii < size; ++ii) {
threads.emplace_back
(
[&]() {
for(;;) {
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&]{ return stop || !tasks.empty(); });
if(stop && tasks.empty()) { return; }
auto task = std::move(tasks.front()); tasks.pop();
lck.unlock();
task();
}
}
);
}
}
template <typename Func, typename ... Arg>
auto execute(Func&& func, Arg&& ... arg) {
using Ret = typename std::result_of<Func(Arg...)>::type;
auto task = std::make_shared<std::packaged_task<Ret()>> (
std::bind(std::forward<Func>(func), std::forward<Arg>(arg)...)
);
std::future<Ret> fut = task->get_future();
std::unique_lock<std::mutex> lck(mtx);
assert(!stop);
tasks.emplace([task]() { (*task)(); });
cv.notify_one();
return fut;
}
~ThreadPool() noexcept {
stop = true;
cv.notify_all();
for(auto& thr : threads) {
assert(thr.joinable());
thr.join();
}
}
private:
std::atomic<bool> stop;
std::mutex mtx;
std::condition_variable cv;
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
};
#endif // _THREADPOOL_H_