C++11特性之异步操作实现线程池

C++
1.3k words

std::future

std::future是C++11标准库中的一个模板类,表示异步操作的结果

当我们在进行多线程中使用异步任务时,std::future是用来帮我们在需要的时候获取任务执行的结果

他的一个重要的特性就是能够阻塞当前的线程,一直到异步操作完成,从而确保在获取结果的时候,异步工作线程的任务是完成的

主要应用场景就是异步任务,比如说网络请求或者计算密集型任务;并发控制,在多线程编程中,我们需要等待某些任务完成之后才能继续执行其他操作,实现线程同步;最后就是可以通过std::future::get()来获取任务的结果

用法

image-20241002155228574.png

async函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <future>
#include "../logs/Xulog.h"

int Add(int num1, int num2)
{
INFO("正在计算...");
return num1 + num2;
}

int main()
{
// std::async(func, ...)
// std::async(policy, func, ...)
INFO("异步任务执行");
std::future<int> res = std::async(std::launch::deferred, Add, 114, 514);
INFO("获取结果");
int sum = res.get();
INFO("结果是%d", sum);
return 0;
}
1
2
3
4
[24-10-02|16:04:34][139769238738752][root][Async.cc:16][INFO]   异步任务执行
[24-10-02|16:04:34][139769238738752][root][Async.cc:18][INFO] 获取结果
[24-10-02|16:04:34][139769238738752][root][Async.cc:8][INFO] 正在计算...
[24-10-02|16:04:34][139769238738752][root][Async.cc:20][INFO] 结果是628

在我们调用获取结果之后,才显示正在计算,执行异步任务,

这个std::launch::deferred其实就是设置,在获取结果时,才会进行传参调用,deferred本身是推迟的意思

与之对应的就是std::launch::async,是如果有结果的话,直接获取结果,没有的话也是会阻塞等待

promise::get_future

这是一个模板类

image.png

主要是用来返回future对象,会和future共享一个同步状态

使用样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include "../logs/Xulog.h"

void Add(int num1, int num2, std::promise<int> &prom)
{
INFO("正在计算...");
prom.set_value(num1+num2);
}
int main()
{
std::promise<int> prom; // 创建promise对象
std::future<int> fu = prom.get_future(); // 关联future对象

std::thread thr(Add, 114, 514, std::ref(prom));
// 异步线程中对promise对象进行设置值

int res = fu.get();
// 从future中获取数据
INFO("结果是%d", res);
thr.join();
return 0;
}

这里面是隐含了一个阻塞问题,在promise和future中是同步的,自动进行同步

一定是先运行有了结果,才能获取

packaged_task::get_future

这是将一个函数封装起来,也可以返回一个封装对象,来获取他保存的这个函数的执行结果

image.png

演示是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>
#include <thread>
#include <future>
#include <memory>
#include "../logs/Xulog.h"

int Add(int num1, int num2)
{
INFO("正在计算...");
return num1 + num2;
}

int main()
{
// std::packaged_task<int(int, int)> task(Add); // 包装函数
// std::future<int> fu = task.get_future(); // 设置关联

// 这样是不行的
// std::async(std::launch::async, task, 114, 514);
// std::thread(task, 114, 514);

// task(11, 22); // 执行
// std::shared_ptr<std::packaged_task<int(int, int)>>
auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);
std::future<int> fu = ptask->get_future(); // 设置关联

std::thread thr([ptask]()
{ (*ptask)(114, 514); });

int sum = fu.get();
INFO("结果是%d", sum);
thr.join();
return 0;
}

他可以使用可调用对象的使用,但又不能完全当作函数去使用

但是也可以曲线救国,让task封装在指针里,传给异步线程

如果单纯使用指针,存在生命周期的问题,有可能出现风险

我们就可以在堆上new对象,用智能指针来管理

线程池实现

std::packaged_task+std::future

使用方法:用户传入要执行的函数和参数,由线程池中的工作线程完成任务

实现:

  • 管理的成员
    • 任务池:用vector维护的一个函数池
    • 互斥锁&条件变量:同步互斥
    • 工作线程:从任务池取出任务执行任务
    • 结束允许标志:控制线程池结束
  • 管理的操作
    • 入队任务:入队一个函数和参数
    • 停止运行:终止线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <vector>
#include <condition_variable>
#include <atomic>
#include "../logs/Xulog.h"

class threadpool
{
public:
using Functor = std::function<void(void)>;
threadpool(int thr_count = 1) : _stop(false)
{
for (int i = 0; i < thr_count; i++)
_threads.emplace_back(&threadpool::entry, this);
}
~threadpool()
{
stop();
}
void stop()
{
if (_stop == true)
return;
_stop = true;
_cv.notify_all(); // 唤醒线程
for (auto &thread : _threads)
thread.join();
}

// 自动推导返回值类型
template <typename F, typename... Args>
auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
{
// 将传入函数封装成packaged_task任务包
using return_type = decltype(func(args...));
auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);
std::future<return_type> fu = task->get_future();
// 构造lambda表达式(捕获任务对象,函数内执行任务对象)
{
std::unique_lock<std::mutex> lock(_mutex);
// 将构造出来的匿名对象传入任务池
_taskpool.push_back([task]()
{ (*task)(); });
_cv.notify_one();
}
return fu;
}

private:
// 线程入口函数 从任务池中取出任务执行
void entry()
{
while (!_stop)
{
// 临时任务池
// 避免频繁加解锁
std::vector<Functor> tmp_taskpool;
{
// 加锁
std::unique_lock<std::mutex> lock(_mutex);
// 等待任务不为空或_stop被置为1
_cv.wait(lock, [this]()
{ return _stop || !_taskpool.empty(); });

// 取出任务进行执行
tmp_taskpool.swap(_taskpool);
}
for (auto &task : tmp_taskpool)
{
task();
}
}
}

private:
std::atomic<bool> _stop;
std::vector<Functor> _taskpool;
std::mutex _mutex;
std::condition_variable _cv;
std::vector<std::thread> _threads;
};

int Add(int num1, int num2)
{
return num1 + num2;
}

int main()
{
threadpool pool;
for (int i = 0; i < 10; i++)
{
std::future<int> fu = pool.push(Add, 114, 514 + i);
INFO("结果是%d", fu.get());
}
pool.stop();
return 0;
}
Comments