yuyi
C++11 实现线程池🎃
使用基本的 C++11 特性实现一个相对简单但符合良好代码规范的线程池,这个简化版本的线程池将展示基本的创建、任务分配和安全停止线程的功能。
简化版线程池实现
...
int main() {
SimpleThreadPool pool(50);
for(size_t i =1; i<=50;i++){
pool.enqueue(print_task, i);
}
// 线程池会在对象销毁时自动清理所有线程
return 0;
}
接下来将创建一个 SimpleThreadPool
的类实现线程池,并使其能使用 enqueue
分配任务不同线程。
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>
class SimpleThreadPool {
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic<bool> stop;
...
};
这个简化版本的线程池主要使用了std::thread
, std::mutex
, std::condition_variable
, 和 std::queue
,不涉及std::packaged_task
和std::future
。这样可以减少对模板和高级特性的依赖,同时仍然提供线程池的基本功能。
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] {
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
emplace_back
直接在 workers
向量的内存位置构造一个 std::thread
对象,传入 Lambda 表达式作为线程函数,避免了额外的复制或移动操作。
// 使用 push_back
std::thread worker([this] {
// Lambda 表达式,作为线程函数
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] {
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
// 将构造好的线程对象移动到 vector 中
workers.push_back(std::move(worker));
构造和移动:std::thread worker(...)
先在局部变量 worker
中构造线程对象,然后使用 std::move
将其移动到 workers
向量中。
使用 emplace_back
可以更加高效,因为它避免了临时对象的创建和随后的移动操作。尽管 push_back
和 std::move
也可以达到相同的效果,但 emplace_back 更加简洁和直接。
在创建 std::thread
对象时,传入的函数(包括 Lambda 表达式)会在新线程启动时立即执行。
主线程和新线程是并行执行的,主线程不会等待新线程完成,除非显式调用 join
方法。
在你的线程池代码中,每个工作线程被创建后,会立即执行传入的 Lambda 表达式,并在没有任务时阻塞等待。
class SimpleThreadPool {
...
public:
SimpleThreadPool(size_t num_threads) : stop(false) {
for (size_t i = 0; i < num_threads; ++i) {
// 创建线程并加入容器
workers.emplace_back(...); // 见上文
}
}
~SimpleThreadPool() {
{
// 析构时停止线程池所有线程
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
//
for (std::thread &worker : workers) {
worker.join();
}
}
}
void print_task(int n) {
std::cout << "Task " << n << " is running." << std::endl;
}
- 同步线程:
join()
会阻塞调用线程,直到被连接的线程执行完毕。 这确保主线程会等待工作线程完成其任务后再继续执行,避免了主线程过早结束导致资源未正确释放或程序行为异常的问题。 - 释放资源:
join()
会释放与线程相关的资源,包括线程栈和线程本地存储 (TLS)。 如果不调用 join(),这些资源将无法被释放,导致资源泄漏。 - 不调用
join()
的后果:
主线程过早结束: 主线程可能在工作线程完成之前结束,导致程序无法按预期运行。 - 资源泄漏: 与线程相关的资源无法被释放,占用系统资源,影响程序性能。
std::terminate()
调用: 如果程序结束时仍有未连接的线程,C++ 运行时会调用std::terminate()
函数终止程序。
template<class F, class... Args>
void enqueue(F&& f, Args&&... args) {
// 绑定传入的函数和其参数
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace(task);
}
condition.notify_one();
}
代码说明
线程管理:
- 线程在构造函数中创建,并在析构函数中安全地停止和回收。
- 使用
std::atomic<bool>
来标记线程池是否停止,保证线程状态的线程安全访问。
任务队列:
- 任务存储在
std::queue
中,使用std::mutex
保护队列以避免并发冲突。 std::condition_variable
用于线程同步,让线程在没有任务时等待。
- 任务存储在
任务提交:
- 使用
std::bind
来包装带参数的函数调用,允许将不同的任务和参数推入队列中。 - 提交任务时,如果线程池已停止,则抛出异常。
- 使用
这个简化版本的线程池易于理解,适合初学者学习多线程和线程池的基本概念,同时它确保了线程的正确创建和销毁,避免了资源泄露和其他常见的并发问题。
这段代码是线程池类中的一个成员函数,名为 enqueue
,它的作用是向线程池的任务队列中添加一个新的任务。下面详细解释每一部分:
模板参数
template<class F, class... Args>
这是一个模板函数,用来接受任意类型的可调用对象 F
和其参数 Args...
。这种设计允许 enqueue
方法接受各种不同签名的函数或者函数对象,并传递适当的参数。
函数参数
void enqueue(F&& f, Args&&... args) {
这里使用了完美转发(Perfect Forwarding),F&&
和 Args&&...
允许将实参以其原始类型(保持lvalue或rvalue特性)传递到函数内部。这是C++11中引入的一个特性,用来支持转发参数,同时避免不必要的拷贝。
任务包装
auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
在这一行中,std::bind
用于创建一个新的可调用对象,它绑定了函数 f
和它的参数 args
。std::forward<F>(f)
和 std::forward<Args>(args)...
确保函数和参数的lvalue或rvalue属性被保留,这是为了最大化效率和灵活性。生成的 task
是一个无参的可调用对象,它在被调用时会执行 f(args...)
。
任务添加到队列
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace(task);
}
这一块代码首先使用 std::unique_lock<std::mutex>
对 queue_mutex
进行加锁,以确保线程安全地修改任务队列。如果线程池已经设置为停止 (stop
为 true
),则抛出异常,因为不应该向已停止的线程池提交新任务。如果线程池未停止,新创建的任务 task
会被添加到 tasks
队列中。
通知工作线程
condition.notify_one();
这行代码用 std::condition_variable
的 notify_one
方法来唤醒一个正在等待任务的线程(如果有的话)。这意味着如果队列中有任务等待处理,并且至少有一个线程处于等待状态,则其中一个线程将被唤醒来执行这个新添加的任务。
总结
这个 enqueue
函数是线程池的核心功能之一,它不仅允许以线程安全的方式向任务队列添加新任务,而且通过条件变量的合理使用,有效管理了线程之间的工作分配。通过模板和完美转发的使用,这个函数同时也保证了高度的通用性和效率。
知识拓展std::thread worker([this] { ... });
的确是直接构造了一个std::thread
对象,并立即启动了线程。
直接构造和赋值/new 的区别:
直接构造:std::thread worker(...);
这种方式直接调用std::thread
的构造函数,创建并初始化一个std::thread
对象。
线程会在构造函数完成时立即启动。
这种方式简洁高效,不需要额外的赋值操作。
赋值:std::thread worker; worker = std::thread(...);
这种方式先默认构造一个 std::thread 对象,然后使用赋值运算符将一个新的线程对象赋值给它。
线程会在赋值运算符完成时启动。
这种方式需要进行两次操作,效率略低于直接构造。
new 关键字:std::thread *worker = new std::thread(...);
这种方式在堆上动态分配内存,并创建一个std::thread
对象。
线程会在 new 表达式完成时启动。
这种方式需要手动管理内存,并在使用完后使用 delete 运算符释放内存。
选择建议:
在大多数情况下,直接构造std::thread
对象是最简洁高效的方式。 只有在需要动态管理线程生命周期时才考虑使用 new 关键字。 避免使用赋值方式,因为它需要进行两次操作,效率较低。