Linux多线程——互斥锁的封装与生产消费模型的实现

961 words

生产者消费者模型

生产者消费者模型是一个多线程的应用模型

主要分为两个角色,生产者和消费者,这两个角色又可以用多个线程实际运作

这两者之间需要对生产者生产的产品,进行交互,具体来说就是使用阻塞队列来进行产品的交互

这里的产品可以是很多东西,我们可以认为产品是一个类,这个类可以是一切

在整个生产、消费的过程中,有三种关系

生产者与生产者之间的关系,我们不允许两个生产者共同写,他们之间是互斥关系

消费者与消费者的关系,我们也不允许两个消费者共同取,他们之间也是互斥关系

生产者与消费者之间的关系,不允许一边写一边取,这是互斥关系,并且要求先写后取,这是互斥关系

为了更方便的使用互斥锁,我们把系统提供的互斥锁接口进行封装,并且贯彻一下RAII思想,让我们的使用更加方便

互斥锁的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

class Mutex // 互斥锁,具体的锁在外部定义,传进来进行调用
{
public:
Mutex(pthread_mutex_t *lock)
: _lock(lock)
{
}
~Mutex() {}
void Lock()
{
pthread_mutex_lock(_lock);
}
void UnLock()
{
pthread_mutex_unlock(_lock);
}

private:
pthread_mutex_t *_lock;
};

class LockGuard
{
public:
LockGuard(pthread_mutex_t *lock)
: _mutex(lock)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.UnLock();
}

private:
Mutex _mutex;
};

这里主要封装了两个类,第一个是封装了一下Mutex,因为那两个函数实在太丑了

第二个类是用来实现RAII的,在类声明时自动上锁,在生命周期结束时自动解锁

交互任务

生产者与消费者交互的地点是阻塞队列,而他们交互的内容可以是数据、对象、任务

我们这样定义,生产者负责布置任务,然后通过阻塞队列,传递给消费者,让消费者完成任务

这里的任务可以是很多,例如抢票、数据处理等等

我们设置一个随机计算加减乘除的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>

#define DEFAULT_VALUE 0

enum
{
OK = 0,
div_zero,
mod_zero,
unknow
};

const std::string operators = "+-*/%___"; // 加减乘除和模拟其他未知操作

class Task
{
public:
Task()
{
}
Task(int x, int y, char op)
: data_x(x), data_y(y), Operator(op), result(DEFAULT_VALUE), code(OK)
{
}

void Run()
{
switch (Operator)
{
case '+':
result = data_x + data_y;
break;
case '-':
result = data_x - data_y;
break;
case '*':
result = data_x * data_y;
break;
case '/':
if (data_y == 0)
code = div_zero;
else
result = data_x / data_y;
break;
case '%':
if (data_y == 0)
code = mod_zero;
else
result = data_x % data_y;
break;
default:
code = unknow;
break;
}
}

void operator()()
{
Run();
sleep(2);
}

std::string PrintTask()
{
return std::to_string(data_x) + Operator + std::to_string(data_y) + "=?";
}

std::string PrintResult()
{
return std::to_string(data_x) + Operator + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]";
}
~Task()
{
}

private:
int data_x;
int data_y;
char Operator;

int result;
int code; // 任务完成的怎么样,是否有错误
};

阻塞队列

接下来就是阻塞队列的实现,要注意分别提供给生产者和消费者的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#pragma once
#include <iostream>
#include <queue>
#include "LockGuard.hpp"

const int TOP = 5;

template <class T>
class BlockQueue
{
public:
BlockQueue(int top = TOP)
: _capacity(TOP)
{
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Push(const T &in)
{
LockGuard lockguard(&_mutex);
while (IsFull)
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
}
void Pop(T *out)
{
LockGuard lockguard(&_mutex);
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity; // 阻塞队列上限
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _p_cond; // 生产者信号量
pthread_cond_t _c_cond; // 消费者信号量
};

测试

最后是测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#pragma once
#include <iostream>
#include <queue>
#include "LockGuard.hpp"

const int TOP = 5;

template <class T>
class BlockQueue
{
public:
BlockQueue(int top = TOP)
: _capacity(TOP)
{
}
bool IsFull()
{
return _q.size() == _capacity;
}
bool IsEmpty()
{
return _q.size() == 0;
}
void Push(const T &in)
{
LockGuard lockguard(&_mutex);
while (IsFull())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
}
void Pop(T *out)
{
LockGuard lockguard(&_mutex);
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity; // 阻塞队列上限
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _p_cond; // 生产者信号量
pthread_cond_t _c_cond; // 消费者信号量
};
Comments