std::thread::hardware_concurrency()
返回硬件线程上下文的数量,通常是CPU内核数量
template numeric_limits
根据当前平台,获取指定类型的信息
std::numeric_limits<T>
std::numeric_limits<int>::min() 获取int最小值 std::numeric_limits<unsigned long>::max() 获取unsigned long最大值
std::unique_ptr
管理一个指针对象,在出作用域后自动销毁对象,而且它更安全不允许直接通过赋值来修改它
std::upper_bound
std::upper_bound (v.begin(), v.end(), 20)
返回第一个大于20的元素的迭代器(要求区间已排序),没找到返回v.end()
std::lower_bound
std::lower_bound(v.begin(), v.end(), 20)
返回第一个不小于20(即大于或等于20)的元素的迭代器(要求区间已排序),没找到返回v.end()
std::unique_lock<std::mutex>
构造的时候加锁,析构的时候解锁,保证加解锁是成对的不会漏掉
源码如下,做了些注释: runnable.h
#ifndef RUNNABLE_H
#define RUNNABLE_H
/// A task that the thread pool can execute.
///
/// Subclasses provide the work by overriding run(). Auto-delete is enabled by
/// default. The private counter m_ref doubles as the auto-delete switch:
/// -1 means "auto-delete off", any other value means "on".
class Runnable {
public:
    Runnable() : m_ref(0) {}
    virtual ~Runnable() {}

    /// The task body; must be implemented by the subclass.
    virtual void run() = 0;

    /// @return true unless auto-delete was switched off with setAutoDelete(false).
    bool autoDelete() const { return m_ref != -1; }

    /// Switches auto-delete on (counter reset to 0) or off (counter set to -1).
    void setAutoDelete(bool v)
    {
        if (v) {
            m_ref = 0;
        } else {
            m_ref = -1;
        }
    }

private:
    // The pool classes read and adjust m_ref directly.
    friend class ThreadPool;
    friend class ThreadPoolPrivate;
    friend class ThreadPoolThread;

    int m_ref;
};
#endif // RUNNABLE_H
threadpoolthread.h
#ifndef THREADPOOLTHREAD_H
#define THREADPOOLTHREAD_H
#include <condition_variable>
// std::thread has to come from its own header: adding declarations to
// namespace std (as a forward declaration would) is undefined behaviour.
#include <thread>

class Runnable;
class ThreadPoolPrivate;

/// State of a single pool worker. An instance is passed to std::thread as the
/// callable object, so operator() is the body of the worker thread.
class ThreadPoolThread {
public:
    /// @param manager the pool this worker belongs to (not owned).
    explicit ThreadPoolThread(ThreadPoolPrivate* manager);

    /// Worker main loop; invoked as soon as the thread starts.
    void operator()();

    /// Decrements the pool's active-thread counter and notifies
    /// ThreadPoolPrivate::noActiveThreads when it drops to zero.
    void registerThreadInactive();

    /// Signalled by the pool to wake this worker while it waits for work.
    std::condition_variable runnableReady;
    /// Owning pool; its mutex guards the fields below.
    ThreadPoolPrivate* manager;
    /// Task handed directly to this worker, or nullptr.
    Runnable* runnable;
    /// Heap-allocated thread object running operator(); created by the pool.
    std::thread* thread;
};
#endif // THREADPOOLTHREAD_H
threadpoolthread.cpp
#include <mutex>
#include <thread>
#include "threadpoolthread.h"
#include "threadpool_p.h"
#include "runnable.h"
// Binds the worker to its pool. No task and no thread object are attached yet.
ThreadPoolThread::ThreadPoolThread(ThreadPoolPrivate* manager)
    : manager(manager)
    , runnable(nullptr)
    , thread(nullptr)
{
}
// Worker thread main loop (this object is the callable given to std::thread).
//
// Runs the task handed over in `runnable`, then keeps pulling tasks from the
// pool queue. When no work is left the worker parks itself in waitingThreads
// for up to expiryTimeout milliseconds; if nobody claims it in that time (or
// the pool has too many active threads) it moves itself to expiredThreads and
// returns. It also returns when the pool is exiting.
void ThreadPoolThread::operator()(void)
{
// The pool mutex is held throughout, except while a task's run() executes.
std::unique_lock<std::mutex> locker(this->manager->mutex);
while (true) {
// Take the task that was handed to this worker directly (may be null).
Runnable* r = this->runnable;
this->runnable = nullptr;
do {
if (r) {
// The auto-delete flag is sampled before run() is called.
const bool auto_delete = r->autoDelete();
// Run the task without holding the pool mutex.
locker.unlock();
r->run();
locker.lock();
// Drop this execution's reference; delete the task when none are left.
if (auto_delete && !--r->m_ref) {
delete r;
}
}
// Stop taking work if the pool currently has too many active threads.
if (this->manager->tooManyThreadsActive()) {
break;
}
// Fetch the next queued task, or leave the inner loop if the queue is empty.
if (this->manager->queue.empty()) {
r = nullptr;
}
else {
r = this->manager->queue.front().first;
this->manager->queue.pop_front();
}
} while (r);
// Pool is shutting down (isExiting is set by ThreadPoolPrivate::reset).
if (this->manager->isExiting) {
this->registerThreadInactive();
break;
}
bool expired = this->manager->tooManyThreadsActive();
if (!expired) {
// Park as a waiting thread and count as inactive while blocked.
this->manager->waitingThreads.push_back(this);
this->registerThreadInactive();
// Wait for a wake-up or for the expiry timeout (the mutex is released while waiting).
this->runnableReady.wait_for(locker, std::chrono::milliseconds(manager->expiryTimeout));
++manager->activeThreads;
// Code that hands work to a waiting thread pops it from waitingThreads before
// notifying. If this worker is still listed, nobody claimed it, so it expires.
for (auto it = this->manager->waitingThreads.begin(); it != this->manager->waitingThreads.end(); ++it) {
if (*it == this) {
this->manager->waitingThreads.erase(it);
expired = true;
break;
}
}
}
if (expired) {
// Register as expired so the pool can restart this worker later, then exit.
this->manager->expiredThreads.push_back(this);
this->registerThreadInactive();
break;
}
}
}
void ThreadPoolThread::registerThreadInactive(void)
{
if (--this->manager->activeThreads == 0) {
this->manager->noActiveThreads.notify_all();
}
}
threadpool_p.h
#ifndef THREADPOOL_P_H
#define THREADPOOL_P_H
#include <condition_variable>
#include <list>
#include <mutex>
#include <set>
#include <utility>
class Runnable;
class ThreadPoolThread;

/// Internal implementation of the thread pool; all of the pool's state lives here.
class ThreadPoolPrivate {
public:
    ThreadPoolPrivate();

    /// Tries to run a task right away; returns false when the active-thread
    /// limit has already been reached.
    bool tryStart(Runnable* runnable);

    /// Queues a task behind the already queued tasks of the same priority.
    void enqueueTask(Runnable* runnable, int priority = 0);

    std::size_t activeThreadCount() const;

    /// Hands queued tasks to further threads for as long as that succeeds.
    void tryToStartMoreThreads();

    bool tooManyThreadsActive() const;
    void startThread(Runnable* runnable = nullptr);
    void reset();
    bool waitForDone(unsigned long int msecs);
    void clear();

    /// "Steals" a task: removes it from the queue without running it.
    bool stealRunnable(Runnable* runnable);

    /// Steals a task from the queue and runs it.
    void stealAndRunRunnable(Runnable* runnable);

    mutable std::mutex mutex;
    std::set<ThreadPoolThread*> allThreads;
    std::list<ThreadPoolThread*> waitingThreads;
    std::list<ThreadPoolThread*> expiredThreads;
    std::list<std::pair<Runnable*, int> > queue;  // (task, priority) entries
    std::condition_variable noActiveThreads;
    bool isExiting;
    unsigned long int expiryTimeout;  // passed to std::chrono::milliseconds by the workers
    std::size_t maxThreadCount;
    std::size_t reservedThreads;
    std::size_t activeThreads;
};
#endif // THREADPOOL_P_H
threadpool_p.cpp
#include <algorithm>
#include <chrono>
#include <thread>
#include <memory>
#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"
// Defaults: not exiting, 30000 ms expiry timeout, no reserved or active
// threads, and a thread limit of hardware_concurrency() but never below 1.
ThreadPoolPrivate::ThreadPoolPrivate()
    : isExiting(false)
    , expiryTimeout(30000)
    , maxThreadCount(std::max(std::thread::hardware_concurrency(), 1u))
    , reservedThreads(0)
    , activeThreads(0)
{
}
// Tries to get `task` running immediately.
//
// Strategy, in order:
//   1. no threads exist yet      -> start the first thread with the task;
//   2. active-thread limit hit   -> give up and return false;
//   3. a thread is waiting       -> queue the task and wake that thread;
//   4. an expired thread exists  -> hand it the task and restart it;
//   5. otherwise                 -> start a new thread with the task.
//
// Returns true when the task was handed over, false when the limit was hit.
bool ThreadPoolPrivate::tryStart(Runnable* task)
{
    if (this->allThreads.empty()) {
        this->startThread(task);
        return true;
    }
    if (this->activeThreadCount() >= this->maxThreadCount) {
        return false;
    }
    if (!this->waitingThreads.empty()) {
        this->enqueueTask(task);
        ThreadPoolThread* t = this->waitingThreads.front();
        this->waitingThreads.pop_front();
        t->runnableReady.notify_one();
        return true;
    }
    if (!this->expiredThreads.empty()) {
        ThreadPoolThread* t = this->expiredThreads.front();
        this->expiredThreads.pop_front();
        ++this->activeThreads;
        if (task->autoDelete()) {
            ++task->m_ref;
        }
        t->runnable = task;
        // The expired worker has left (or is leaving) its loop: join it, start
        // a fresh thread on the same worker object, and free the old thread
        // object, which would otherwise leak.
        std::thread* finished = t->thread;
        finished->join();
        t->thread = new std::thread(&ThreadPoolThread::operator(), t);
        delete finished;
        return true;
    }
    this->startThread(task);
    return true;
}
// Comparison operators between a priority and a queue entry, for use by
// std::upper_bound. The ordering is reversed: a value counts as "less" when
// its priority is numerically greater.
inline bool operator<(int priority, const std::pair<Runnable*, int>& p)
{
    return priority > p.second;
}
inline bool operator<(const std::pair<Runnable*, int>& p, int priority)
{
    return p.second > priority;
}
// Adds a task to the queue, taking a reference on it if it is auto-delete.
// The task is inserted in front of the first queued entry with a strictly
// lower priority, i.e. behind all entries of equal or higher priority.
void ThreadPoolPrivate::enqueueTask(Runnable* runnable, int priority)
{
    if (runnable->autoDelete()) {
        runnable->m_ref += 1;
    }
    const auto slot = std::upper_bound(
        queue.begin(), queue.end(), priority,
        [](int wanted, const std::pair<Runnable*, int>& queued) {
            return queued.second < wanted;
        });
    queue.insert(slot, std::pair<Runnable*, int>(runnable, priority));
}
std::size_t ThreadPoolPrivate::activeThreadCount(void) const
{
return
this->allThreads.size()
- this->waitingThreads.size()
- this->expiredThreads.size()
+ this->reservedThreads
;
}
void ThreadPoolPrivate::tryToStartMoreThreads(void)
{
while (!this->queue.empty() && this->tryStart(this->queue.front().first)) {
this->queue.pop_front();
}
}
bool ThreadPoolPrivate::tooManyThreadsActive(void) const
{
const std::size_t activeThreadCount = this->activeThreadCount();
return activeThreadCount > this->maxThreadCount && (activeThreadCount - this->reservedThreads) > 1;
}
// Creates a new worker, registers it in allThreads, counts it as active and
// starts its thread with `runnable` as the first task.
//
// `runnable` may be null (the declaration defaults it to nullptr); the worker
// loop then goes straight to the queue. An auto-delete task gains one
// reference, which the worker drops after running it.
void ThreadPoolPrivate::startThread(Runnable* runnable)
{
    std::unique_ptr<ThreadPoolThread> thread(new ThreadPoolThread(this));
    this->allThreads.insert(thread.get());
    ++this->activeThreads;
    // Guard against the defaulted null argument before touching the task.
    if (runnable && runnable->autoDelete()) {
        ++runnable->m_ref;
    }
    thread->runnable = runnable;
    thread->thread = new std::thread(&ThreadPoolThread::operator(), thread.get());
    // The worker is now owned through allThreads; reset() deletes it.
    thread.release();
}
void ThreadPoolPrivate::reset(void)
{
std::unique_lock<std::mutex> locker(this->mutex);
this->isExiting = true;
while (!this->allThreads.empty()) {
std::set<ThreadPoolThread*> allThreadsCopy;
allThreadsCopy.swap(this->allThreads);
locker.unlock();
for (auto it = allThreadsCopy.begin(); it != allThreadsCopy.end(); ++it) {
(*it)->runnableReady.notify_all();
(*it)->thread->join();
delete (*it)->thread;
delete (*it);
}
locker.lock();
}
this->waitingThreads.clear();
this->expiredThreads.clear();
isExiting = false;
}
bool ThreadPoolPrivate::waitForDone(unsigned long int msecs)
{
std::unique_lock<std::mutex> locker(this->mutex);
if (msecs == std::numeric_limits<unsigned long int>::max()) {
while (!(this->queue.empty() && this->activeThreads == 0)) {
this->noActiveThreads.wait(locker);
}
}
else {
auto start = std::chrono::steady_clock::now();
auto till = start + std::chrono::milliseconds(msecs);
while (
!(this->queue.empty() && this->activeThreads == 0)
&& std::chrono::steady_clock::now() < till
) {
this->noActiveThreads.wait_until(locker, till);
}
}
return this->queue.empty() && !this->activeThreads;
}
// Removes every task that has not started yet from the queue.
//
// Each queue entry holds one reference on an auto-delete task. That reference
// is dropped here and the task is deleted once its count reaches zero, the
// same rule the worker loop and ThreadPool::cancel apply.
void ThreadPoolPrivate::clear(void)
{
    std::unique_lock<std::mutex> locker(this->mutex);
    while (!this->queue.empty()) {
        Runnable* r = this->queue.front().first;
        // Delete only when this was the last reference (count reached zero).
        if (r->autoDelete() && !--r->m_ref) {
            delete r;
        }
        this->queue.pop_front();
    }
}
// Removes the first queue entry that refers to `runnable`.
// Returns true when an entry was removed; false for a null argument or when
// the task is not queued. The task's reference count is not touched here.
bool ThreadPoolPrivate::stealRunnable(Runnable* runnable)
{
    if (runnable == nullptr) {
        return false;
    }
    std::unique_lock<std::mutex> guard(mutex);
    const auto hit = std::find_if(
        queue.begin(), queue.end(),
        [runnable](const std::pair<Runnable*, int>& entry) {
            return entry.first == runnable;
        });
    if (hit == queue.end()) {
        return false;
    }
    queue.erase(hit);
    return true;
}
// Takes `runnable` out of the queue and runs it on the calling thread.
// Does nothing when the task is not queued. For an auto-delete task the
// queue's reference is dropped before run(), and the task is deleted after
// run() if that was the last reference.
void ThreadPoolPrivate::stealAndRunRunnable(Runnable* runnable)
{
    if (stealRunnable(runnable)) {
        bool destroyAfterRun = false;
        if (runnable->autoDelete()) {
            destroyAfterRun = (--runnable->m_ref == 0);
        }
        runnable->run();
        if (destroyAfterRun) {
            delete runnable;
        }
    }
}
threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <limits>
#include <memory>
#include <vector>
class Runnable;
class ThreadPoolPrivate;

/// Thread pool: the public interface used by client code.
class ThreadPool {
public:
    ThreadPool();
    ~ThreadPool();

    /// Starts a task; if no thread is free the task is queued.
    void start(Runnable* runnable, int priority = 0);

    /// Tries to start a task; returns false if no thread is free.
    bool tryStart(Runnable* runnable);

    /// Expiry timeout, i.e. how long an idle thread waits for work.
    unsigned long int expiryTimeout() const;
    void setExpiryTimeout(unsigned long int v);

    /// Maximum number of threads; initialised to the number of CPU cores.
    std::size_t maxThreadCount() const;
    void setMaxThreadCount(std::size_t n);

    /// Number of active threads.
    std::size_t activeThreadCount() const;

    /// Number of tasks in the queue.
    std::size_t queueSize() const;

    /// Reserves a thread, setting it aside for now; each call reserves one more.
    void reserveThread();

    /// Releases a reserved thread so it can be used; each call releases one.
    void releaseThread();

    /// Waits for all active threads to finish.
    bool waitForDone(unsigned long int timeout = std::numeric_limits<unsigned long int>::max());

    /// Discards all tasks that have not started yet.
    void clear();

    /// Removes one task.
    void cancel(Runnable* runnable);

private:
    friend class ThreadPoolPrivate;

    const std::unique_ptr<ThreadPoolPrivate> d_ptr;

    // Accessors for the implementation object.
    inline ThreadPoolPrivate* d_func() { return d_ptr.get(); }
    inline const ThreadPoolPrivate* d_func() const { return d_ptr.get(); }
};
#endif // THREADPOOL_H
threadpool.cpp
#include <mutex>
#include "threadpool.h"
#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"
// Creates the pool together with its private implementation object.
ThreadPool::ThreadPool()
    : d_ptr(new ThreadPoolPrivate())
{
}
// Waits (with the default, unlimited timeout) for all work to finish before
// the pool is destroyed.
ThreadPool::~ThreadPool()
{
    waitForDone();
}
// Runs `runnable` on a pool thread. If it cannot be started right away it is
// queued with `priority`, and one waiting thread (if any) is woken.
// A null task is ignored.
void ThreadPool::start(Runnable* runnable, int priority)
{
    if (runnable == nullptr) {
        return;
    }
    ThreadPoolPrivate* const pool = d_func();
    std::unique_lock<std::mutex> guard(pool->mutex);
    if (pool->tryStart(runnable)) {
        return;
    }
    pool->enqueueTask(runnable, priority);
    if (pool->waitingThreads.empty()) {
        return;
    }
    ThreadPoolThread* const sleeper = pool->waitingThreads.front();
    pool->waitingThreads.pop_front();
    sleeper->runnableReady.notify_one();
}
// Tries to start `runnable` immediately, without queueing it.
//
// Returns false for a null task, or when threads already exist and the
// active-thread limit has been reached. When the pool has no threads at all
// the request is passed on, because ThreadPoolPrivate::tryStart always starts
// the first thread, the same behaviour start() has.
bool ThreadPool::tryStart(Runnable* runnable)
{
    if (!runnable) {
        return false;
    }
    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    // The limit only applies once at least one thread exists.
    if (!d->allThreads.empty() && d->activeThreadCount() >= d->maxThreadCount) {
        return false;
    }
    return d->tryStart(runnable);
}
unsigned long int ThreadPool::expiryTimeout(void) const
{
return this->d_func()->expiryTimeout;
}
void ThreadPool::setExpiryTimeout(unsigned long int v)
{
this->d_func()->expiryTimeout = v;
}
std::size_t ThreadPool::maxThreadCount(void) const
{
return this->d_func()->maxThreadCount;
}
// Changes the thread limit and then tries to start queued tasks under the
// new limit. Does nothing when the value is unchanged.
//
// The pool mutex is held throughout, because tryToStartMoreThreads() modifies
// the queue and the thread lists that worker threads also use.
// releaseThread() takes the same lock for the same call.
void ThreadPool::setMaxThreadCount(std::size_t n)
{
    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    if (d->maxThreadCount == n) {
        return;
    }
    d->maxThreadCount = n;
    d->tryToStartMoreThreads();
}
std::size_t ThreadPool::activeThreadCount(void) const
{
const ThreadPoolPrivate* const d = this->d_func();
std::unique_lock<std::mutex> locker(d->mutex);
return d->activeThreadCount();
}
std::size_t ThreadPool::queueSize(void) const
{
return this->d_func()->queue.size();
}
void ThreadPool::reserveThread(void)
{
ThreadPoolPrivate* const d = this->d_func();
std::unique_lock<std::mutex> locker(d->mutex);
++d->reservedThreads;
}
// Releases one thread previously reserved with reserveThread() and then tries
// to start queued tasks with the freed capacity.
//
// A call without a matching reserveThread() is ignored. reservedThreads is
// unsigned, so decrementing it at zero would wrap to a huge value and make
// the pool look permanently full.
void ThreadPool::releaseThread(void)
{
    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    if (d->reservedThreads == 0) {
        return;
    }
    --d->reservedThreads;
    d->tryToStartMoreThreads();
}
bool ThreadPool::waitForDone(unsigned long int msec)
{
ThreadPoolPrivate* const d = this->d_func();
bool ret = d->waitForDone(msec);
if (ret) {
d->reset();
}
return ret;
}
void ThreadPool::clear(void)
{
this->d_func()->clear();
}
// Removes `runnable` from the queue if it is still waiting there. For an
// auto-delete task the queue's reference is dropped, and the task is deleted
// when that was the last reference.
void ThreadPool::cancel(Runnable* runnable)
{
    if (d_func()->stealRunnable(runnable)) {
        if (runnable->autoDelete()) {
            --runnable->m_ref;
            if (runnable->m_ref == 0) {
                delete runnable;
            }
        }
    }
}
main.cpp
#include <iostream>
#include <thread>
#include <mutex>
#include "threadpool.h"
#include "runnable.h"
std::mutex s_mutex;
class Task : public Runnable
{
public:
Task(int i)
: i_(i)
{
}
void run()
{
std::unique_lock<std::mutex> lock(s_mutex);
//std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "number:" << i_ << ",thread id:" << std::this_thread::get_id() << std::endl;
}
private:
int i_;
};
// Demo entry point: submits 100 tasks to a pool, then pauses the console.
// The pool is destroyed on return from this function.
int _tmain(int argc, _TCHAR* argv[])
{
    std::unique_ptr<ThreadPool> pool(new ThreadPool());
    int id = 0;
    while (id < 100) {
        pool->start(new Task(id));
        ++id;
    }
    system("pause");
    return 0;
}