14#include "muduo/net/TcpConnection.h"
15#include "muduo/protobuf/codec.h"
16#include "muduo/protobuf/dispatcher.h"
17#include "muduo/base/Atomic.h"
18#include "muduo/base/Types.h"
20#include "../common/logger.hpp"
21#include "../common/helper.hpp"
22#include "../common/msg.pb.h"
23#include "../common/protocol.pb.h"
24#include "../common/threadpool.hpp"
25#include <google/protobuf/map.h>
50 using ptr = std::shared_ptr<Channel>;
71 bool ret =
_host->declareExchange(req->exchange_name(), req->exchange_type(),
72 req->durable(), req->auto_delete(), req->args());
79 _host->deleteExchange(req->exchange_name());
86 bool ret =
_host->declareQueue(req->queue_name(), req->durable(),
87 req->exclusive(), req->auto_delete(), req->args());
90 debug(
logger,
"声明队列成功 队列名称为%s", req->queue_name().c_str());
91 _cmp->initQueueConsumer(req->queue_name());
98 _cmp->destroyQueueConsumer(req->queue_name());
99 _host->deleteQueue(req->queue_name());
106 bool ret =
_host->bind(req->exchange_name(), req->queue_name(), req->binding_key());
113 _host->unbind(req->exchange_name(), req->queue_name());
122 if (ep.get() ==
nullptr)
127 std::string routing_key;
129 if (req->has_properties())
131 properties = req->mutable_properties();
134 for (
auto &binding : mqbm)
137 if (
Router::route(ep->type, routing_key, binding.second->binding_key))
140 _host->basicPublish(binding.first, properties, req->body());
152 _host->basicAck(req->queue_name(), req->msg_id());
160 bool ret =
_host->existsQueue(req->queue_name());
164 auto cb = std::bind(&
Channel::callback,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
166 _consumer =
_cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);
181 _cmp->remove(req->consumer_tag(), req->queue_name());
203 if (mp.get() ==
nullptr)
205 error(
logger,
"消费任务失败, 指定队列中没有消息: %s", qname.c_str());
209 if (cp.get() ==
nullptr)
211 error(
logger,
"消费任务失败, 指定队列中没有消费者: %s", qname.c_str());
214 cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());
215 if (cp->auto_ack ==
true)
216 _host->basicAck(qname, mp->payload().properties().id());
240 muduo::net::TcpConnectionPtr
_conn;
251 using ptr = std::shared_ptr<ChannelManager>;
266 std::unique_lock<std::mutex> lock(
_mutex);
270 auto channel = std::make_shared<Channel>(
id, host,
cmp, codec, conn, pool);
271 _channels.insert(std::make_pair(
id, channel));
278 std::unique_lock<std::mutex> lock(
_mutex);
286 std::unique_lock<std::mutex> lock(
_mutex);
295 std::unordered_map<std::string, Channel::ptr>
_channels;
void set_routing_key(ArgT0 &&arg0, ArgT... args)
void set_id(ArgT0 &&arg0, ArgT... args)
::XuMQ::DeliveryMode delivery_mode() const
Definition msg.pb.h:737
const std::string & id() const
Definition msg.pb.h:684
const std::string & routing_key() const
Definition msg.pb.h:754
void set_delivery_mode(::XuMQ::DeliveryMode value)
Definition msg.pb.h:745
客户端信道类
Definition channel.hpp:30
Channel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
信道构造函数
Definition channel.hpp:58
void basicRespFunc(bool ok, const std::string &rid, const std::string &cid)
基础响应发送函数
Definition channel.hpp:190
void deleteQueue(const deleteQueueRequestPtr &req)
删除队列请求处理函数
Definition channel.hpp:96
std::string _cid
信道id
Definition channel.hpp:306
ConsumerManager::ptr _cmp
消费者管理句柄
Definition channel.hpp:242
std::shared_ptr< Channel > ptr
信道句柄
Definition channel.hpp:32
ProtobufCodecPtr _codec
协议处理句柄
Definition channel.hpp:308
void consume(const basicConsumeResponsePtr &resp)
连接收到推送消息 找到对应的消费者对象 通过回调函数进行消息处理
Definition channel.hpp:271
Consumer::ptr _consumer
信道角色描述
Definition channel.hpp:309
void basicCancel(const basicCancelRequestPtr &req)
取消订阅请求处理函数
Definition channel.hpp:179
void deleteExchange(const deleteExchangeRequestPtr &req)
删除交换机请求处理函数
Definition channel.hpp:77
void declareExchange(const declareExchangeRequestPtr &req)
声明交换机请求处理函数
Definition channel.hpp:69
std::string cid()
Definition channel.hpp:286
void declareQueue(const declareQueueRequestPtr &req)
声明队列请求处理函数
Definition channel.hpp:84
void queueBind(const queueBindRequestPtr &req)
队列绑定请求处理函数
Definition channel.hpp:104
threadpool::ptr _pool
线程池
Definition channel.hpp:244
void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
消费者回调函数
Definition channel.hpp:222
void basicConsume(const basicConsumeRequestPtr &req)
订阅消息请求处理函数
Definition channel.hpp:157
~Channel()
析构函数
Definition channel.hpp:62
VirtualHost::ptr _host
虚拟机
Definition channel.hpp:243
void queueUnBind(const queueUnBindRequestPtr &req)
队列解绑请求处理函数
Definition channel.hpp:111
void consume(const std::string &qname)
消费调用函数
Definition channel.hpp:200
muduo::net::TcpConnectionPtr _conn
连接
Definition channel.hpp:307
void basicPublish(const basicPublishRequestPtr &req)
消息发布请求处理函数
Definition channel.hpp:118
void basicAck(const basicAckRequestPtr &req)
确认消息请求处理函数
Definition channel.hpp:150
信道管理类
Definition channel.hpp:317
std::unordered_map< std::string, Channel::ptr > _channels
信道id和信道句柄的映射表
Definition channel.hpp:356
ChannelManager()
构造函数
Definition channel.hpp:254
std::shared_ptr< ChannelManager > ptr
信道管理句柄
Definition channel.hpp:319
void closeChannel(const std::string &id)
关闭信道
Definition channel.hpp:276
std::mutex _mutex
互斥锁
Definition channel.hpp:355
Channel::ptr getChannel(const std::string &id)
获取信道句柄
Definition channel.hpp:284
bool openChannel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
打开一个信道
Definition channel.hpp:263
std::shared_ptr< ConsumerManager > ptr
消息队列管理器指针
Definition consumer.hpp:152
static bool route(ExchangeType type, const std::string &routing_key, const std::string &binding_key)
路由选择
Definition route.hpp:105
std::shared_ptr< VirtualHost > ptr
Definition host.hpp:25
Definition protocol.pb.h:2629
::XuMQ::BasicProperties * mutable_properties()
Definition protocol.pb.h:5616
void set_body(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_consumer_tag(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:2834
void set_ok(bool value)
Definition protocol.pb.h:5761
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
std::shared_ptr< threadpool > ptr
Definition threadpool.hpp:18
XuMQ::ConsumerManager::ptr cmp
Definition mqconsumer.cpp:4
Definition channel.hpp:22
std::shared_ptr< openChannelRequest > openChannelRequestPtr
打开信道请求
Definition channel.hpp:33
std::shared_ptr< deleteQueueRequest > deleteQueueRequestPtr
删除队列请求
Definition channel.hpp:38
std::shared_ptr< basicPublishRequest > basicPublishRequestPtr
消息发布请求
Definition channel.hpp:41
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< google::protobuf::Message > MessagePtr
消息句柄
Definition channel.hpp:23
std::shared_ptr< declareQueueRequest > declareQueueRequestPtr
声明队列请求
Definition channel.hpp:37
std::shared_ptr< queueUnBindRequest > queueUnBindRequestPtr
解除绑定请求
Definition channel.hpp:40
std::shared_ptr< declareExchangeRequest > declareExchangeRequestPtr
声明交换机请求
Definition channel.hpp:35
std::shared_ptr< basicConsumeRequest > basicConsumeRequestPtr
取消订阅请求
Definition channel.hpp:44
std::unordered_map< std::string, Binding::ptr > MsgQueueBindingMap
消息队列绑定映射表-—消息队列->绑定信息的映射表
Definition binding.hpp:43
std::shared_ptr< closeChannelRequest > closeChannelRequestPtr
关闭信道请求
Definition channel.hpp:34
std::shared_ptr< ProtobufCodec > ProtobufCodecPtr
协议处理句柄
Definition channel.hpp:24
std::shared_ptr< basicAckRequest > basicAckRequestPtr
消息应答请求
Definition channel.hpp:42
std::shared_ptr< basicCancelRequest > basicCancelRequestPtr
取消订阅请求
Definition channel.hpp:43
std::shared_ptr< queueBindRequest > queueBindRequestPtr
绑定请求
Definition channel.hpp:39
std::shared_ptr< deleteExchangeRequest > deleteExchangeRequestPtr
删除交换机请求
Definition channel.hpp:36
std::shared_ptr< Consumer > ptr
消费者结构管理指针
Definition consumer.hpp:24
std::shared_ptr< Exchange > ptr
使用智能指针管理交换机对象
Definition exchange.hpp:24