生产者消费者模型
生产者消费者模型是一个多线程的应用模型
主要分为两个角色,生产者和消费者,这两个角色又可以用多个线程实际运作
这两者之间需要对生产者生产的产品,进行交互,具体来说就是使用阻塞队列来进行产品的交互
这里的产品可以是很多东西,我们可以认为产品是一个类,这个类可以是一切
在整个生产、消费的过程中,有三种关系
生产者与生产者之间的关系,我们不允许两个生产者共同写,他们之间是互斥关系
消费者与消费者的关系,我们也不允许两个消费者共同取,他们之间也是互斥关系
生产者与消费者之间的关系,不允许一边写一边取,这是互斥关系,并且要求先写后取,这是互斥关系
为了更方便的使用互斥锁,我们把系统提供的互斥锁接口进行封装,并且贯彻一下RAII思想,让我们的使用更加方便
互斥锁的封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| class Mutex { public: Mutex(pthread_mutex_t *lock) : _lock(lock) { } ~Mutex() {} void Lock() { pthread_mutex_lock(_lock); } void UnLock() { pthread_mutex_unlock(_lock); }
private: pthread_mutex_t *_lock; };
class LockGuard { public: LockGuard(pthread_mutex_t *lock) : _mutex(lock) { _mutex.Lock(); } ~LockGuard() { _mutex.UnLock(); }
private: Mutex _mutex; };
|
这里主要封装了两个类,第一个是封装了一下Mutex,因为那两个函数实在太丑了
第二个类是用来实现RAII的,在类声明时自动上锁,在生命周期结束时自动解锁
交互任务
生产者与消费者交互的地点是阻塞队列,而他们交互的内容可以是数据、对象、任务
我们这样定义,生产者负责布置任务,然后通过阻塞队列,传递给消费者,让消费者完成任务
这里的任务可以是很多,例如抢票、数据处理等等
我们设置一个随机计算加减乘除的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| #pragma once #include <iostream> #include <string> #include <unistd.h>
#define DEFAULT_VALUE 0
enum { OK = 0, div_zero, mod_zero, unknow };
const std::string operators = "+-*/%___";
class Task { public: Task() { } Task(int x, int y, char op) : data_x(x), data_y(y), Operator(op), result(DEFAULT_VALUE), code(OK) { }
void Run() { switch (Operator) { case '+': result = data_x + data_y; break; case '-': result = data_x - data_y; break; case '*': result = data_x * data_y; break; case '/': if (data_y == 0) code = div_zero; else result = data_x / data_y; break; case '%': if (data_y == 0) code = mod_zero; else result = data_x % data_y; break; default: code = unknow; break; } }
void operator()() { Run(); sleep(2); }
std::string PrintTask() { return std::to_string(data_x) + Operator + std::to_string(data_y) + "=?"; }
std::string PrintResult() { return std::to_string(data_x) + Operator + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]"; } ~Task() { }
private: int data_x; int data_y; char Operator;
int result; int code; };
|
阻塞队列
接下来就是阻塞队列的实现,要注意分别提供给生产者和消费者的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| #pragma once #include <iostream> #include <queue> #include "LockGuard.hpp"
const int TOP = 5;
template <class T> class BlockQueue { public: BlockQueue(int top = TOP) : _capacity(TOP) { } bool IsFull() { return _q.size() == _capacity; } bool IsEmpty() { return _q.size() == 0; } void Push(const T &in) { LockGuard lockguard(&_mutex); while (IsFull) { pthread_cond_wait(&_p_cond, &_mutex); } _q.push(in); pthread_cond_signal(&_c_cond); } void Pop(T *out) { LockGuard lockguard(&_mutex); while (IsEmpty()) { pthread_cond_wait(&_c_cond, &_mutex); } *out = _q.front(); _q.pop(); pthread_cond_signal(&_p_cond); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_p_cond); pthread_cond_destroy(&_c_cond); } private: std::queue<T> _q; int _capacity; pthread_mutex_t _mutex; pthread_cond_t _p_cond; pthread_cond_t _c_cond; };
|
测试
最后是测试程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| #pragma once #include <iostream> #include <queue> #include "LockGuard.hpp"
const int TOP = 5;
template <class T> class BlockQueue { public: BlockQueue(int top = TOP) : _capacity(TOP) { } bool IsFull() { return _q.size() == _capacity; } bool IsEmpty() { return _q.size() == 0; } void Push(const T &in) { LockGuard lockguard(&_mutex); while (IsFull()) { pthread_cond_wait(&_p_cond, &_mutex); } _q.push(in); pthread_cond_signal(&_c_cond); } void Pop(T *out) { LockGuard lockguard(&_mutex); while (IsEmpty()) { pthread_cond_wait(&_c_cond, &_mutex); } *out = _q.front(); _q.pop(); pthread_cond_signal(&_p_cond); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_p_cond); pthread_cond_destroy(&_c_cond); } private: std::queue<T> _q; int _capacity; pthread_mutex_t _mutex; pthread_cond_t _p_cond; pthread_cond_t _c_cond; };
|