14#include "muduo/protobuf/codec.h"
15#include "muduo/protobuf/dispatcher.h"
16#include "muduo/base/Logging.h"
17#include "muduo/base/Mutex.h"
18#include "muduo/net/EventLoop.h"
19#include "muduo/net/TcpServer.h"
24#include "../common/threadpool.hpp"
25#include "../common/msg.pb.h"
26#include "../common/protocol.pb.h"
27#include "../common/logger.hpp"
39 using MessagePtr = std::shared_ptr<google::protobuf::Message>;
45 "Server", muduo::net::TcpServer::kReusePort),
47 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
48 _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &
_dispatcher,
49 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
61 std::placeholders::_2, std::placeholders::_3));
63 std::placeholders::_2, std::placeholders::_3));
65 std::placeholders::_2, std::placeholders::_3));
67 std::placeholders::_2, std::placeholders::_3));
69 std::placeholders::_2, std::placeholders::_3));
71 std::placeholders::_2, std::placeholders::_3));
73 std::placeholders::_2, std::placeholders::_3));
75 std::placeholders::_2, std::placeholders::_3));
77 std::placeholders::_2, std::placeholders::_3));
79 std::placeholders::_2, std::placeholders::_3));
81 std::placeholders::_2, std::placeholders::_3));
83 std::placeholders::_2, std::placeholders::_3));
84 _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage,
_codec.get(), std::placeholders::_1,
85 std::placeholders::_2, std::placeholders::_3));
106 if (mconn.get() ==
nullptr)
108 error(
logger,
"打开信道时 没有找到连接对应的Connection对象!");
112 return mconn->openChannel(message);
123 if (mconn.get() ==
nullptr)
125 error(
logger,
"关闭信道时 没有找到连接对应的Connection对象!");
129 return mconn->closeChannel(message);
140 if (mconn.get() ==
nullptr)
142 error(
logger,
"声明交换机时 没有找到连接对应的Connection对象!");
147 if (cp.get() ==
nullptr)
149 error(
logger,
"声明交换机时 没有找到信道!");
152 return cp->declareExchange(message);
163 if (mconn.get() ==
nullptr)
165 error(
logger,
"删除交换机时 没有找到连接对应的Connection对象!");
170 if (cp.get() ==
nullptr)
172 error(
logger,
"删除交换机时 没有找到信道!");
175 return cp->deleteExchange(message);
186 if (mconn.get() ==
nullptr)
188 error(
logger,
"声明队列时 没有找到连接对应的Connection对象!");
193 if (cp.get() ==
nullptr)
195 error(
logger,
"声明队列时 没有找到信道!");
198 return cp->declareQueue(message);
209 if (mconn.get() ==
nullptr)
211 error(
logger,
"删除队列时 没有找到连接对应的Connection对象!");
216 if (cp.get() ==
nullptr)
218 error(
logger,
"删除队列时 没有找到信道!");
221 return cp->deleteQueue(message);
232 if (mconn.get() ==
nullptr)
234 error(
logger,
"队列绑定时 没有找到连接对应的Connection对象!");
239 if (cp.get() ==
nullptr)
241 error(
logger,
"队列绑定时 没有找到信道!");
244 return cp->queueBind(message);
255 if (mconn.get() ==
nullptr)
257 error(
logger,
"队列解绑时 没有找到连接对应的Connection对象!");
262 if (cp.get() ==
nullptr)
264 error(
logger,
"队列解绑时 没有找到信道!");
267 return cp->queueUnBind(message);
278 if (mconn.get() ==
nullptr)
280 error(
logger,
"消息发布时 没有找到连接对应的Connection对象!");
285 if (cp.get() ==
nullptr)
287 error(
logger,
"消息发布时 没有找到信道!");
290 return cp->basicPublish(message);
301 if (mconn.get() ==
nullptr)
303 error(
logger,
"消息确认时 没有找到连接对应的Connection对象!");
308 if (cp.get() ==
nullptr)
310 error(
logger,
"消息确认时 没有找到信道!");
313 return cp->basicAck(message);
324 if (mconn.get() ==
nullptr)
326 error(
logger,
"消息订阅时 没有找到连接对应的Connection对象!");
331 if (cp.get() ==
nullptr)
333 error(
logger,
"消息订阅时 没有找到信道!");
336 return cp->basicConsume(message);
347 if (mconn.get() ==
nullptr)
349 error(
logger,
"取消订阅时 没有找到连接对应的Connection对象!");
354 if (cp.get() ==
nullptr)
356 error(
logger,
"取消订阅时 没有找到信道!");
359 return cp->basicCancel(message);
369 INFO(
"onUnknowMessage: %s", message->GetTypeName());
378 if (conn->connected())
std::shared_ptr< Channel > ptr
信道句柄
Definition channel.hpp:32
std::shared_ptr< Connection > ptr
连接管理句柄
Definition connection.hpp:31
连接管理类
Definition connection.hpp:87
std::shared_ptr< ConnectionManager > ptr
连接管理句柄
Definition connection.hpp:89
消费者队列管理器
Definition consumer.hpp:150
std::shared_ptr< ConsumerManager > ptr
消息队列管理器指针
Definition consumer.hpp:152
服务器类,负责处理客户端请求、管理连接、分发消息。
Definition broker.hpp:37
void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr message, muduo::Timestamp)
处理消息订阅的请求
Definition broker.hpp:321
ProtobufCodecPtr _codec
protobuf协议处理器 -> 对收到的请求数据进行protobuf协议处理
Definition broker.hpp:392
void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr message, muduo::Timestamp)
处理取消订阅的请求
Definition broker.hpp:344
void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr message, muduo::Timestamp)
处理声明交换机的请求
Definition broker.hpp:137
void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr message, muduo::Timestamp)
处理关闭信道的请求
Definition broker.hpp:120
void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr message, muduo::Timestamp)
处理队列绑定的请求
Definition broker.hpp:229
ProtobufDispatcher _dispatcher
请求分发器对象 -> 注册请求处理函数
Definition broker.hpp:391
void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr message, muduo::Timestamp)
处理删除队列的请求
Definition broker.hpp:206
muduo::net::TcpServer _server
服务器对象
Definition broker.hpp:390
void onDeClareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr message, muduo::Timestamp)
处理声明队列的请求
Definition broker.hpp:183
void onConnection(const muduo::net::TcpConnectionPtr &conn)
处理新连接的回调函数
Definition broker.hpp:376
muduo::net::EventLoop _baseloop
基础时间循环
Definition broker.hpp:389
void onUnknowMessage(const muduo::net::TcpConnectionPtr &conn, const Server::MessagePtr message, muduo::Timestamp)
处理未知消息类型的请求
Definition broker.hpp:367
ConnectionManager::ptr _connection_manager
连接管理句柄
Definition broker.hpp:395
void start()
启动服务器,开始监听并处理客户端请求
Definition broker.hpp:89
ConsumerManager::ptr _consumer_manager
消费者管理句柄
Definition broker.hpp:394
void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr message, muduo::Timestamp)
处理消息应答的请求
Definition broker.hpp:298
std::shared_ptr< google::protobuf::Message > MessagePtr
protobuf消息的智能指针类型定义
Definition broker.hpp:39
void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr message, muduo::Timestamp)
处理删除交换机的请求
Definition broker.hpp:160
void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr message, muduo::Timestamp)
处理队列解绑的请求
Definition broker.hpp:252
void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr message, muduo::Timestamp)
处理消息发布的请求
Definition broker.hpp:275
bool onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr message, muduo::Timestamp)
处理打开信道的请求
Definition broker.hpp:103
Server(int port, const std::string &basedir)
Server类的构造函数
Definition broker.hpp:44
VirtualHost::ptr _virtual_host
虚拟机句柄
Definition broker.hpp:393
threadpool::ptr _threadpool
线程池管理句柄
Definition broker.hpp:396
虚拟机模块
Definition host.hpp:23
std::shared_ptr< VirtualHost > ptr
Definition host.hpp:25
Definition protocol.pb.h:2015
Definition protocol.pb.h:2428
Definition protocol.pb.h:2216
Definition protocol.pb.h:1794
Definition protocol.pb.h:293
Definition protocol.pb.h:490
Definition protocol.pb.h:947
Definition protocol.pb.h:734
Definition protocol.pb.h:1191
Definition protocol.pb.h:124
Definition protocol.pb.h:1376
Definition protocol.pb.h:1593
Definition threadpool.hpp:16
std::shared_ptr< threadpool > ptr
Definition threadpool.hpp:18
Definition channel.hpp:22
std::shared_ptr< openChannelRequest > openChannelRequestPtr
打开信道请求
Definition channel.hpp:33
const char * HOSTNAME
虚拟机名称
Definition broker.hpp:32
std::shared_ptr< deleteQueueRequest > deleteQueueRequestPtr
删除队列请求
Definition channel.hpp:38
std::shared_ptr< basicPublishRequest > basicPublishRequestPtr
消息发布请求
Definition channel.hpp:41
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< declareQueueRequest > declareQueueRequestPtr
声明队列请求
Definition channel.hpp:37
std::shared_ptr< queueUnBindRequest > queueUnBindRequestPtr
解除绑定请求
Definition channel.hpp:40
std::shared_ptr< declareExchangeRequest > declareExchangeRequestPtr
声明交换机请求
Definition channel.hpp:35
std::shared_ptr< basicConsumeRequest > basicConsumeRequestPtr
取消订阅请求
Definition channel.hpp:44
std::shared_ptr< closeChannelRequest > closeChannelRequestPtr
关闭信道请求
Definition channel.hpp:34
std::shared_ptr< ProtobufCodec > ProtobufCodecPtr
协议处理句柄
Definition channel.hpp:24
std::shared_ptr< basicAckRequest > basicAckRequestPtr
消息应答请求
Definition channel.hpp:42
std::shared_ptr< basicCancelRequest > basicCancelRequestPtr
取消订阅请求
Definition channel.hpp:43
std::shared_ptr< queueBindRequest > queueBindRequestPtr
绑定请求
Definition channel.hpp:39
const char * DBFILE
数据库名称
Definition broker.hpp:31
std::unordered_map< std::string, MsgQueue::ptr > QueueMap
消息队列映射表 消息队列名称->消息队列指针
Definition queue.hpp:82
std::shared_ptr< deleteExchangeRequest > deleteExchangeRequestPtr
删除交换机请求
Definition channel.hpp:36
声明了连接类和连接管理类,主要用于管理连接的建立、维护与信道的操作。