12#include "../common/logger.hpp"
13#include "../common/helper.hpp"
14#include "../common/msg.pb.h"
16#include <unordered_map>
24 using ConsumerCallback = std::function<void(
const std::string&,
const BasicProperties *,
const std::string&)>;
29 using ptr = std::shared_ptr<Consumer>;
51 using ptr = std::shared_ptr<QueueConsumer>;
65 std::unique_lock<std::mutex> lock(
_mutex);
69 if (consumer->tag == ctag)
71 warn(
logger,
"消费者重复添加! 消费者标识: %s", ctag.c_str());
76 auto consumer = std::make_shared<Consumer>(ctag, queue_name, ack, cb);
85 std::unique_lock<std::mutex> lock(
_mutex);
88 if (ctag == (*it)->tag)
100 std::unique_lock<std::mutex> lock(
_mutex);
103 error(
logger,
"当前消费者队列为空!");
116 std::unique_lock<std::mutex> lock(
_mutex);
125 std::unique_lock<std::mutex> lock(
_mutex);
127 if (consumer->tag == ctag)
135 std::unique_lock<std::mutex> lock(
_mutex);
152 using ptr = std::shared_ptr<ConsumerManager>;
160 std::unique_lock<std::mutex> lock(
_mutex);
168 auto qconsumers = std::make_shared<QueueConsumer>(name);
169 _qconsumers.insert(std::make_pair(name, qconsumers));
175 std::unique_lock<std::mutex> lock(
_mutex);
188 std::unique_lock<std::mutex> lock(
_mutex);
193 warn(
logger,
"没有找到指定队列! 队列名称: %s", queue_name.c_str());
199 return qcp->create(ctag, queue_name, ack, cb);
204 void remove(
const std::string &ctag,
const std::string &queue_name)
208 std::unique_lock<std::mutex> lock(
_mutex);
213 warn(
logger,
"没有找到指定队列! 队列名称: %s", queue_name.c_str());
227 std::unique_lock<std::mutex> lock(
_mutex);
232 warn(
logger,
"没有找到指定队列! 队列名称: %s", queue_name.c_str());
237 return qcp->choose();
243 bool empty(
const std::string &queue_name)
247 std::unique_lock<std::mutex> lock(
_mutex);
252 warn(
logger,
"没有找到指定队列! 队列名称: %s", queue_name.c_str());
263 bool exists(
const std::string &ctag,
const std::string &queue_name)
267 std::unique_lock<std::mutex> lock(
_mutex);
272 warn(
logger,
"没有找到指定队列! 队列名称: %s", queue_name.c_str());
277 return qcp->exists(ctag);
282 std::unique_lock<std::mutex> lock(
_mutex);
消费者队列管理器
Definition consumer.hpp:150
void initQueueConsumer(const std::string &name)
初始化消费者队列
Definition consumer.hpp:157
void destroyQueueConsumer(const std::string &name)
销毁消费者队列
Definition consumer.hpp:173
std::mutex _mutex
Definition consumer.hpp:288
ConsumerManager()
无参构造
Definition consumer.hpp:154
void remove(const std::string &ctag, const std::string &queue_name)
移除队列的一个消费者
Definition consumer.hpp:204
bool exists(const std::string &ctag, const std::string &queue_name)
判断队列中的消费者是否存在
Definition consumer.hpp:263
Consumer::ptr choose(const std::string &queue_name)
获取指定队列的消费者
Definition consumer.hpp:223
std::shared_ptr< ConsumerManager > ptr
消息队列管理器指针
Definition consumer.hpp:152
bool empty(const std::string &queue_name)
判断指定队列是否为空
Definition consumer.hpp:243
void clear()
清理
Definition consumer.hpp:280
std::unordered_map< std::string, QueueConsumer::ptr > _qconsumers
Definition consumer.hpp:289
Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
向指定队列新增消费者
Definition consumer.hpp:184
以队列为单元的消费者管理类
Definition consumer.hpp:49
QueueConsumer(const std::string &qname)
构造函数
Definition consumer.hpp:54
void remove(const std::string &ctag)
移除一个消费者
Definition consumer.hpp:83
std::vector< Consumer::ptr > _consumers
消费者管理数组
Definition consumer.hpp:144
bool empty()
判断消费者队列是否为空
Definition consumer.hpp:113
Consumer::ptr choose()
获取一个消费者
Definition consumer.hpp:97
std::string _qname
队列名称
Definition consumer.hpp:141
std::mutex _mutex
互斥锁
Definition consumer.hpp:142
Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
创建一个消费者
Definition consumer.hpp:62
std::shared_ptr< QueueConsumer > ptr
消费者管理类
Definition consumer.hpp:51
uint64_t _rr_seq
轮转序号
Definition consumer.hpp:143
bool exists(const std::string &ctag)
判断消费者是否存在
Definition consumer.hpp:122
void clear()
清空
Definition consumer.hpp:132
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::function< void(const std::string &, const BasicProperties *, const std::string &)> ConsumerCallback
消费者回调函数
Definition consumer.hpp:19
Consumer(const std::string &ctag, const std::string &queue_name, bool ack, const ConsumerCallback &cb)
消费者构造函数
Definition consumer.hpp:42
ConsumerCallback callback
消费者回调函数
Definition consumer.hpp:28
std::string qname
消费者订阅的队列名称
Definition consumer.hpp:26
std::shared_ptr< Consumer > ptr
消费者结构管理指针
Definition consumer.hpp:24
bool auto_ack
自动确认标志
Definition consumer.hpp:27
std::string tag
消费者标识
Definition consumer.hpp:25
Consumer()
无参构造函数
Definition consumer.hpp:36