10#include "muduo/net/TcpConnection.h"
11#include "muduo/protobuf/codec.h"
12#include "muduo/protobuf/dispatcher.h"
13#include "../common/logger.hpp"
14#include "../common/helper.hpp"
15#include "../common/msg.pb.h"
16#include "../common/protocol.pb.h"
19#include <condition_variable>
23 using MessagePtr = std::shared_ptr<google::protobuf::Message>;
32 using ptr = std::shared_ptr<Channel>;
75 google::protobuf::Map<std::string, std::string> &args)
117 google::protobuf::Map<std::string, std::string> &qargs)
150 bool queueBind(
const std::string &ename,
const std::string &qname,
const std::string &key)
166 void queueUnBind(
const std::string &ename,
const std::string &qname)
204 error(
logger,
"消息确认时 当前信道未设置消费者!");
236 debug(
logger,
"请求创建一个消费者 消费者tag为%s 消费者所在队列是%s",tag.c_str(), qname.c_str());
237 if (resp->ok() ==
false)
242 _consumer = std::make_shared<Consumer>(tag, qname, auto_ack, cb);
265 std::unique_lock<std::mutex> lock(
_mutex);
266 _basic_resp.insert(std::make_pair(resp->rid(), resp));
275 warn(
logger,
"消息处理时未找到订阅者信息!");
278 if (
_consumer->tag != resp->consumer_tag())
280 error(
logger,
"推送消息中消费者标识与信道消费者标识不一致!");
283 _consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());
297 std::unique_lock<std::mutex> lock(
_mutex);
298 _cv.wait(lock, [&rid,
this]
311 std::condition_variable
_cv;
319 using ptr = std::shared_ptr<ChannelManager>;
328 std::unique_lock<std::mutex> lock(
_mutex);
329 auto channel = std::make_shared<Channel>(conn, codec);
330 _channels.insert(std::make_pair(channel->cid(), channel));
337 std::unique_lock<std::mutex> lock(
_mutex);
345 std::unique_lock<std::mutex> lock(
_mutex);
356 std::unordered_map<std::string, Channel::ptr>
_channels;
void set_routing_key(ArgT0 &&arg0, ArgT... args)
void set_id(ArgT0 &&arg0, ArgT... args)
::XuMQ::DeliveryMode delivery_mode() const
Definition msg.pb.h:737
const std::string & id() const
Definition msg.pb.h:684
const std::string & routing_key() const
Definition msg.pb.h:754
void set_delivery_mode(::XuMQ::DeliveryMode value)
Definition msg.pb.h:745
客户端信道类
Definition channel.hpp:30
void closeChannel()
关闭信道请求
Definition channel.hpp:55
std::condition_variable _cv
条件变量
Definition channel.hpp:311
std::string _cid
信道id
Definition channel.hpp:306
Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
构造函数
Definition channel.hpp:36
std::shared_ptr< Channel > ptr
信道句柄
Definition channel.hpp:32
ProtobufCodecPtr _codec
协议处理句柄
Definition channel.hpp:308
void basicCancel()
取消订阅
Definition channel.hpp:246
void consume(const basicConsumeResponsePtr &resp)
连接收到推送消息 找到对应的消费者对象 通过回调函数进行消息处理
Definition channel.hpp:271
basicResponsePtr waitResponse(const std::string &rid)
等待处理响应
Definition channel.hpp:295
void basicAck(const std::string &msg_id)
应答消息
Definition channel.hpp:200
Consumer::ptr _consumer
信道角色描述
Definition channel.hpp:309
void putBasicResponse(const basicResponsePtr &resp)
连接收到基础响应后向映射表添加
Definition channel.hpp:263
std::mutex _mutex
互斥锁
Definition channel.hpp:310
bool basicConsume(const std::string &tag, const std::string &qname, bool auto_ack, const ConsumerCallback &cb)
订阅消息
Definition channel.hpp:222
void queueUnBind(const std::string &ename, const std::string &qname)
解除绑定信息
Definition channel.hpp:166
std::string cid()
Definition channel.hpp:286
bool queueBind(const std::string &ename, const std::string &qname, const std::string &key)
添加绑定信息
Definition channel.hpp:150
std::unordered_map< std::string, basicResponsePtr > _basic_resp
基础响应映射表
Definition channel.hpp:312
bool openChannel()
创建信道请求
Definition channel.hpp:44
~Channel()
Definition channel.hpp:38
void deleteExchange(const std::string &name)
删除指定交换机
Definition channel.hpp:95
void basicPublish(const std::string &ename, const BasicProperties *bp, const std::string &body)
向指定交换机发布消息
Definition channel.hpp:181
bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, google::protobuf::Map< std::string, std::string > &qargs)
声明消息队列
Definition channel.hpp:113
muduo::net::TcpConnectionPtr _conn
连接
Definition channel.hpp:307
void deleteQueue(const std::string &qname)
删除消息队列
Definition channel.hpp:134
bool declareExchange(const std::string &name, ExchangeType type, bool durable, bool auto_delete, google::protobuf::Map< std::string, std::string > &args)
声明交换机
Definition channel.hpp:71
信道管理类
Definition channel.hpp:317
std::unordered_map< std::string, Channel::ptr > _channels
信道id和信道句柄的映射表
Definition channel.hpp:356
Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
新建一个信道
Definition channel.hpp:326
ChannelManager()
构造函数
Definition channel.hpp:321
Channel::ptr get(const std::string &cid)
获取一个信道
Definition channel.hpp:343
std::shared_ptr< ChannelManager > ptr
信道管理句柄
Definition channel.hpp:319
void remove(const std::string &cid)
移除一个信道
Definition channel.hpp:335
std::mutex _mutex
互斥锁
Definition channel.hpp:355
提供生成 UUID 的工具类。
Definition helper.hpp:184
static std::string uuid()
生成一个随机 UUID。
Definition helper.hpp:194
Definition protocol.pb.h:2015
void set_msg_id(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:2428
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_consumer_tag(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:2216
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_auto_ack(bool value)
Definition protocol.pb.h:5194
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_consumer_tag(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1794
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_body(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
::XuMQ::BasicProperties * mutable_properties()
Definition protocol.pb.h:4745
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:293
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:490
::PROTOBUF_NAMESPACE_ID::Map< std::string, std::string > * mutable_args()
Definition protocol.pb.h:3466
void set_auto_delete(bool value)
Definition protocol.pb.h:3437
void set_exchange_type(::XuMQ::ExchangeType value)
Definition protocol.pb.h:3397
void set_durable(bool value)
Definition protocol.pb.h:3417
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:947
void set_queue_name(ArgT0 &&arg0, ArgT... args)
::PROTOBUF_NAMESPACE_ID::Map< std::string, std::string > * mutable_args()
Definition protocol.pb.h:3865
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_durable(bool value)
Definition protocol.pb.h:3816
void set_exclusive(bool value)
Definition protocol.pb.h:3796
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_auto_delete(bool value)
Definition protocol.pb.h:3836
Definition protocol.pb.h:734
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1191
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:124
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1376
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_binding_key(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1593
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< google::protobuf::Message > MessagePtr
消息句柄
Definition channel.hpp:23
std::shared_ptr< basicResponse > basicResponsePtr
其他响应句柄
Definition channel.hpp:26
ExchangeType
Definition msg.pb.h:66
std::shared_ptr< basicConsumeResponse > basicConsumeResponsePtr
消费响应句柄
Definition channel.hpp:25
std::shared_ptr< ProtobufCodec > ProtobufCodecPtr
协议处理句柄
Definition channel.hpp:24
std::function< void(const std::string &, const BasicProperties *, const std::string &)> ConsumerCallback
消费者回调函数
Definition consumer.hpp:19
std::shared_ptr< Consumer > ptr
消费者结构管理指针
Definition consumer.hpp:24