TypechoJoeTheme

鱼一的博客 ◡̈

yuyi

知不可乎骤得,托遗响于悲风
网站页面
标签搜索
c++

C++11 实现线程池🎃

使用基本的 C++11 特性实现一个相对简单但符合良好代码规范的线程池,这个简化版本的线程池将展示基本的创建、任务分配和安全停止线程的功能。

简化版线程池实现

...

int main() {
    SimpleThreadPool pool(50);
    for(size_t i =1; i<=50;i++){
        pool.enqueue(print_task, i);
    }

    // 线程池会在对象销毁时自动清理所有线程
    return 0;
}

接下来将创建一个 SimpleThreadPool 的类实现线程池,并使其能使用 enqueue 分配任务不同线程。

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>

class SimpleThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;

    std::mutex queue_mutex;
    std::condition_variable condition;
    std::atomic<bool> stop;
...
};

这个简化版本的线程池主要使用了std::thread, std::mutex, std::condition_variable, 和 std::queue,不涉及std::packaged_taskstd::future。这样可以减少对模板和高级特性的依赖,同时仍然提供线程池的基本功能。

workers.emplace_back([this] {
    while (true) {
        std::function<void()> task;
        
        {
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            this->condition.wait(lock, [this] {
                return this->stop || !this->tasks.empty();
            });
            if (this->stop && this->tasks.empty())
                return;
            task = std::move(this->tasks.front());
            this->tasks.pop();
        }
        
        task();
    }
});

emplace_back 直接在 workers 向量的内存位置构造一个 std::thread 对象,传入 Lambda 表达式作为线程函数,避免了额外的复制或移动操作。

// 使用 push_back
std::thread worker([this] {
    // Lambda 表达式,作为线程函数
    while (true) {
        std::function<void()> task;

        {
            std::unique_lock<std::mutex> lock(this->queue_mutex);
            this->condition.wait(lock, [this] {
                return this->stop || !this->tasks.empty();
            });
            if (this->stop && this->tasks.empty())
                return;
            task = std::move(this->tasks.front());
            this->tasks.pop();
        }

        task();
    }
});

// 将构造好的线程对象移动到 vector 中
workers.push_back(std::move(worker));

构造和移动:std::thread worker(...) 先在局部变量 worker 中构造线程对象,然后使用 std::move 将其移动到 workers 向量中。

使用 emplace_back 可以更加高效,因为它避免了临时对象的创建和随后的移动操作。尽管 push_backstd::move 也可以达到相同的效果,但 emplace_back 更加简洁和直接。

在创建 std::thread 对象时,传入的函数(包括 Lambda 表达式)会在新线程启动时立即执行。
主线程和新线程是并行执行的,主线程不会等待新线程完成,除非显式调用 join 方法。
在你的线程池代码中,每个工作线程被创建后,会立即执行传入的 Lambda 表达式,并在没有任务时阻塞等待。


class SimpleThreadPool {
...
public:
    SimpleThreadPool(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            // 创建线程并加入容器
            workers.emplace_back(...);   // 见上文
        }
    }

    ~SimpleThreadPool() {
        {
            // 析构时停止线程池所有线程
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        // 
        for (std::thread &worker : workers) {
            worker.join();
        }
    }
}

void print_task(int n) {
    std::cout << "Task " << n << " is running." << std::endl;
}
  1. 同步线程: join() 会阻塞调用线程,直到被连接的线程执行完毕。 这确保主线程会等待工作线程完成其任务后再继续执行,避免了主线程过早结束导致资源未正确释放或程序行为异常的问题。
  2. 释放资源: join() 会释放与线程相关的资源,包括线程栈和线程本地存储 (TLS)。 如果不调用 join(),这些资源将无法被释放,导致资源泄漏。
  3. 不调用 join() 的后果:
    主线程过早结束: 主线程可能在工作线程完成之前结束,导致程序无法按预期运行。
  4. 资源泄漏: 与线程相关的资源无法被释放,占用系统资源,影响程序性能。
  5. std::terminate() 调用: 如果程序结束时仍有未连接的线程,C++ 运行时会调用 std::terminate() 函数终止程序。
    template<class F, class... Args>
    void enqueue(F&& f, Args&&... args) {
        // 绑定传入的函数和其参数
        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }
            tasks.emplace(task);
        }
        condition.notify_one();
    }

代码说明

  1. 线程管理

    • 线程在构造函数中创建,并在析构函数中安全地停止和回收。
    • 使用 std::atomic<bool> 来标记线程池是否停止,保证线程状态的线程安全访问。
  2. 任务队列

    • 任务存储在 std::queue 中,使用 std::mutex 保护队列以避免并发冲突。
    • std::condition_variable 用于线程同步,让线程在没有任务时等待。
  3. 任务提交

    • 使用 std::bind 来包装带参数的函数调用,允许将不同的任务和参数推入队列中。
    • 提交任务时,如果线程池已停止,则抛出异常。

这个简化版本的线程池易于理解,适合初学者学习多线程和线程池的基本概念,同时它确保了线程的正确创建和销毁,避免了资源泄露和其他常见的并发问题。

这段代码是线程池类中的一个成员函数,名为 enqueue,它的作用是向线程池的任务队列中添加一个新的任务。下面详细解释每一部分:

模板参数

template<class F, class... Args>

这是一个模板函数,用来接受任意类型的可调用对象 F 和其参数 Args...。这种设计允许 enqueue 方法接受各种不同签名的函数或者函数对象,并传递适当的参数。

函数参数

void enqueue(F&& f, Args&&... args) {

这里使用了完美转发(Perfect Forwarding),F&&Args&&... 允许将实参以其原始类型(保持lvalue或rvalue特性)传递到函数内部。这是C++11中引入的一个特性,用来支持转发参数,同时避免不必要的拷贝。

任务包装

auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

在这一行中,std::bind 用于创建一个新的可调用对象,它绑定了函数 f 和它的参数 argsstd::forward<F>(f)std::forward<Args>(args)... 确保函数和参数的lvalue或rvalue属性被保留,这是为了最大化效率和灵活性。生成的 task 是一个无参的可调用对象,它在被调用时会执行 f(args...)

任务添加到队列

{
    std::unique_lock<std::mutex> lock(queue_mutex);
    if (stop) {
        throw std::runtime_error("enqueue on stopped ThreadPool");
    }
    tasks.emplace(task);
}

这一块代码首先使用 std::unique_lock<std::mutex>queue_mutex 进行加锁,以确保线程安全地修改任务队列。如果线程池已经设置为停止 (stoptrue),则抛出异常,因为不应该向已停止的线程池提交新任务。如果线程池未停止,新创建的任务 task 会被添加到 tasks 队列中。

通知工作线程

condition.notify_one();

这行代码用 std::condition_variablenotify_one 方法来唤醒一个正在等待任务的线程(如果有的话)。这意味着如果队列中有任务等待处理,并且至少有一个线程处于等待状态,则其中一个线程将被唤醒来执行这个新添加的任务。

总结

这个 enqueue 函数是线程池的核心功能之一,它不仅允许以线程安全的方式向任务队列添加新任务,而且通过条件变量的合理使用,有效管理了线程之间的工作分配。通过模板和完美转发的使用,这个函数同时也保证了高度的通用性和效率。

知识拓展
std::thread worker([this] { ... }); 的确是直接构造了一个 std::thread 对象,并立即启动了线程。
直接构造和赋值/new 的区别:
直接构造:
std::thread worker(...); 这种方式直接调用 std::thread 的构造函数,创建并初始化一个 std::thread 对象。
线程会在构造函数完成时立即启动。
这种方式简洁高效,不需要额外的赋值操作。
赋值:
std::thread worker; worker = std::thread(...);
这种方式先默认构造一个 std::thread 对象,然后使用赋值运算符将一个新的线程对象赋值给它。
线程会在赋值运算符完成时启动。
这种方式需要进行两次操作,效率略低于直接构造。
new 关键字:
std::thread *worker = new std::thread(...);
这种方式在堆上动态分配内存,并创建一个 std::thread 对象。
线程会在 new 表达式完成时启动。
这种方式需要手动管理内存,并在使用完后使用 delete 运算符释放内存。
选择建议:
在大多数情况下,直接构造 std::thread 对象是最简洁高效的方式。 只有在需要动态管理线程生命周期时才考虑使用 new 关键字。 避免使用赋值方式,因为它需要进行两次操作,效率较低。
赞(0)
版权属于:

鱼一的博客 ◡̈

本文链接:

https://yuyi.monster/archives/220/(转载时请注明本文出处及文章链接)

评论 (0)

More Info for me 📱

IP信息

人生倒计时

今日已经过去小时
这周已经过去
本月已经过去
今年已经过去个月