c++ 简单线程池

头文件

#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>

class ThreadPool {
public:
    ThreadPool(size_t numThreads);
    ~ThreadPool();

    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)->std::future<typename std::result_of<F(Args...)>::type>;
private:
    void workerThread();
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;

    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;
};


cpp

 #include "threadPool.h"

ThreadPool::ThreadPool(size_t numThreads) : stop(false)
{
    for (size_t i = 0; i < numThreads; ++i)
    {
        workers.emplace_back(&ThreadPool::workerThread, this);
    }
}

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        stop = true;
    }
    condition.notify_all();
    for (std::thread& worker : workers) {
        worker.join();
    }
}

void ThreadPool::workerThread() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            condition.wait(lock, [this] { return stop || !tasks.empty(); });
            if (stop && tasks.empty()) {
                return;
            }
            if (!tasks.empty()) {
                task = std::move(tasks.front());
                tasks.pop();
            }
        }
        if (task) {
            task();
        }
    }
}

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using returnType = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared<std::packaged_task<returnType()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<returnType> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);

        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }

        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

int main() {
    ThreadPool pool(4);

    auto result1 = pool.enqueue([](int answer) { return answer; }, 42);
    auto result2 = pool.enqueue([](int a, int b) { return a + b; }, 5, 7);

    std::cout << "Result1: " << result1.get() << std::endl;
    std::cout << "Result2: " << result2.get() << std::endl;

    return 0;
}

解释

enqueue

Sure, let’s break down the enqueue function template step-by-step:

Template Definition

template<class F, class... Args>

This defines a function template that can take any callable object F (such as a function, function pointer, lambda, or functor) and a variadic list of arguments Args. The class... Args syntax allows the function to accept any number of additional arguments, making it very flexible.

Return Type

auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;

The return type of the function is a std::future that will hold the result of invoking the callable object F with arguments Args.... The std::result_of<F(Args...)>::type part determines the type that will be returned by calling F with Args....

  • std::result_of<F(Args...)>::type: This uses the std::result_of type trait to deduce the return type of calling the function F with arguments Args....

Function Body

auto task = std::make_shared<std::packaged_task<returnType()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

Here, we are creating a std::shared_ptr to a std::packaged_task. A std::packaged_task wraps a callable object, allowing it to be executed asynchronously and providing a future to retrieve the result.

  • returnType: This is a type alias for the return type of the callable object F with arguments Args....
  • std::make_shared<std::packaged_task<returnType()>>: This creates a shared pointer to a std::packaged_task that will eventually execute the callable F with the provided arguments.
  • std::bind(std::forward<F>(f), std::forward<Args>(args)...): This binds the callable F with the provided arguments Args..., allowing them to be stored and called later.

Storing the Task

{
    std::unique_lock<std::mutex> lock(queueMutex);

    if (stop) {
        throw std::runtime_error("enqueue on stopped ThreadPool");
    }

    tasks.emplace([task](){ (*task)(); });
}

This block ensures that access to the task queue is synchronized using a mutex lock. It checks if the thread pool is stopped and throws an exception if it is. Otherwise, it adds the task to the queue.

  • std::unique_lock<std::mutex> lock(queueMutex): Locks the mutex to ensure thread-safe access to the task queue.
  • if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); }: Checks if the thread pool has been stopped. If so, it throws an exception.
  • tasks.emplace([task](){ (*task)(); }): Adds a new task to the task queue. The task is a lambda function that calls the packaged_task to execute the callable F with its arguments.

Notifying Workers

condition.notify_one();

This line notifies one of the worker threads that a new task is available. The worker threads are waiting on this condition variable and will wake up to process the new task.

Returning the Future

return res;

Finally, the function returns the std::future associated with the packaged_task. This future can be used by the caller to get the result of the task once it has been executed by a worker thread.

Full Example for Clarity

Here’s the complete enqueue function with the detailed explanation:

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using returnType = typename std::result_of<F(Args...)>::type;

    // Create a packaged_task to wrap the callable and its arguments
    auto task = std::make_shared<std::packaged_task<returnType()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );

    // Get the future associated with the task
    std::future<returnType> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queueMutex);

        // Ensure the thread pool is not stopped
        if (stop) {
            throw std::runtime_error("enqueue on stopped ThreadPool");
        }

        // Add the task to the queue
        tasks.emplace([task](){ (*task)(); });
    }

    // Notify one of the waiting worker threads
    condition.notify_one();
    return res;
}

Summary

  1. Template Parameters: F is the callable, Args... are the arguments.
  2. Return Type: A std::future that holds the result of the callable.
  3. Creating Packaged Task: Wraps the callable and its arguments to be executed asynchronously.
  4. Mutex Lock: Ensures thread-safe access to the task queue.
  5. Task Queue: Adds the task to the queue.
  6. Notify Worker: Notifies a worker thread to process the new task.
  7. Return Future: Provides a future to get the result of the task.

Next Steps

a. Add unit tests to verify the ThreadPool implementation.
b. Implement error handling for tasks within the ThreadPool.

enqueue

好的,下面是对每一行代码的详细解释:

函数模板声明

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
  • template<class F, class… Args>: 定义一个模板函数,该函数接受一个可调用对象 F 和一组参数 Args...
  • auto ThreadPool::enqueue(F&& f, Args&&… args): enqueueThreadPool 类的成员函数,它使用了模板参数 FArgs...,并通过右值引用接收这些参数(即使用 &&)。
  • -> std::future<typename std::result_of<F(Args…)>::type>: 函数返回类型是一个 std::future,其类型由 F 调用 Args... 参数后返回的结果类型决定。

使用别名 returnType

using returnType = typename std::result_of<F(Args...)>::type;
  • using returnType: 使用类型别名将 std::result_of<F(Args...)>::type 定义为 returnType
  • std::result_of<F(Args…)>::type: 通过 std::result_of 获取可调用对象 F 使用参数 Args... 后的返回类型。

创建 packaged_task

auto task = std::make_shared<std::packaged_task<returnType()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
  • std::make_shared<std::packaged_task<returnType()>>: 创建一个 std::shared_ptr,指向一个 std::packaged_task 对象。
  • std::packaged_task<returnType()>: std::packaged_task 是一个模板类,用于包装一个可调用对象,以便异步调用并获取其结果。
  • std::bind(std::forward(f), std::forward(args)…): 使用 std::bind 将可调用对象 F 和参数 Args... 绑定在一起,生成一个新的可调用对象,并将其传递给 std::packaged_taskstd::forward 确保参数的完美转发。

获取 future

std::future<returnType> res = task->get_future();
  • std::future res: 定义一个 std::future 对象 res,用于保存 packaged_task 的结果。
  • task->get_future(): 调用 packaged_taskget_future 方法,获取与任务关联的 std::future 对象。

加锁并检查线程池状态

{
    std::unique_lock<std::mutex> lock(queueMutex);
    if (stop) {
        throw std::runtime_error("enqueue on stopped ThreadPool");
    }
    tasks.emplace([task](){ (*task)(); });
}
  • std::unique_lockstd::mutex lock(queueMutex): 创建一个互斥锁对象 lock,并锁定 queueMutex,确保对任务队列的访问是线程安全的。
  • if (stop): 检查线程池是否已停止接受新任务。
  • throw std::runtime_error(“enqueue on stopped ThreadPool”): 如果线程池已停止,抛出一个运行时错误异常。
  • tasks.emplace(task{ (*task)(); }): 将一个新的任务添加到任务队列中。这个任务是一个 lambda 函数,调用 task 对象的 operator() 来执行实际的任务。

通知一个等待的线程

condition.notify_one();
  • condition.notify_one(): 通知一个正在等待 condition 的线程,让它从等待中醒来,以便处理新添加的任务。

返回 future

return res;
  • return res: 返回先前获取的 std::future 对象,让调用者可以在稍后获取任务的结果。

为什么使用 std::future 的详细解释:

  1. 异步任务的结果获取
    当我们将一个任务提交到线程池时,任务是异步执行的。std::future 提供了一种机制,让我们可以在任务执行完成后,获取其结果,而不需要阻塞主线程或者轮询状态。
auto result = pool.enqueue([](int x) { return x * x; }, 10);
std::cout << result.get() << std::endl; // 获取任务结果
  1. 同步等待任务完成
    std::future 提供了 get() 方法,这个方法会阻塞调用它的线程,直到任务完成并返回结果。这样,我们可以在需要的时候等待任务完成并获取结果。
auto result = pool.enqueue([](int x) { return x * x; }, 10);
result.get(); // 阻塞等待任务完成并获取结果
  1. 异常传播
    如果任务在执行过程中抛出异常,std::future 会捕获这个异常,并在调用 get() 时重新抛出。这使得我们可以在调用 get() 时处理任务中的异常。
auto result = pool.enqueue([]() { throw std::runtime_error("Error"); });
try {
    result.get();
} catch (const std::exception& e) {
    std::cout << "Caught exception: " << e.what() << std::endl;
}
  1. 使用简单方便
    std::future 和 std::promise 以及 std::packaged_task 的组合使用,使得在多线程环境中管理任务和获取任务结果变得非常简单和方便。

参考资料
chatgpt
现代 C++ 编程实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/770177.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JAVA+SSM+VUE《教学视频点播系统》

1管理员登录 管理员登录&#xff0c;通过填写用户名、密码、角色等信息&#xff0c;输入完成后选择登录即可进入视频点播系统&#xff0c;如图1所示。 图1管理员登录界面图 2管理员功能实现 2.1 修改密码 管理员对修改密码进行填写原密码、新密码、确认密码并进行删除、修改…

【Python机器学习】算法链与管道——在网格搜索中使用管道

在网格搜索中使用管道的工作原理与使用任何其他估计器都相同。 我们定义一个需要搜索的参数网络&#xff0c;并利用管道和参数网格构建一个GridSearchCV。不过在指定参数网格时存在一处细微的变化。我们需要为每个参数指定它在管道中所属的步骤。我们要调节的两个参数C和gamma…

监控与安全服务

kali 系统 nmap扫描 网段的扫描 使用脚本扫描 使用john破解密码 哈希算法是一种单向加密的算法&#xff0c;也就是将原始数据生成一串“乱码”只能通过原始数据&#xff0c;生成这串“乱码”&#xff0c;但是不能通过“乱码”回推出原始数据相同的原始数据&#xff0c;生成的乱…

红酒与时尚秀场:品味潮流新风尚

在时尚与品味的交汇点上&#xff0c;红酒总是以其不同的方式&#xff0c;为每一次的时尚盛宴增添一抹诱人的色彩。当红酒遇上时尚秀场&#xff0c;不仅是一场视觉的盛宴&#xff0c;更是一次心灵的触动。今天&#xff0c;就让我们一起走进红酒与时尚秀场的世界&#xff0c;感受…

Elasticsearch:结合稀疏、密集和地理字段

作者&#xff1a;来自 Elastic Madhusudhan Konda 如何以自定义方式组合多个稀疏、密集和地理字段 Elasticsearch 是一款强大的工具&#xff0c;可用于近乎实时地搜索和分析数据。作为开发人员&#xff0c;我们经常会遇到包含各种不同字段的数据集。有些字段是必填字段&#x…

算法力扣刷题记录 二十八【225. 用队列实现栈】

前言 栈和队列篇。 记录 二十八【225. 用队列实现栈】 一、题目阅读 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff09;的栈&#xff0c;并支持普通栈的全部四种操作&#xff08;push、top、pop 和 empty&#xff09;。 实现 MyStack 类&#xff1a; void p…

数据库安全审计系统:满足数据安全治理合规要求

伴随着数据库信息价值以及可访问性提升&#xff0c;使得数据库面对来自内部和外部的安全风险大大增加&#xff0c;如违规越权操作、恶意入侵导致机密信息窃取泄漏&#xff0c;但事后却无法有效追溯和审计。 国内专注于保密与非密领域的分级保护、等级保护、业务连续性安全和大数…

浅谈渗透测试实战

很多时候&#xff0c;在看白帽子们的漏洞的时候总有一种感觉就是把web渗透简单地理解成了发现web系统漏洞进而获取webshell。其实&#xff0c;个人感觉一个完整的渗透&#xff08;从黑客的角度去思考问题&#xff09;应该是以尽一切可能获取目标的系统或者服务器的最高权限&…

TCL中环可转债缩水近90亿:业绩持续承压,百亿自有资金购买理财

《港湾商业观察》廖紫雯 日前&#xff0c;TCL中环新能源科技股份有限公司&#xff08;以下简称&#xff1a;TCL中环&#xff0c;002129.SZ&#xff09;可转债总额缩水近90亿&#xff0c;引发市场关注。可转债大幅缩水的另一面&#xff0c;公司此前发布公告披露将使用百亿自有资…

深入详解RocketMQ源码安装与调试

1.源码下载 http://rocketmq.apache.org/dowloading/releases/ 2. 环境要求 64位系统JDK1.8(64位)Maven 3.2.x

[笔记] 卷积03 - 运算的对称性 时域构建高通滤波器的失败尝试

1.卷积运算具备足够好的对称性 1.在计算卷积时&#xff0c;两个函数的位置是可以颠倒的&#xff0c;对吧&#xff1f; 在卷积运算中&#xff0c;确实可以对参与卷积的两个函数进行颠倒。这是因为卷积的定义是通过一个函数与另一个函数的翻转后的形式进行积分运算。具体来说&a…

【系统架构设计师】计算机组成与体系结构 ⑨ ( 磁盘管理 | “ 磁盘 “ 单缓冲区 与 双缓冲区 | “ 磁盘 “ 单缓冲区 与 双缓冲区案例 )

文章目录 一、" 磁盘 " 单缓冲区 与 双缓冲区1、" 磁盘 " 单缓冲区2、" 磁盘 " 双缓冲区 二、" 磁盘 " 单缓冲区 与 双缓冲区案例1、案例描述2、磁盘单缓冲区 - 流水线分析3、磁盘双缓冲区 - 流水线分析 一、" 磁盘 " 单缓冲…

Avalonia应用在基于Linux的国产操作deepin上运行

deepin系统介绍 deepin(原名Linux Deepin)致力于为全球用户提供美观易用&#xff0c;安全可靠的 Linux发行版。deepin项目于2008年发起&#xff0c;并在2009年发布了以 linux deepin为名称的第一个版本。2014年4月更名为 deepin&#xff0c;在中国常被称为“深度操作系统”。 …

matlab 干涉图仿真

目录 一、算法概述1、干涉图2、生成步骤 二、代码实现三、结果展示 本文由CSDN点云侠原创&#xff0c;原文链接。如果你不是在点云侠的博客中看到该文章&#xff0c;那么此处便是不要脸的爬虫。 一、算法概述 1、干涉图 干涉图是两束或多束相干光波相遇时&#xff0c;它们的振…

大模型学习笔记3【大模型】LLaMA学习笔记

文章目录 学习内容LLaMALLaMA模型结构LLaMA下载和使用好用的开源项目[Chinese-Alpaca](https://github.com/ymcui/Chinese-LLaMA-Alpaca)Chinese-Alpaca使用量化评估 学习内容 完整学习LLaMA LLaMA 2023年2月&#xff0c;由FaceBook公开了LLaMA&#xff0c;包含7B&#xff0…

echarts柱状选中shadow阴影背景宽度设置

使用line&#xff0c;宽度增大到所需要的宽度&#xff0c;设置下颜色透明度就行 tooltip: {trigger: axis,//把阴影的层级往下降z:-15,axisPointer: {type: line,lineStyle: {color: rgba(150,150,150,0.3),width: 44,type: solid,},}, }, series: [{type: bar,barWidth:20,//…

探究Executors创建的线程池(如newFixedThreadPool)其核心线程数等参数的可调整性

java中提供Executors类来创建一些固定模板参数的线程池&#xff0c;如下图&#xff08;newWorkStealingPool除外&#xff0c;这个是创建ForkJoinPool的&#xff0c;这里忽略&#xff09;&#xff1a; 拿newFixedThreadPool方法创建线程池为例&#xff0c;newFixedThreadPool是…

24位DAC转换的FPGA设计及将其封装成自定义IP核的方法

在vivado设计中,为了方便的使用Block Desgin进行设计,可以使用vivado软件把自己编写的代码封装成IP核,封装后的IP核和原来的代码具有相同的功能。本文以实现24位DA转换(含并串转换,使用的数模转换器为CL4660)为例,介绍VIVADO封装IP核的方法及调用方法,以及DAC转换的详细…

【WEB前端2024】3D智体编程:乔布斯3D纪念馆-第54课-poplang语音编程控制机器人

【WEB前端2024】3D智体编程&#xff1a;乔布斯3D纪念馆-第54课-poplang语音编程控制机器人 使用dtns.network德塔世界&#xff08;开源的智体世界引擎&#xff09;&#xff0c;策划和设计《乔布斯超大型的开源3D纪念馆》的系列教程。dtns.network是一款主要由JavaScript编写的…

代码随想录——柠檬水找零(Leetcode860)

题目链接 贪心 class Solution {public boolean lemonadeChange(int[] bills) {if(bills[0] 10 || bills[0] 20 || bills[1] 20){return false;}int count5 1;int count10 0;for(int i 1; i < bills.length; i){if(bills[i] 5){count5;}if(bills[i] 10){count10;…