Message-Queues beta 1.1
A message queue based on C++
 
载入中...
搜索中...
未找到
threadpool.hpp
浏览该文件的文档.
1#pragma once
2#include <iostream>
3#include <functional>
4#include <memory>
5#include <thread>
6#include <future>
7#include <mutex>
8#include <vector>
9#include <condition_variable>
10#include <atomic>
11#include "../third/Xulog/logs/Xulog.h"
12
namespace XuMQ
{
    /// Fixed-size pool of worker threads that execute queued callables.
    ///
    /// Tasks are submitted with push(), which returns a std::future for the
    /// task's result. Each time a worker wakes up it takes the *entire* pending
    /// queue as one batch and runs that batch sequentially, so tasks submitted
    /// close together may run back-to-back on the same worker.
    ///
    /// stop() (also called by the destructor) lets the workers finish every
    /// task that was queued before it was called and then joins them.
    class threadpool
    {
    public:
        using ptr = std::shared_ptr<threadpool>;
        using Functor = std::function<void(void)>;

        /// Start `thr_count` worker threads (a non-positive count starts none,
        /// in which case queued tasks are never executed).
        ///
        /// If starting a thread fails, the workers already started are stopped
        /// and joined before the exception is rethrown; otherwise destroying a
        /// still-joinable std::thread would call std::terminate.
        threadpool(int thr_count = 1) : _stop(false)
        {
            try
            {
                if (thr_count > 0)
                    _threads.reserve(static_cast<std::size_t>(thr_count));
                for (int i = 0; i < thr_count; i++)
                    _threads.emplace_back(&threadpool::entry, this);
            }
            catch (...)
            {
                stop();
                throw;
            }
        }

        // The pool owns threads bound to `this`; it must not be copied.
        threadpool(const threadpool &) = delete;
        threadpool &operator=(const threadpool &) = delete;

        /// Stop the pool and wait for all workers to finish.
        ~threadpool()
        {
            stop();
        }

        /// Request shutdown and join all workers.
        ///
        /// Tasks queued before this call are still executed. Only the first
        /// call does any work; later calls return immediately. Tasks pushed
        /// after the workers have exited are never run.
        void stop()
        {
            {
                // The flag must change while holding the mutex: otherwise a
                // worker could test the wait predicate, see "not stopped",
                // and go to sleep right after our notify -- and never wake up.
                std::lock_guard<std::mutex> lock(_mutex);
                if (_stop.exchange(true))
                    return; // already stopped (or being stopped) by someone else
            }
            _cv.notify_all(); // wake every worker so it can observe the flag
            for (auto &thread : _threads)
            {
                if (thread.joinable())
                    thread.join();
            }
        }

        /// Queue `func(args...)` for execution on a worker thread.
        ///
        /// The callable and its arguments are bound (copied/moved) into the
        /// task at the time of the call.
        ///
        /// @return a future that receives the call's return value, or the
        ///         exception it threw.
        template <typename F, typename... Args>
        auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>
        {
            using return_type = decltype(func(args...));

            // Wrap the bound call in a packaged_task so its result (or
            // exception) is delivered through the returned future.
            auto bound = std::bind(std::forward<F>(func), std::forward<Args>(args)...);
            auto task = std::make_shared<std::packaged_task<return_type()>>(std::move(bound));
            std::future<return_type> fu = task->get_future();

            {
                std::lock_guard<std::mutex> lock(_mutex);
                // packaged_task is move-only but Functor (std::function) needs
                // a copyable callable, hence the shared_ptr captured by value.
                _taskpool.push_back([task]()
                                    { (*task)(); });
            }
            _cv.notify_one(); // notify after unlocking so the woken worker need not block on the mutex
            return fu;
        }

    private:
        /// Worker thread body: repeatedly take all pending tasks and run them.
        /// Returns only once stop has been requested AND the queue is empty,
        /// so no task queued before stop() is dropped.
        void entry()
        {
            for (;;)
            {
                // Local batch: tasks are executed outside the lock so that
                // push() is never blocked behind a running task.
                std::vector<Functor> batch;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    _cv.wait(lock, [this]()
                             { return _stop || !_taskpool.empty(); });

                    // Woken with nothing to do: that can only mean stop.
                    if (_taskpool.empty())
                        return;

                    batch.swap(_taskpool);
                }
                for (auto &task : batch)
                {
                    task();
                }
            }
        }

    private:
        std::atomic<bool> _stop;           // set once by stop(); written under _mutex
        std::vector<Functor> _taskpool;    // pending tasks, guarded by _mutex
        std::mutex _mutex;                 // guards _taskpool and writes to _stop
        std::condition_variable _cv;       // signalled on push() and stop()
        std::vector<std::thread> _threads; // worker threads, joined in stop()
    };
}
Definition threadpool.hpp:16
std::atomic< bool > _stop
Definition threadpool.hpp:86
std::vector< Functor > _taskpool
Definition threadpool.hpp:87
std::shared_ptr< threadpool > ptr
Definition threadpool.hpp:18
void stop()
Definition threadpool.hpp:29
std::vector< std::thread > _threads
Definition threadpool.hpp:90
auto push(F &&func, Args &&...args) -> std::future< decltype(func(args...))>
Definition threadpool.hpp:41
std::mutex _mutex
Definition threadpool.hpp:88
std::condition_variable _cv
Definition threadpool.hpp:89
std::function< void(void)> Functor
Definition threadpool.hpp:19
void entry()
Definition threadpool.hpp:61
threadpool(int thr_count=1)
Definition threadpool.hpp:20
~threadpool()
Definition threadpool.hpp:25
Definition channel.hpp:22