Linux多线程——日志任务的线程池实现

794 words

线程池

线程池可以说是把之前所有的内容全部串联起来的一个项目

我们这里实现一个简单的版本,可以对其进行扩展

线程池也是一种生产者消费者模型

生产者布置任务而消费者处理任务

主要运用的场景是需要大量现场完成任务,任务完成时间较短,例如WEB服务器中的网页请求

线程池的使用非常简单

就是创建固定数量的线程,然后往任务列表里推送任务即可

线程池会自动分派线程去完成

我们将之前的所有内容可以串联起来做一个小型项目,非常建议阅读并自行实现,写代码才是学习编程的最好方式

  1. 互斥锁和信号量封装
  2. 线程库封装
  3. 日志系统

日志系统完善

这里主要完善了日志系统写的方式,可以向屏幕输出、单个文件输出、按照等级划分的文件输出

借用了隔壁日志系统的代码,那边也还没写完

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#pragma once
#include "level.hpp"
#include "format.hpp"
#include "util.hpp"
#include <fstream>

const std::string logdir = "./log/";

enum WriteStyle
{
Screen = 114, // 屏幕打印
OneFile, // 全部打印到一个文件
ClassFile // 按等级分类打印
};

void WriteToOneFile(const std::string &str, const std::string &logname = "logALL")
{
Xulog::Util::File::createDirectory(logdir);
std::string fullLogName = logdir + logname;
std::ofstream out(fullLogName, std::ios_base::app);
if (!out.is_open())
{
std::cout << fullLogName << ":文件打开失败" << std::endl;
}
out.write(str.c_str(), str.size());
out.close();
}

void WriteToClassFile(const std::string &str, Xulog::LogLevel::value val)
{
std::string logname = "log";
logname += Xulog::LogLevel::toString(val);
WriteToOneFile(str, logname);
}

void WriteLog(Xulog::LogMsg msg, Xulog::Formatter fmt, WriteStyle style = Screen)
{
std::string str = fmt.Format(msg);
switch (style)
{
case Screen:
std::cout << str;
break;
case OneFile:
WriteToOneFile(str);
break;
case ClassFile:
WriteToClassFile(str, msg._level);
break;
}
}

线程池的实现

线程数据

定义一个ThreadData,用于获取当前线程的信息,可以添加别的信息,例如创建时间,运行时间等

1
2
3
4
5
6
7
8
9
10
class ThreadData
{
public:
ThreadData(const std::string &name)
: threadname(name)
{
}
~ThreadData() {}
std::string threadname;
};

线程池的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#pragma once
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "write.hpp"
#include <iostream>
#include <queue>
#include <vector>
#include <functional>
#include <unistd.h>

static const int THREAD_NUM = 5;
Xulog::Formatter fmt;

class ThreadData
{
public:
ThreadData(const std::string &name)
: threadname(name)
{
}
~ThreadData() {}
std::string threadname;
};

template <class T>
class ThreadPool
{
public:
ThreadPool(int thread_num = THREAD_NUM)
: _thread_num(thread_num)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
// 构建线程
for (int i = 0; i < _thread_num; i++)
{
std::string threadname = "thread-";
threadname += std::to_string(i + 1);
ThreadData td(threadname);
_threads.emplace_back(threadname, std::bind(&ThreadPool::ThreadRun, this, std::placeholders::_1), td);
Xulog::LogMsg msg(Xulog::LogLevel::value::DEBUG, 0, "default", td.threadname.c_str(), "线程已经启动");
WriteLog(msg, fmt);
}
}
void ThreadRun(ThreadData &td)
{
while (true)
{
// 取到任务
T task;
{
LockGuard lockguard(&_mutex);
while (_task_que.empty())
{
ThreadWait(td);
}
task = _task_que.front();
_task_que.pop();
}
// 处理任务
task();
Xulog::LogMsg msg(Xulog::LogLevel::value::DEBUG, 0, "default", td.threadname.c_str(), "任务已经完成,结果是:" + task.PrintResult());
WriteLog(msg, fmt);
sleep(1);
}
}
void ThreadWait(ThreadData &td)
{
Xulog::LogMsg msg(Xulog::LogLevel::value::INFO, 0, "default", "root", td.threadname + "休眠了");
WriteLog(msg, fmt);

pthread_cond_wait(&_cond, &_mutex);
}
void ThreadWeakUp()
{
pthread_cond_signal(&_cond);
}
bool Start()
{
// 启动线程
for (auto &thread : _threads)
{
thread.Start();
}
return true;
}
void Push(const T &in)
{
Xulog::LogMsg msg(Xulog::LogLevel::value::DEBUG, 0, "default", "root", "生成了一个任务,任务是:" + in.PrintTask());
WriteLog(msg, fmt);

LockGuard lockguard(&_mutex);
_task_que.push(in);
ThreadWeakUp();
}
void Wait() // 调试方法
{
for (auto &thread : _threads)
{
thread.Join();
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}

private:
std::queue<T> _task_que;
std::vector<Thread<ThreadData>> _threads;
int _thread_num;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
};

完整代码

线程池+日志系统(github.com)

Comments