Message-Queues beta 1.1
A Message-Queues based Cpp
 
载入中...
搜索中...
未找到
consumer.hpp
浏览该文件的文档.
1
10
11#pragma once
12#include "../common/logger.hpp"
13#include "../common/helper.hpp"
14#include "../common/msg.pb.h"
15#include <iostream>
16#include <unordered_map>
17#include <vector>
18#include <mutex>
19#include <memory>
20#include <functional>
21
22namespace XuMQ
23{
24 using ConsumerCallback = std::function<void(const std::string&, const BasicProperties *, const std::string&)>;
27 struct Consumer
28 {
29 using ptr = std::shared_ptr<Consumer>;
30 std::string tag;
31 std::string qname;
32 bool auto_ack;
34
42 Consumer(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
43 : tag(ctag), qname(queue_name), auto_ack(ack), callback(cb) {}
44 };
45
49 {
50 public:
51 using ptr = std::shared_ptr<QueueConsumer>;
54 QueueConsumer(const std::string &qname)
55 : _qname(qname), _rr_seq(0) {}
62 Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
63 {
64 // 加锁
65 std::unique_lock<std::mutex> lock(_mutex);
66 // 判断消费者是否重复
67 for (auto &consumer : _consumers)
68 {
69 if (consumer->tag == ctag)
70 {
71 warn(logger, "消费者重复添加! 消费者标识: %s", ctag.c_str());
72 return Consumer::ptr();
73 }
74 }
75 // 构造对象
76 auto consumer = std::make_shared<Consumer>(ctag, queue_name, ack, cb);
77 // 添加消费者
78 _consumers.push_back(consumer);
79 return consumer;
80 }
83 void remove(const std::string &ctag)
84 {
85 std::unique_lock<std::mutex> lock(_mutex);
86 for (auto it = _consumers.begin(); it != _consumers.end(); it++)
87 {
88 if (ctag == (*it)->tag)
89 {
90 _consumers.erase(it);
91 return;
92 }
93 }
94 }
98 {
99 // 加锁
100 std::unique_lock<std::mutex> lock(_mutex);
101 if (_consumers.size() == 0)
102 {
103 error(logger, "当前消费者队列为空!");
104 return Consumer::ptr();
105 }
106 // 获取轮转到的下标
107 int idx = _rr_seq++ % _consumers.size();
108 // 返回
109 return _consumers[idx];
110 }
113 bool empty()
114 {
115 // 加锁
116 std::unique_lock<std::mutex> lock(_mutex);
117 return _consumers.size() == 0;
118 }
122 bool exists(const std::string &ctag)
123 {
124 // 加锁
125 std::unique_lock<std::mutex> lock(_mutex);
126 for (auto &consumer : _consumers)
127 if (consumer->tag == ctag)
128 return true;
129 return false;
130 }
132 void clear()
133 {
134 // 加锁
135 std::unique_lock<std::mutex> lock(_mutex);
136 _consumers.clear();
137 _rr_seq = 0;
138 }
139
140 private:
141 std::string _qname;
142 std::mutex _mutex;
143 uint64_t _rr_seq;
144 std::vector<Consumer::ptr> _consumers;
145 };
146
150 {
151 public:
152 using ptr = std::shared_ptr<ConsumerManager>;
157 void initQueueConsumer(const std::string &name)
158 {
159 // 加锁
160 std::unique_lock<std::mutex> lock(_mutex);
161 // 判断重复
162 auto it = _qconsumers.find(name);
163 if (it != _qconsumers.end())
164 {
165 return;
166 }
167 // 新增
168 auto qconsumers = std::make_shared<QueueConsumer>(name);
169 _qconsumers.insert(std::make_pair(name, qconsumers));
170 }
173 void destroyQueueConsumer(const std::string &name)
174 {
175 std::unique_lock<std::mutex> lock(_mutex);
176 _qconsumers.erase(name);
177 }
184 Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
185 {
187 {
188 std::unique_lock<std::mutex> lock(_mutex);
189 // 获取队列的消费者管理单元
190 auto it = _qconsumers.find(queue_name);
191 if (it == _qconsumers.end())
192 {
193 warn(logger, "没有找到指定队列! 队列名称: %s", queue_name.c_str());
194 return Consumer::ptr();
195 }
196 qcp = it->second;
197 }
198 // 完成新建
199 return qcp->create(ctag, queue_name, ack, cb);
200 }
204 void remove(const std::string &ctag, const std::string &queue_name)
205 {
207 {
208 std::unique_lock<std::mutex> lock(_mutex);
209 // 获取队列的消费者管理单元
210 auto it = _qconsumers.find(queue_name);
211 if (it == _qconsumers.end())
212 {
213 warn(logger, "没有找到指定队列! 队列名称: %s", queue_name.c_str());
214 return;
215 }
216 qcp = it->second;
217 }
218 qcp->remove(ctag);
219 }
223 Consumer::ptr choose(const std::string &queue_name)
224 {
226 {
227 std::unique_lock<std::mutex> lock(_mutex);
228 // 获取队列的消费者管理单元
229 auto it = _qconsumers.find(queue_name);
230 if (it == _qconsumers.end())
231 {
232 warn(logger, "没有找到指定队列! 队列名称: %s", queue_name.c_str());
233 return Consumer::ptr();
234 }
235 qcp = it->second;
236 }
237 return qcp->choose();
238 }
239
243 bool empty(const std::string &queue_name)
244 {
246 {
247 std::unique_lock<std::mutex> lock(_mutex);
248 // 获取队列的消费者管理单元
249 auto it = _qconsumers.find(queue_name);
250 if (it == _qconsumers.end())
251 {
252 warn(logger, "没有找到指定队列! 队列名称: %s", queue_name.c_str());
253 return true;
254 }
255 qcp = it->second;
256 }
257 return qcp->empty();
258 }
263 bool exists(const std::string &ctag, const std::string &queue_name)
264 {
266 {
267 std::unique_lock<std::mutex> lock(_mutex);
268 // 获取队列的消费者管理单元
269 auto it = _qconsumers.find(queue_name);
270 if (it == _qconsumers.end())
271 {
272 warn(logger, "没有找到指定队列! 队列名称: %s", queue_name.c_str());
273 return false;
274 }
275 qcp = it->second;
276 }
277 return qcp->exists(ctag);
278 }
280 void clear()
281 {
282 std::unique_lock<std::mutex> lock(_mutex);
284 _qconsumers.clear();
285 }
286
287 private:
288 std::mutex _mutex;
289 std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;
290 };
291}
消费者队列管理器
Definition consumer.hpp:150
void initQueueConsumer(const std::string &name)
初始化消费者队列
Definition consumer.hpp:157
void destroyQueueConsumer(const std::string &name)
销毁消费者队列
Definition consumer.hpp:173
std::mutex _mutex
Definition consumer.hpp:288
ConsumerManager()
无参构造
Definition consumer.hpp:154
void remove(const std::string &ctag, const std::string &queue_name)
移除队列的一个消费者
Definition consumer.hpp:204
bool exists(const std::string &ctag, const std::string &queue_name)
判断队列中的消费者是否存在
Definition consumer.hpp:263
Consumer::ptr choose(const std::string &queue_name)
获取指定队列的消费者
Definition consumer.hpp:223
std::shared_ptr< ConsumerManager > ptr
消息队列管理器指针
Definition consumer.hpp:152
bool empty(const std::string &queue_name)
判断指定队列是否为空
Definition consumer.hpp:243
void clear()
清理
Definition consumer.hpp:280
std::unordered_map< std::string, QueueConsumer::ptr > _qconsumers
Definition consumer.hpp:289
Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
向指定队列新增消费者
Definition consumer.hpp:184
以队列为单元的消费者管理类
Definition consumer.hpp:49
QueueConsumer(const std::string &qname)
构造函数
Definition consumer.hpp:54
void remove(const std::string &ctag)
移除一个消费者
Definition consumer.hpp:83
std::vector< Consumer::ptr > _consumers
消费者管理数组
Definition consumer.hpp:144
bool empty()
判断消费者队列是否为空
Definition consumer.hpp:113
Consumer::ptr choose()
获取一个消费者
Definition consumer.hpp:97
std::string _qname
队列名称
Definition consumer.hpp:141
std::mutex _mutex
互斥锁
Definition consumer.hpp:142
Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
创建一个消费者
Definition consumer.hpp:62
std::shared_ptr< QueueConsumer > ptr
消费者管理类
Definition consumer.hpp:51
uint64_t _rr_seq
轮转序号
Definition consumer.hpp:143
bool exists(const std::string &ctag)
判断消费者是否存在
Definition consumer.hpp:122
void clear()
清空
Definition consumer.hpp:132
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::function< void(const std::string &, const BasicProperties *, const std::string &)> ConsumerCallback
消费者回调函数
Definition consumer.hpp:19
Consumer(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
消费者构造函数
Definition consumer.hpp:42
ConsumerCallback callback
消费者回调函数
Definition consumer.hpp:28
std::string qname
消费者订阅的队列名称
Definition consumer.hpp:26
std::shared_ptr< Consumer > ptr
消费者结构管理指针
Definition consumer.hpp:24
bool auto_ack
自动确认标志
Definition consumer.hpp:27
std::string tag
消费者标识
Definition consumer.hpp:25
Consumer()
无参构造函数
Definition consumer.hpp:36