文章标题 原创 翻译 转载 文章内容

## std::thread::hardware_concurrency()

返回硬件线程上下文的数量,通常是CPU内核数量;如果无法确定则返回0

## template <class T> numeric_limits

根据当前平台,获取指定类型的信息

> std::numeric_limits<int>::min() 获取int最小值

> std::numeric_limits<unsigned long>::max() 获取unsigned long最大值

## std::unique_ptr

管理一个指针对象,在出作用域后自动销毁对象,而且它更安全:不允许拷贝(不能直接通过拷贝赋值来修改它),只能通过移动(std::move)转移所有权

## std::upper_bound

std::upper_bound (v.begin(), v.end(), 20) 在已排序的区间中返回指向第一个大于20的元素的迭代器,没找到返回v.end()

## std::lower_bound

std::lower_bound(v.begin(), v.end(), 20) 在已排序的区间中返回指向第一个不小于(大于或等于)20的元素的迭代器,没找到返回v.end()

## std::unique_lock<std::mutex>

构造的时候加锁,析构的时候解锁,保证加解锁是成对的不会漏掉

源码如下,做了些注释:

runnable.h

```cpp
#ifndef RUNNABLE_H
#define RUNNABLE_H

// A unit of work executed by ThreadPool. Subclasses implement run().
//
// m_ref doubles as the auto-delete switch: -1 means "never delete"; any other
// value is the number of references the pool currently holds on the task.
// The pool deletes an auto-delete task when that count drops back to zero.
class Runnable
{
public:
    // Auto-delete is enabled by default (m_ref starts at 0).
    Runnable(void) : m_ref(0) {}
    virtual ~Runnable(void) {}

    // The work to perform; called by a pool thread (or by stealAndRunRunnable).
    virtual void run(void) = 0;

    // True unless auto-delete was switched off with setAutoDelete(false).
    bool autoDelete(void) const { return this->m_ref != -1; }

    // Enables (resets the count to 0) or disables (-1) auto-deletion.
    void setAutoDelete(bool v) { this->m_ref = v ? 0 : -1; }

private:
    int m_ref;

    friend class ThreadPool;
    friend class ThreadPoolPrivate;
    friend class ThreadPoolThread;
};

#endif // RUNNABLE_H
```

threadpoolthread.h

```cpp
#ifndef THREADPOOLTHREAD_H
#define THREADPOOLTHREAD_H

#include <condition_variable>
// Declaring std::thread ourselves inside namespace std is undefined behaviour,
// so include the real header instead of forward-declaring it.
#include <thread>

class Runnable;
class ThreadPoolPrivate;

// One worker of the pool: the callable that a std::thread executes, plus the
// per-worker state the pool manipulates.
class ThreadPoolThread
{
public:
    ThreadPoolThread(ThreadPoolPrivate* manager);

    // Thread entry point; invoked as soon as the std::thread starts.
    void operator()(void);

    // Decrements the pool's active-thread counter and wakes waitForDone()
    // when it reaches zero.
    void registerThreadInactive(void);

    std::condition_variable runnableReady; // signalled when work may be available
    ThreadPoolPrivate* manager;            // owning pool (not owned by this object)
    Runnable* runnable;                    // first task to run, may be nullptr
    std::thread* thread;                   // the std::thread executing operator()
};

#endif // THREADPOOLTHREAD_H
```

threadpoolthread.cpp

```cpp
#include <algorithm>
#include <chrono>
#include <mutex>
#include <thread>

#include "threadpoolthread.h"
#include "threadpool_p.h"
#include "runnable.h"

ThreadPoolThread::ThreadPoolThread(ThreadPoolPrivate* manager)
    : manager(manager), runnable(nullptr), thread(nullptr)
{
}

// Worker loop: run the initial task, drain the queue, then wait for more work
// until the expiry timeout elapses or the pool is shutting down.
void ThreadPoolThread::operator()(void)
{
    std::unique_lock<std::mutex> locker(this->manager->mutex);

    while (true) {
        Runnable* r = this->runnable;
        this->runnable = nullptr;

        do {
            if (r) {
                // Read the flag before run(): the task may change it.
                const bool auto_delete = r->autoDelete();

                // Never hold the pool mutex while user code is running.
                locker.unlock();
                r->run();
                locker.lock();

                if (auto_delete && !--r->m_ref) {
                    delete r;
                }
            }

            // Stop pulling tasks if the pool has shrunk below the number of
            // running threads.
            if (this->manager->tooManyThreadsActive()) {
                break;
            }

            if (this->manager->queue.empty()) {
                r = nullptr;
            } else {
                r = this->manager->queue.front().first;
                this->manager->queue.pop_front();
            }
        } while (r);

        if (this->manager->isExiting) {
            this->registerThreadInactive();
            break;
        }

        bool expired = this->manager->tooManyThreadsActive();
        if (!expired) {
            this->manager->waitingThreads.push_back(this);
            this->registerThreadInactive();

            // Wait for work, giving up once the expiry timeout is reached.
            this->runnableReady.wait_for(
                locker, std::chrono::milliseconds(this->manager->expiryTimeout));
            ++this->manager->activeThreads;

            // The pool removes a thread from waitingThreads before notifying
            // it about new work. Still being listed therefore means nobody
            // handed us a task, so this thread expires.
            std::list<ThreadPoolThread*>& waiting = this->manager->waitingThreads;
            const auto self = std::find(waiting.begin(), waiting.end(), this);
            if (self != waiting.end()) {
                waiting.erase(self);
                expired = true;
            }
        }

        if (expired) {
            this->manager->expiredThreads.push_back(this);
            this->registerThreadInactive();
            break;
        }
    }
}

void ThreadPoolThread::registerThreadInactive(void)
{
    if (--this->manager->activeThreads == 0) {
        this->manager->noActiveThreads.notify_all();
    }
}
```

threadpool_p.h

```cpp
#ifndef THREADPOOL_P_H
#define THREADPOOL_P_H

#include <condition_variable>
#include <cstddef>
#include <list>
#include <mutex>
#include <set>
#include <utility>

class Runnable;
class ThreadPoolThread;

// Internal implementation of the thread pool; all state lives here.
// Unless noted otherwise, member functions expect the caller to hold `mutex`.
class ThreadPoolPrivate
{
public:
    ThreadPoolPrivate(void);

    // Tries to run a task right away; returns false if the active-thread
    // limit has been reached.
    bool tryStart(Runnable* runnable);

    // Inserts the task into the queue after all tasks of the same priority.
    void enqueueTask(Runnable* runnable, int priority = 0);

    std::size_t activeThreadCount(void) const;

    // Starts queued tasks for as long as tryStart() accepts them.
    void tryToStartMoreThreads(void);

    bool tooManyThreadsActive(void) const;

    // Creates a new worker thread whose first task is `runnable` (may be null).
    void startThread(Runnable* runnable = nullptr);

    // Joins and destroys all worker threads. Locks `mutex` itself.
    void reset(void);

    // Waits until the queue is empty and no thread is active, or until
    // `msecs` milliseconds have passed. Locks `mutex` itself.
    bool waitForDone(unsigned long int msecs);

    // Drops every task that has not started yet. Locks `mutex` itself.
    void clear(void);

    // Removes a task from the queue without running it. Locks `mutex` itself.
    bool stealRunnable(Runnable* runnable);

    // Removes a task from the queue and runs it on the calling thread.
    void stealAndRunRunnable(Runnable* runnable);

    mutable std::mutex mutex;

    std::set<ThreadPoolThread*> allThreads;
    std::list<ThreadPoolThread*> waitingThreads;
    std::list<ThreadPoolThread*> expiredThreads;
    std::list<std::pair<Runnable*, int> > queue; // (task, priority), highest priority first
    std::condition_variable noActiveThreads;

    bool isExiting;
    unsigned long int expiryTimeout; // milliseconds
    std::size_t maxThreadCount;
    std::size_t reservedThreads;
    std::size_t activeThreads;
};

#endif // THREADPOOL_P_H
```

threadpool_p.cpp

```cpp
#include <algorithm>
#include <chrono>
#include <limits>
#include <thread>
#include <memory>

#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"

ThreadPoolPrivate::ThreadPoolPrivate(void)
    : isExiting(false),
      expiryTimeout(30000),
      // hardware_concurrency() may return 0; always allow at least one thread.
      maxThreadCount(std::max(std::thread::hardware_concurrency(), 1u)),
      reservedThreads(0),
      activeThreads(0)
{
}

bool ThreadPoolPrivate::tryStart(Runnable* task)
{
    if (this->allThreads.empty()) {
        // Always create at least one thread.
        this->startThread(task);
        return true;
    }

    // Cannot start anything while the limit is reached.
    if (this->activeThreadCount() >= this->maxThreadCount) {
        return false;
    }

    if (!this->waitingThreads.empty()) {
        // Hand the task to a thread that is waiting for work.
        this->enqueueTask(task);
        ThreadPoolThread* t = this->waitingThreads.front();
        this->waitingThreads.pop_front();
        t->runnableReady.notify_one();
        return true;
    }

    if (!this->expiredThreads.empty()) {
        // Recycle an expired worker object by giving it a fresh std::thread.
        ThreadPoolThread* t = this->expiredThreads.front();
        this->expiredThreads.pop_front();

        ++this->activeThreads;
        if (task->autoDelete()) {
            ++task->m_ref;
        }
        t->runnable = task;

        // The old std::thread has left its loop; join it and free the object
        // before replacing the pointer, otherwise it is leaked.
        t->thread->join();
        delete t->thread;
        t->thread = new std::thread(&ThreadPoolThread::operator(), t);
        return true;
    }

    // No idle worker available: start a new one.
    this->startThread(task);
    return true;
}

// Comparison operators used by std::upper_bound in enqueueTask(). The
// comparisons are reversed on purpose so that the queue is kept ordered from
// the highest priority to the lowest.
inline bool operator<(int priority, const std::pair<Runnable*, int>& p)
{
    return p.second < priority;
}

inline bool operator<(const std::pair<Runnable*, int>& p, int priority)
{
    return priority < p.second;
}

void ThreadPoolPrivate::enqueueTask(Runnable* runnable, int priority)
{
    if (runnable->autoDelete()) {
        ++runnable->m_ref;
    }

    // upper_bound places the task after all queued tasks of equal priority.
    auto it = std::upper_bound(this->queue.begin(), this->queue.end(), priority);
    this->queue.insert(it, std::make_pair(runnable, priority));
}

std::size_t ThreadPoolPrivate::activeThreadCount(void) const
{
    return this->allThreads.size()
        - this->waitingThreads.size()
        - this->expiredThreads.size()
        + this->reservedThreads;
}

void ThreadPoolPrivate::tryToStartMoreThreads(void)
{
    // Start queued tasks until the limit is reached or the queue is empty.
    while (!this->queue.empty() && this->tryStart(this->queue.front().first)) {
        this->queue.pop_front();
    }
}

bool ThreadPoolPrivate::tooManyThreadsActive(void) const
{
    const std::size_t activeThreadCount = this->activeThreadCount();
    return activeThreadCount > this->maxThreadCount
        && (activeThreadCount - this->reservedThreads) > 1;
}

void ThreadPoolPrivate::startThread(Runnable* runnable)
{
    // unique_ptr guards the worker until the std::thread has been created.
    std::unique_ptr<ThreadPoolThread> thread(new ThreadPoolThread(this));
    this->allThreads.insert(thread.get());
    ++this->activeThreads;

    // `runnable` defaults to nullptr, so it must be checked before use; a
    // worker started without a task goes straight to the queue.
    if (runnable && runnable->autoDelete()) {
        ++runnable->m_ref;
    }
    thread->runnable = runnable;
    thread->thread = new std::thread(&ThreadPoolThread::operator(), thread.get());
    thread.release();
}

void ThreadPoolPrivate::reset(void)
{
    std::unique_lock<std::mutex> locker(this->mutex);
    this->isExiting = true;

    while (!this->allThreads.empty()) {
        // Work on a private copy so the mutex can be released while joining;
        // the workers need it to leave their loop.
        std::set<ThreadPoolThread*> allThreadsCopy;
        allThreadsCopy.swap(this->allThreads);
        locker.unlock();

        for (auto it = allThreadsCopy.begin(); it != allThreadsCopy.end(); ++it) {
            (*it)->runnableReady.notify_all();
            (*it)->thread->join();
            delete (*it)->thread;
            delete (*it);
        }

        locker.lock();
    }

    this->waitingThreads.clear();
    this->expiredThreads.clear();
    this->isExiting = false;
}

bool ThreadPoolPrivate::waitForDone(unsigned long int msecs)
{
    std::unique_lock<std::mutex> locker(this->mutex);

    if (msecs == std::numeric_limits<unsigned long int>::max()) {
        // The maximum value means "wait forever".
        while (!(this->queue.empty() && this->activeThreads == 0)) {
            this->noActiveThreads.wait(locker);
        }
    } else {
        auto start = std::chrono::steady_clock::now();
        auto till = start + std::chrono::milliseconds(msecs);
        while (!(this->queue.empty() && this->activeThreads == 0)
               && std::chrono::steady_clock::now() < till) {
            this->noActiveThreads.wait_until(locker, till);
        }
    }

    return this->queue.empty() && !this->activeThreads;
}

void ThreadPoolPrivate::clear(void)
{
    std::unique_lock<std::mutex> locker(this->mutex);

    while (!this->queue.empty()) {
        Runnable* r = this->queue.front().first;
        // Drop the queue's reference; delete the task only when that was the
        // last one (the count reaches zero), like every other release site.
        if (r->autoDelete() && !--r->m_ref) {
            delete r;
        }
        this->queue.pop_front();
    }
}

bool ThreadPoolPrivate::stealRunnable(Runnable* runnable)
{
    if (!runnable) {
        return false;
    }

    std::unique_lock<std::mutex> locker(this->mutex);
    auto it = this->queue.begin();
    auto end = this->queue.end();
    while (it != end) {
        if (it->first == runnable) {
            this->queue.erase(it);
            return true;
        }
        ++it;
    }
    return false;
}

void ThreadPoolPrivate::stealAndRunRunnable(Runnable* runnable)
{
    if (!this->stealRunnable(runnable)) {
        return;
    }

    // Decide about deletion before run(): the task may change the flag.
    const bool autoDelete = runnable->autoDelete();
    bool del = autoDelete && !--runnable->m_ref;

    runnable->run();

    if (del) {
        delete runnable;
    }
}
```

threadpool.h

```cpp
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <cstddef>
#include <limits>
#include <memory>
#include <vector>

class Runnable;
class ThreadPoolPrivate;

// Thread pool; this is the public interface intended for users.
class ThreadPool
{
public:
    ThreadPool(void);
    ~ThreadPool(void);

    // Starts a task; if no thread is available the task is queued.
    void start(Runnable* runnable, int priority = 0);

    // Tries to start a task; returns false if no thread is available.
    bool tryStart(Runnable* runnable);

    // How long (in milliseconds) an idle thread waits for work before expiring.
    unsigned long int expiryTimeout(void) const;
    void setExpiryTimeout(unsigned long int v);

    // Maximum number of threads; initialised to the number of CPU cores.
    std::size_t maxThreadCount(void) const;
    void setMaxThreadCount(std::size_t n);

    // Number of active threads.
    std::size_t activeThreadCount(void) const;

    // Number of tasks waiting in the queue.
    std::size_t queueSize(void) const;

    // Reserves one thread (it counts as active); each call reserves one more.
    void reserveThread(void);

    // Releases one previously reserved thread; each call releases one.
    void releaseThread(void);

    // Waits until all active threads have finished.
    bool waitForDone(unsigned long int timeout = std::numeric_limits<unsigned long int>::max());

    // Removes all tasks that have not started yet.
    void clear(void);

    // Removes one task from the queue.
    void cancel(Runnable* runnable);

private:
    friend class ThreadPoolPrivate;

    const std::unique_ptr<ThreadPoolPrivate> d_ptr;

    inline ThreadPoolPrivate* d_func(void) { return this->d_ptr.get(); }
    inline const ThreadPoolPrivate* d_func(void) const { return this->d_ptr.get(); }
};

#endif // THREADPOOL_H
```

threadpool.cpp

```cpp
#include <mutex>

#include "threadpool.h"
#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"

ThreadPool::ThreadPool(void)
    : d_ptr(new ThreadPoolPrivate())
{
}

ThreadPool::~ThreadPool(void)
{
    this->waitForDone();
}

void ThreadPool::start(Runnable* runnable, int priority)
{
    if (!runnable) {
        return;
    }

    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);

    if (!d->tryStart(runnable)) {
        d->enqueueTask(runnable, priority);

        if (!d->waitingThreads.empty()) {
            ThreadPoolThread* t = d->waitingThreads.front();
            d->waitingThreads.pop_front();
            t->runnableReady.notify_one();
        }
    }
}

bool ThreadPool::tryStart(Runnable* runnable)
{
    if (!runnable) {
        return false;
    }

    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);

    // Refuse only when threads already exist and the limit is reached; with
    // no threads at all, ThreadPoolPrivate::tryStart() always creates one.
    if (!d->allThreads.empty() && d->activeThreadCount() >= d->maxThreadCount) {
        return false;
    }
    return d->tryStart(runnable);
}

unsigned long int ThreadPool::expiryTimeout(void) const
{
    const ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    return d->expiryTimeout;
}

void ThreadPool::setExpiryTimeout(unsigned long int v)
{
    ThreadPoolPrivate* const d = this->d_func();
    // Worker threads read expiryTimeout while holding the mutex.
    std::unique_lock<std::mutex> locker(d->mutex);
    d->expiryTimeout = v;
}

std::size_t ThreadPool::maxThreadCount(void) const
{
    const ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    return d->maxThreadCount;
}

void ThreadPool::setMaxThreadCount(std::size_t n)
{
    ThreadPoolPrivate* const d = this->d_func();
    // tryToStartMoreThreads() modifies the queue and the thread lists, which
    // the workers also touch, so the mutex must be held (as in releaseThread).
    std::unique_lock<std::mutex> locker(d->mutex);

    if (d->maxThreadCount == n) {
        return;
    }

    d->maxThreadCount = n;
    d->tryToStartMoreThreads();
}

std::size_t ThreadPool::activeThreadCount(void) const
{
    const ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    return d->activeThreadCount();
}

std::size_t ThreadPool::queueSize(void) const
{
    const ThreadPoolPrivate* const d = this->d_func();
    // The queue is modified by worker threads under the mutex.
    std::unique_lock<std::mutex> locker(d->mutex);
    return d->queue.size();
}

void ThreadPool::reserveThread(void)
{
    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    ++d->reservedThreads;
}

void ThreadPool::releaseThread(void)
{
    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);
    --d->reservedThreads;
    d->tryToStartMoreThreads();
}

bool ThreadPool::waitForDone(unsigned long int msec)
{
    ThreadPoolPrivate* const d = this->d_func();
    bool ret = d->waitForDone(msec);
    if (ret) {
        // Everything finished: join and destroy the worker threads.
        d->reset();
    }
    return ret;
}

void ThreadPool::clear(void)
{
    this->d_func()->clear();
}

void ThreadPool::cancel(Runnable* runnable)
{
    ThreadPoolPrivate* const d = this->d_func();
    if (!d->stealRunnable(runnable)) {
        return;
    }

    if (runnable->autoDelete() && !--runnable->m_ref) {
        delete runnable;
    }
}
```

main.cpp

```cpp
#include <cstdlib>
#include <iostream>
#include <memory>
#include <thread>
#include <mutex>

#include "threadpool.h"
#include "runnable.h"

// Serialises access to std::cout across the pool threads.
std::mutex s_mutex;

// Demo task: prints its number and the id of the thread that runs it.
class Task : public Runnable
{
public:
    Task(int i) : i_(i) {}

    void run() override
    {
        std::unique_lock<std::mutex> lock(s_mutex);
        //std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "number:" << i_ << ",thread id:" << std::this_thread::get_id() << std::endl;
    }

private:
    int i_;
};

// Standard entry point; the MSVC-only _tmain/_TCHAR form needs <tchar.h>.
int main()
{
    std::unique_ptr<ThreadPool> tp(new ThreadPool());

    for (int i = 0; i < 100; i++) {
        // Tasks are auto-deleted by the pool once they have run.
        tp->start(new Task(i));
    }

    std::system("pause");
    return 0;
}
```

文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交