std::future
std::future是C++11标准库中的一个模板类,表示异步操作的结果
当我们在进行多线程中使用异步任务时,std::future是用来帮我们在需要的时候获取任务执行的结果
他的一个重要的特性就是能够阻塞当前的线程,一直到异步操作完成,从而确保在获取结果的时候,异步工作线程的任务是完成的
主要应用场景就是异步任务,比如说网络请求或者计算密集型任务;并发控制,在多线程编程中,我们需要等待某些任务完成之后才能继续执行其他操作,实现线程同步;最后就是可以通过std::future::get()来获取任务的结果
用法

async函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| #include <iostream> #include <thread> #include <future> #include "../logs/Xulog.h"
int Add(int num1, int num2) { INFO("正在计算..."); return num1 + num2; }
int main() { INFO("异步任务执行"); std::future<int> res = std::async(std::launch::deferred, Add, 114, 514); INFO("获取结果"); int sum = res.get(); INFO("结果是%d", sum); return 0; }
|
1 2 3 4
| [24-10-02|16:04:34][139769238738752][root][Async.cc:16][INFO] 异步任务执行 [24-10-02|16:04:34][139769238738752][root][Async.cc:18][INFO] 获取结果 [24-10-02|16:04:34][139769238738752][root][Async.cc:8][INFO] 正在计算... [24-10-02|16:04:34][139769238738752][root][Async.cc:20][INFO] 结果是628
|
在我们调用获取结果之后,才显示正在计算,执行异步任务,
这个std::launch::deferred
其实就是设置,在获取结果时,才会进行传参调用,deferred本身是推迟的意思
与之对应的就是std::launch::async
,是如果有结果的话,直接获取结果,没有的话也是会阻塞等待
promise::get_future
这是一个模板类

主要是用来返回future对象,会和future共享一个同步状态
使用样例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #include <iostream> #include <thread> #include <future> #include <chrono> #include "../logs/Xulog.h"
void Add(int num1, int num2, std::promise<int> &prom) { INFO("正在计算..."); prom.set_value(num1+num2); } int main() { std::promise<int> prom; std::future<int> fu = prom.get_future();
std::thread thr(Add, 114, 514, std::ref(prom));
int res = fu.get(); INFO("结果是%d", res); thr.join(); return 0; }
|
这里面是隐含了一个阻塞问题,在promise和future中是同步的,自动进行同步
一定是先运行有了结果,才能获取
packaged_task::get_future
这是将一个函数封装起来,也可以返回一个封装对象,来获取他保存的这个函数的执行结果

演示是这样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| #include <iostream> #include <thread> #include <future> #include <memory> #include "../logs/Xulog.h"
int Add(int num1, int num2) { INFO("正在计算..."); return num1 + num2; }
int main() {
auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add); std::future<int> fu = ptask->get_future();
std::thread thr([ptask]() { (*ptask)(114, 514); });
int sum = fu.get(); INFO("结果是%d", sum); thr.join(); return 0; }
|
他可以使用可调用对象的使用,但又不能完全当作函数去使用
但是也可以曲线救国,让task封装在指针里,传给异步线程
如果单纯使用指针,存在生命周期的问题,有可能出现风险
我们就可以在堆上new对象,用智能指针来管理
线程池实现
std::packaged_task+std::future
使用方法:用户传入要执行的函数和参数,由线程池中的工作线程完成任务
实现:
- 管理的成员
- 任务池:用vector维护的一个函数池
- 互斥锁&条件变量:同步互斥
- 工作线程:从任务池取出任务执行任务
- 结束允许标志:控制线程池结束
- 管理的操作
- 入队任务:入队一个函数和参数
- 停止运行:终止线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| #include <iostream> #include <functional> #include <memory> #include <thread> #include <future> #include <mutex> #include <vector> #include <condition_variable> #include <atomic> #include "../logs/Xulog.h"
class threadpool { public: using Functor = std::function<void(void)>; threadpool(int thr_count = 1) : _stop(false) { for (int i = 0; i < thr_count; i++) _threads.emplace_back(&threadpool::entry, this); } ~threadpool() { stop(); } void stop() { if (_stop == true) return; _stop = true; _cv.notify_all(); for (auto &thread : _threads) thread.join(); }
template <typename F, typename... Args> auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))> { using return_type = decltype(func(args...)); auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...); auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func); std::future<return_type> fu = task->get_future(); { std::unique_lock<std::mutex> lock(_mutex); _taskpool.push_back([task]() { (*task)(); }); _cv.notify_one(); } return fu; }
private: void entry() { while (!_stop) { std::vector<Functor> tmp_taskpool; { std::unique_lock<std::mutex> lock(_mutex); _cv.wait(lock, [this]() { return _stop || !_taskpool.empty(); });
tmp_taskpool.swap(_taskpool); } for (auto &task : tmp_taskpool) { task(); } } }
private: std::atomic<bool> _stop; std::vector<Functor> _taskpool; std::mutex _mutex; std::condition_variable _cv; std::vector<std::thread> _threads; };
int Add(int num1, int num2) { return num1 + num2; }
int main() { threadpool pool; for (int i = 0; i < 10; i++) { std::future<int> fu = pool.push(Add, 114, 514 + i); INFO("结果是%d", fu.get()); } pool.stop(); return 0; }
|