-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathThreadPool.cpp
More file actions
117 lines (95 loc) · 3.21 KB
/
ThreadPool.cpp
File metadata and controls
117 lines (95 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include "ThreadPool.hpp"
#include <iterator>
#include <algorithm>
namespace dbr
{
namespace cc
{
ThreadPool::ThreadPool(std::size_t threadCount)
: threadsWaiting(0),
terminate(false),
paused(false)
{
if (threadCount==0)
threadCount = std::thread::hardware_concurrency();
// prevent potential reallocation, thereby screwing up all our hopes and dreams
threads.reserve(threadCount);
std::generate_n(std::back_inserter(threads), threadCount, [this]() { return std::thread{ threadTask, this }; });
}
ThreadPool::~ThreadPool()
{
clear();
// tell threads to stop when they can
terminate = true;
jobsAvailable.notify_all();
// wait for all threads to finish
for (auto& t : threads)
{
if (t.joinable())
t.join();
}
}
std::size_t ThreadPool::threadCount() const
{
return threads.size();
}
std::size_t ThreadPool::waitingJobs() const
{
std::lock_guard<std::mutex> jobLock(jobsMutex);
return jobs.size();
}
ThreadPool::Ids ThreadPool::ids() const
{
Ids ret(threads.size());
std::transform(threads.begin(), threads.end(), ret.begin(), [](auto& t) { return t.get_id(); });
return ret;
}
void ThreadPool::clear()
{
std::lock_guard<std::mutex> lock{ jobsMutex };
while (!jobs.empty())
jobs.pop();
}
void ThreadPool::pause(bool state)
{
paused = state;
if (!paused)
jobsAvailable.notify_all();
}
void ThreadPool::wait()
{
// we're done waiting once all threads are waiting
while (threadsWaiting != threads.size());
}
void ThreadPool::threadTask(ThreadPool* pool)
{
// loop until we break (to keep thread alive)
while (true)
{
// if we need to finish, let's do it before we get into
// all the expensive synchronization stuff
if (pool->terminate)
break;
std::unique_lock<std::mutex> jobsLock{ pool->jobsMutex };
// if there are no more jobs, or we're paused, go into waiting mode
if (pool->jobs.empty() || pool->paused)
{
++pool->threadsWaiting;
pool->jobsAvailable.wait(jobsLock, [&]()
{
return pool->terminate || !(pool->jobs.empty() || pool->paused);
});
--pool->threadsWaiting;
}
// check once more before grabbing a job, since we want to stop ASAP
if (pool->terminate)
break;
// take next job
auto job = std::move(pool->jobs.front());
pool->jobs.pop();
jobsLock.unlock();
job();
}
}
}
}