Skip to content

模拟QThreadPool实现

Published: at 10:19 AM | 9 min read

std::thread::hardware_concurrency()

返回硬件线程上下文的数量,通常是CPU内核数量

template numeric_limits

根据当前平台,获取指定类型的信息

std::numeric_limits::min() 获取int最小值 std::numeric_limits::max() 获取unsigned long最大值

std::unique_ptr

管理一个指针对象,在出作用域后自动销毁对象,而且它更安全不允许直接通过赋值来修改它

std::upper_bound

std::upper_bound (v.begin(), v.end(), 20)
返回最后一个比20大的迭代器,没找到返回v.end()

std::lower_bound

std::lower_bound(v.begin(), v.end(), 20)
返回第一个等于20的迭代器,没找到返回v.end()

std::unique_lockstd::mutex

构造的时候加锁,析构的时候解锁,保证加解锁是成对的不会漏掉

源码如下,做了些注释: runnable.h

#ifndef RUNNABLE_H
#define RUNNABLE_H

// 可运行的任务,默认支持自动销毁,通过重载run来实现要执行的任务
class Runnable {
public:
	Runnable(void) : m_ref(0) {}
	virtual ~Runnable(void) {}
	virtual void run(void) = 0;

	bool autoDelete(void) const { return this->m_ref != -1; }
	void setAutoDelete(bool v) { this->m_ref = v ? 0 : -1; }

private:
	int m_ref;

	friend class ThreadPool;
	friend class ThreadPoolPrivate;
	friend class ThreadPoolThread;
};

#endif // RUNNABLE_H

threadpoolthread.h

#ifndef THREADPOOLTHREAD_H
#define THREADPOOLTHREAD_H

#include <condition_variable>

class Runnable;
class ThreadPoolPrivate;

namespace std {
	class thread;
}

class ThreadPoolThread {
public:
	ThreadPoolThread(ThreadPoolPrivate* manager);

	// 线程启动后立即调用
	void operator()(void);

	void registerThreadInactive(void);

	std::condition_variable runnableReady;
	ThreadPoolPrivate* manager;
	Runnable* runnable;
	std::thread* thread;
};

#endif // THREADPOOLTHREAD_H

threadpoolthread.cpp

#include <mutex>
#include <thread>
#include "threadpoolthread.h"
#include "threadpool_p.h"
#include "runnable.h"

ThreadPoolThread::ThreadPoolThread(ThreadPoolPrivate* manager)
	: manager(manager), runnable(nullptr), thread(nullptr)
{
}

void ThreadPoolThread::operator()(void)
{
	std::unique_lock<std::mutex> locker(this->manager->mutex);
	while (true) {
		Runnable* r    = this->runnable;
		this->runnable = nullptr;

		do {
			if (r) {
				const bool auto_delete = r->autoDelete();

				locker.unlock();
				r->run();
				locker.lock();

				if (auto_delete && !--r->m_ref) {
					delete r;
				}
			}

			if (this->manager->tooManyThreadsActive()) {
				break;
			}

			if (this->manager->queue.empty()) {
				r = nullptr;
			}
			else {
				r = this->manager->queue.front().first;
				this->manager->queue.pop_front();
			}
		} while (r);

		if (this->manager->isExiting) {
			this->registerThreadInactive();
			break;
		}

		bool expired = this->manager->tooManyThreadsActive();
		if (!expired) {
			this->manager->waitingThreads.push_back(this);
			this->registerThreadInactive();
			this->runnableReady.wait_for(locker, std::chrono::milliseconds(manager->expiryTimeout));
			++manager->activeThreads;

			for (auto it = this->manager->waitingThreads.begin(); it != this->manager->waitingThreads.end(); ++it) {
				if (*it == this) {
					this->manager->waitingThreads.erase(it);
					expired = true;
					break;
				}
			}
		}

		if (expired) {
			this->manager->expiredThreads.push_back(this);
			this->registerThreadInactive();
			break;
		}
	}
}

void ThreadPoolThread::registerThreadInactive(void)
{
	if (--this->manager->activeThreads == 0) {
		this->manager->noActiveThreads.notify_all();
	}
}

threadpool_p.h

#ifndef THREADPOOL_P_H
#define THREADPOOL_P_H

#include <condition_variable>
#include <list>
#include <mutex>
#include <set>
#include <utility>

class Runnable;
class ThreadPoolThread;

// 线程池内部接口,所有的成员变量都封装在这里
class ThreadPoolPrivate {
public:
	ThreadPoolPrivate(void);

	// 试着去执行一个任务,如果激活线程已经满了返回false
	bool tryStart(Runnable* runnable);

	// 将新任务插入到优先级相同的任务后面
	void enqueueTask(Runnable* runnable, int priority = 0);

	std::size_t activeThreadCount(void) const;

	// 使用更多的线程执行更多的任务
	void tryToStartMoreThreads(void);
	bool tooManyThreadsActive(void) const;

	void startThread(Runnable* runnable = nullptr);
	void reset(void);
	bool waitForDone(unsigned long int msecs);
	void clear(void);

	// 偷窃一个任务(删除一个任务)
	bool stealRunnable(Runnable* runnable);
	// 偷窃一个任务(删除一个任务)并运行此任务
	void stealAndRunRunnable(Runnable* runnable);

	mutable std::mutex mutex;
	std::set<ThreadPoolThread*> allThreads;
	std::list<ThreadPoolThread*> waitingThreads;
	std::list<ThreadPoolThread*> expiredThreads;
	std::list<std::pair<Runnable*, int> > queue;
	std::condition_variable noActiveThreads;

	bool isExiting;
	unsigned long int expiryTimeout;
	std::size_t maxThreadCount;
	std::size_t reservedThreads;
	std::size_t activeThreads;
};

#endif // THREADPOOL_P_H

threadpool_p.cpp

#include <algorithm>
#include <chrono>
#include <thread>
#include <memory>
#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"

ThreadPoolPrivate::ThreadPoolPrivate(void)
	: isExiting(false), expiryTimeout(30000),
	  maxThreadCount(std::max(std::thread::hardware_concurrency(), 1u)),
	  reservedThreads(0), activeThreads(0)
{
}

bool ThreadPoolPrivate::tryStart(Runnable* task)
{
	if (this->allThreads.empty()) {
		this->startThread(task);
		return true;
	}

	if (this->activeThreadCount() >= this->maxThreadCount) {
		return false;
	}

	if (this->waitingThreads.size()) {
		this->enqueueTask(task);
		ThreadPoolThread* t = this->waitingThreads.front();
		this->waitingThreads.pop_front();
		t->runnableReady.notify_one();
		return true;
	}

	if (this->expiredThreads.size()) {
		ThreadPoolThread* t = this->expiredThreads.front();
		this->expiredThreads.pop_front();

		++this->activeThreads;

		if (task->autoDelete()) {
			++task->m_ref;
		}

		t->runnable = task;
		t->thread->join();
		t->thread = std::move(new std::thread(&ThreadPoolThread::operator(), t));
		return true;
	}

	this->startThread(task);
	return true;
}

// 重载操作符,供upper_bound使用
inline bool operator<(int priority, const std::pair<Runnable*, int>& p)
{
	return p.second < priority;
}

inline bool operator<(const std::pair<Runnable*, int>&p, int priority)
{
	return priority < p.second;
}

void ThreadPoolPrivate::enqueueTask(Runnable* runnable, int priority)
{
	if (runnable->autoDelete()) {
		 ++runnable->m_ref;
	}

	auto it = std::upper_bound(this->queue.begin(), this->queue.end(), priority);
	this->queue.insert(it, std::make_pair(runnable, priority));
}

std::size_t ThreadPoolPrivate::activeThreadCount(void) const
{
	return
		  this->allThreads.size()
		- this->waitingThreads.size()
		- this->expiredThreads.size()
		+ this->reservedThreads
	;
}

void ThreadPoolPrivate::tryToStartMoreThreads(void)
{
	while (!this->queue.empty() && this->tryStart(this->queue.front().first)) {
		this->queue.pop_front();
	}
}

bool ThreadPoolPrivate::tooManyThreadsActive(void) const
{
	const std::size_t activeThreadCount = this->activeThreadCount();
	return activeThreadCount > this->maxThreadCount && (activeThreadCount - this->reservedThreads) > 1;
}

void ThreadPoolPrivate::startThread(Runnable* runnable)
{
	std::unique_ptr<ThreadPoolThread> thread(new ThreadPoolThread(this));
	this->allThreads.insert(thread.get());
	++this->activeThreads;

	if (runnable->autoDelete()) {
		++runnable->m_ref;
	}

	thread->runnable = runnable;
	thread->thread   = new std::thread(&ThreadPoolThread::operator(), thread.get());
	thread.release();
}

void ThreadPoolPrivate::reset(void)
{
	std::unique_lock<std::mutex> locker(this->mutex);
	this->isExiting = true;

	while (!this->allThreads.empty()) {
		std::set<ThreadPoolThread*> allThreadsCopy;
		allThreadsCopy.swap(this->allThreads);
		locker.unlock();

		for (auto it = allThreadsCopy.begin(); it != allThreadsCopy.end(); ++it) {
			(*it)->runnableReady.notify_all();
			(*it)->thread->join();
			delete (*it)->thread;
			delete (*it);
		}

		locker.lock();
	}

	this->waitingThreads.clear();
	this->expiredThreads.clear();
	isExiting = false;
}

bool ThreadPoolPrivate::waitForDone(unsigned long int msecs)
{
	std::unique_lock<std::mutex> locker(this->mutex);
	if (msecs == std::numeric_limits<unsigned long int>::max()) {
		while (!(this->queue.empty() && this->activeThreads == 0)) {
			this->noActiveThreads.wait(locker);
		}
	}
	else {
		auto start = std::chrono::steady_clock::now();
		auto till  = start + std::chrono::milliseconds(msecs);
		while (
			   !(this->queue.empty() && this->activeThreads == 0)
			&& std::chrono::steady_clock::now() < till
		) {
			this->noActiveThreads.wait_until(locker, till);
		}
	}

	return this->queue.empty() && !this->activeThreads;
}

void ThreadPoolPrivate::clear(void)
{
	std::unique_lock<std::mutex> locker(this->mutex);
	while (!this->queue.empty()) {
		std::pair<Runnable*, int>& item = this->queue.front();
		Runnable* r = item.first;
		if (r->autoDelete() && --r->m_ref) {
			delete r;
		}

		this->queue.pop_front();
	}
}

bool ThreadPoolPrivate::stealRunnable(Runnable* runnable)
{
	if (!runnable) {
		return false;
	}

	std::unique_lock<std::mutex> locker(this->mutex);
	auto it  = this->queue.begin();
	auto end = this->queue.end();

	while (it != end) {
		if (it->first == runnable) {
			this->queue.erase(it);
			return true;
		}

		++it;
	}

	return false;
}

void ThreadPoolPrivate::stealAndRunRunnable(Runnable* runnable)
{
	if (!this->stealRunnable(runnable)) {
		return;
	}

	const bool autoDelete = runnable->autoDelete();
	bool del = autoDelete && !--runnable->m_ref;

	runnable->run();

	if (del) {
		delete runnable;
	}
}

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <limits>
#include <memory>
#include <vector>

class Runnable;
class ThreadPoolPrivate;

// 线程池,外部接口供用户使用
class ThreadPool {
public:
	ThreadPool(void);
	~ThreadPool(void);

	// 启动一个任务,如果没有空闲的线程就入队
	void start(Runnable* runnable, int priority = 0);
	// 试着去启动一个任务,如果没有空闲线程就返回false
	bool tryStart(Runnable* runnable);

	// 到期时间,等待时间
	unsigned long int expiryTimeout(void) const;
	void setExpiryTimeout(unsigned long int v);

	// 最大线程数,初始化为CPU内核数
	std::size_t maxThreadCount(void) const;
	void setMaxThreadCount(std::size_t n);

	// 激活的线程数
	std::size_t activeThreadCount(void) const;

	// 任务队列大小
	std::size_t queueSize(void) const;

	// 保留线程,暂时不使用,调用一次新增一个
	void reserveThread(void);
	// 释放线程,将保留的线程拿出来使用,调用一次释放一次
	void releaseThread(void);

	// 等待所有激活的线程执行完成
	bool waitForDone(unsigned long int timeout = std::numeric_limits<unsigned long int>::max());

	// 清理掉所有未执行的任务
	void clear(void);

	// 删除一个任务
	void cancel(Runnable* runnable);

private:
	friend class ThreadPoolPrivate;

	const std::unique_ptr<ThreadPoolPrivate> d_ptr;

	inline ThreadPoolPrivate* d_func(void) { return this->d_ptr.get(); }
	inline const ThreadPoolPrivate* d_func(void) const { return this->d_ptr.get(); }
};

#endif // THREADPOOL_H

threadpool.cpp

#include <mutex>
#include "threadpool.h"
#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"

ThreadPool::ThreadPool(void)
	: d_ptr(new ThreadPoolPrivate())
{
}

ThreadPool::~ThreadPool(void)
{
	this->waitForDone();
}

void ThreadPool::start(Runnable* runnable, int priority)
{
	if (!runnable) {
		return;
	}

	ThreadPoolPrivate* const d = this->d_func();
	std::unique_lock<std::mutex> locker(d->mutex);
	if (!d->tryStart(runnable)) {
		d->enqueueTask(runnable, priority);

		if (!d->waitingThreads.empty()) {
			ThreadPoolThread* t = d->waitingThreads.front();
			d->waitingThreads.pop_front();
			t->runnableReady.notify_one();
		}
	}
}

bool ThreadPool::tryStart(Runnable* runnable)
{
	if (!runnable) {
		return false;
	}

	ThreadPoolPrivate* const d = this->d_func();
	std::unique_lock<std::mutex> locker(d->mutex);

	if (d->allThreads.empty() && d->activeThreadCount() >= d->maxThreadCount) {
		return false;
	}

	return d->tryStart(runnable);
}

unsigned long int ThreadPool::expiryTimeout(void) const
{
	return this->d_func()->expiryTimeout;
}

void ThreadPool::setExpiryTimeout(unsigned long int v)
{
	this->d_func()->expiryTimeout = v;
}

std::size_t ThreadPool::maxThreadCount(void) const
{
	return this->d_func()->maxThreadCount;
}

void ThreadPool::setMaxThreadCount(std::size_t n)
{
	ThreadPoolPrivate* const d = this->d_func();
	if (d->maxThreadCount == n) {
		return;
	}

	d->maxThreadCount = n;
	d->tryToStartMoreThreads();
}

std::size_t ThreadPool::activeThreadCount(void) const
{
	const ThreadPoolPrivate* const d = this->d_func();
	std::unique_lock<std::mutex> locker(d->mutex);
	return d->activeThreadCount();
}

std::size_t ThreadPool::queueSize(void) const
{
	return this->d_func()->queue.size();
}

void ThreadPool::reserveThread(void)
{
	ThreadPoolPrivate* const d = this->d_func();
	std::unique_lock<std::mutex> locker(d->mutex);
	++d->reservedThreads;
}

void ThreadPool::releaseThread(void)
{
	ThreadPoolPrivate* const d = this->d_func();
	std::unique_lock<std::mutex> locker(d->mutex);
	--d->reservedThreads;
	d->tryToStartMoreThreads();
}

bool ThreadPool::waitForDone(unsigned long int msec)
{
	ThreadPoolPrivate* const d = this->d_func();
	bool ret = d->waitForDone(msec);
	if (ret) {
		d->reset();
	}

	return ret;
}

void ThreadPool::clear(void)
{
	this->d_func()->clear();
}

void ThreadPool::cancel(Runnable* runnable)
{
	ThreadPoolPrivate* const d = this->d_func();
	if (!d->stealRunnable(runnable)) {
		return;
	}

	if (runnable->autoDelete() && !--runnable->m_ref) {
		delete runnable;
	}
}

main.cpp

#include <iostream>
#include <thread>
#include <mutex>
#include "threadpool.h"
#include "runnable.h"

std::mutex s_mutex;
class Task : public Runnable
{
public:
	Task(int i)
		: i_(i)
	{
	}
	void run()
	{
		std::unique_lock<std::mutex> lock(s_mutex);
		//std::this_thread::sleep_for(std::chrono::milliseconds(100));
		std::cout << "number:" << i_ << ",thread id:" << std::this_thread::get_id() << std::endl;
	}
private:
	int i_;
};

int _tmain(int argc, _TCHAR* argv[])
{

	std::unique_ptr<ThreadPool> tp(new ThreadPool());
	for (int i = 0; i < 100; i++) {
		tp->start(new Task(i));
	}

	system("pause");
	return 0;
}