Message-Queues beta 1.1
A Message-Queues based Cpp
 
载入中...
搜索中...
未找到
broker.hpp
浏览该文件的文档.
1
13#pragma once
14#include "muduo/protobuf/codec.h"
15#include "muduo/protobuf/dispatcher.h"
16#include "muduo/base/Logging.h"
17#include "muduo/base/Mutex.h"
18#include "muduo/net/EventLoop.h"
19#include "muduo/net/TcpServer.h"
20
21#include "connection.hpp"
22#include "consumer.hpp"
23#include "host.hpp"
24#include "../common/threadpool.hpp"
25#include "../common/msg.pb.h"
26#include "../common/protocol.pb.h"
27#include "../common/logger.hpp"
28
29namespace XuMQ
30{
31 const char *DBFILE = "/meta.db";
32 const char *HOSTNAME = "VirtualHost";
33
36 class Server
37 {
38 public:
39 using MessagePtr = std::shared_ptr<google::protobuf::Message>;
40
44 Server(int port, const std::string &basedir) : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
45 "Server", muduo::net::TcpServer::kReusePort),
46 _dispatcher(std::bind(&Server::onUnknowMessage, this,
47 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
48 _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
49 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
50 _virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),
51 _consumer_manager(std::make_shared<ConsumerManager>()),
52 _connection_manager(std::make_shared<ConnectionManager>()),
53 _threadpool(std::make_shared<threadpool>())
54 {
55 // 初始化队列的消费者管理结构
56 QueueMap qm = _virtual_host->allQueues();
57 for (auto &q : qm)
58 _consumer_manager->initQueueConsumer(q.first);
59 // 注册业务请求处理函数
60 _dispatcher.registerMessageCallback<XuMQ::openChannelRequest>(std::bind(&Server::onOpenChannel, this, std::placeholders::_1,
61 std::placeholders::_2, std::placeholders::_3));
62 _dispatcher.registerMessageCallback<XuMQ::closeChannelRequest>(std::bind(&Server::onCloseChannel, this, std::placeholders::_1,
63 std::placeholders::_2, std::placeholders::_3));
64 _dispatcher.registerMessageCallback<XuMQ::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this, std::placeholders::_1,
65 std::placeholders::_2, std::placeholders::_3));
66 _dispatcher.registerMessageCallback<XuMQ::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this, std::placeholders::_1,
67 std::placeholders::_2, std::placeholders::_3));
68 _dispatcher.registerMessageCallback<XuMQ::declareQueueRequest>(std::bind(&Server::onDeClareQueue, this, std::placeholders::_1,
69 std::placeholders::_2, std::placeholders::_3));
70 _dispatcher.registerMessageCallback<XuMQ::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this, std::placeholders::_1,
71 std::placeholders::_2, std::placeholders::_3));
72 _dispatcher.registerMessageCallback<XuMQ::queueBindRequest>(std::bind(&Server::onQueueBind, this, std::placeholders::_1,
73 std::placeholders::_2, std::placeholders::_3));
74 _dispatcher.registerMessageCallback<XuMQ::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this, std::placeholders::_1,
75 std::placeholders::_2, std::placeholders::_3));
76 _dispatcher.registerMessageCallback<XuMQ::basicPublishRequest>(std::bind(&Server::onBasicPublish, this, std::placeholders::_1,
77 std::placeholders::_2, std::placeholders::_3));
78 _dispatcher.registerMessageCallback<XuMQ::basicAckRequest>(std::bind(&Server::onBasicAck, this, std::placeholders::_1,
79 std::placeholders::_2, std::placeholders::_3));
80 _dispatcher.registerMessageCallback<XuMQ::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this, std::placeholders::_1,
81 std::placeholders::_2, std::placeholders::_3));
82 _dispatcher.registerMessageCallback<XuMQ::basicCancelRequest>(std::bind(&Server::onBasicCancel, this, std::placeholders::_1,
83 std::placeholders::_2, std::placeholders::_3));
84 _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1,
85 std::placeholders::_2, std::placeholders::_3));
86 _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
87 }
89 void start()
90 {
91 _server.start();
92 _baseloop.loop();
93 }
94
95 private:
103 bool onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr message, muduo::Timestamp)
104 {
105 Connection::ptr mconn = _connection_manager->getConnection(conn);
106 if (mconn.get() == nullptr)
107 {
108 error(logger, "打开信道时 没有找到连接对应的Connection对象!");
109 conn->shutdown();
110 return false;
111 }
112 return mconn->openChannel(message);
113 }
120 void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr message, muduo::Timestamp)
121 {
122 Connection::ptr mconn = _connection_manager->getConnection(conn);
123 if (mconn.get() == nullptr)
124 {
125 error(logger, "关闭信道时 没有找到连接对应的Connection对象!");
126 conn->shutdown();
127 return;
128 }
129 return mconn->closeChannel(message);
130 }
137 void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr message, muduo::Timestamp)
138 {
139 Connection::ptr mconn = _connection_manager->getConnection(conn);
140 if (mconn.get() == nullptr)
141 {
142 error(logger, "声明交换机时 没有找到连接对应的Connection对象!");
143 conn->shutdown();
144 return;
145 }
146 Channel::ptr cp = mconn->getChannel(message->cid());
147 if (cp.get() == nullptr)
148 {
149 error(logger, "声明交换机时 没有找到信道!");
150 return;
151 }
152 return cp->declareExchange(message);
153 }
160 void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr message, muduo::Timestamp)
161 {
162 Connection::ptr mconn = _connection_manager->getConnection(conn);
163 if (mconn.get() == nullptr)
164 {
165 error(logger, "删除交换机时 没有找到连接对应的Connection对象!");
166 conn->shutdown();
167 return;
168 }
169 Channel::ptr cp = mconn->getChannel(message->cid());
170 if (cp.get() == nullptr)
171 {
172 error(logger, "删除交换机时 没有找到信道!");
173 return;
174 }
175 return cp->deleteExchange(message);
176 }
183 void onDeClareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr message, muduo::Timestamp)
184 {
185 Connection::ptr mconn = _connection_manager->getConnection(conn);
186 if (mconn.get() == nullptr)
187 {
188 error(logger, "声明队列时 没有找到连接对应的Connection对象!");
189 conn->shutdown();
190 return;
191 }
192 Channel::ptr cp = mconn->getChannel(message->cid());
193 if (cp.get() == nullptr)
194 {
195 error(logger, "声明队列时 没有找到信道!");
196 return;
197 }
198 return cp->declareQueue(message);
199 }
206 void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr message, muduo::Timestamp)
207 {
208 Connection::ptr mconn = _connection_manager->getConnection(conn);
209 if (mconn.get() == nullptr)
210 {
211 error(logger, "删除队列时 没有找到连接对应的Connection对象!");
212 conn->shutdown();
213 return;
214 }
215 Channel::ptr cp = mconn->getChannel(message->cid());
216 if (cp.get() == nullptr)
217 {
218 error(logger, "删除队列时 没有找到信道!");
219 return;
220 }
221 return cp->deleteQueue(message);
222 }
229 void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr message, muduo::Timestamp)
230 {
231 Connection::ptr mconn = _connection_manager->getConnection(conn);
232 if (mconn.get() == nullptr)
233 {
234 error(logger, "队列绑定时 没有找到连接对应的Connection对象!");
235 conn->shutdown();
236 return;
237 }
238 Channel::ptr cp = mconn->getChannel(message->cid());
239 if (cp.get() == nullptr)
240 {
241 error(logger, "队列绑定时 没有找到信道!");
242 return;
243 }
244 return cp->queueBind(message);
245 }
252 void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr message, muduo::Timestamp)
253 {
254 Connection::ptr mconn = _connection_manager->getConnection(conn);
255 if (mconn.get() == nullptr)
256 {
257 error(logger, "队列解绑时 没有找到连接对应的Connection对象!");
258 conn->shutdown();
259 return;
260 }
261 Channel::ptr cp = mconn->getChannel(message->cid());
262 if (cp.get() == nullptr)
263 {
264 error(logger, "队列解绑时 没有找到信道!");
265 return;
266 }
267 return cp->queueUnBind(message);
268 }
275 void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr message, muduo::Timestamp)
276 {
277 Connection::ptr mconn = _connection_manager->getConnection(conn);
278 if (mconn.get() == nullptr)
279 {
280 error(logger, "消息发布时 没有找到连接对应的Connection对象!");
281 conn->shutdown();
282 return;
283 }
284 Channel::ptr cp = mconn->getChannel(message->cid());
285 if (cp.get() == nullptr)
286 {
287 error(logger, "消息发布时 没有找到信道!");
288 return;
289 }
290 return cp->basicPublish(message);
291 }
298 void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr message, muduo::Timestamp)
299 {
300 Connection::ptr mconn = _connection_manager->getConnection(conn);
301 if (mconn.get() == nullptr)
302 {
303 error(logger, "消息确认时 没有找到连接对应的Connection对象!");
304 conn->shutdown();
305 return;
306 }
307 Channel::ptr cp = mconn->getChannel(message->cid());
308 if (cp.get() == nullptr)
309 {
310 error(logger, "消息确认时 没有找到信道!");
311 return;
312 }
313 return cp->basicAck(message);
314 }
321 void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr message, muduo::Timestamp)
322 {
323 Connection::ptr mconn = _connection_manager->getConnection(conn);
324 if (mconn.get() == nullptr)
325 {
326 error(logger, "消息订阅时 没有找到连接对应的Connection对象!");
327 conn->shutdown();
328 return;
329 }
330 Channel::ptr cp = mconn->getChannel(message->cid());
331 if (cp.get() == nullptr)
332 {
333 error(logger, "消息订阅时 没有找到信道!");
334 return;
335 }
336 return cp->basicConsume(message);
337 }
344 void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr message, muduo::Timestamp)
345 {
346 Connection::ptr mconn = _connection_manager->getConnection(conn);
347 if (mconn.get() == nullptr)
348 {
349 error(logger, "取消订阅时 没有找到连接对应的Connection对象!");
350 conn->shutdown();
351 return;
352 }
353 Channel::ptr cp = mconn->getChannel(message->cid());
354 if (cp.get() == nullptr)
355 {
356 error(logger, "取消订阅时 没有找到信道!");
357 return;
358 }
359 return cp->basicCancel(message);
360 }
367 void onUnknowMessage(const muduo::net::TcpConnectionPtr &conn, const Server::MessagePtr message, muduo::Timestamp)
368 {
369 INFO("onUnknowMessage: %s", message->GetTypeName());
370 conn->shutdown();
371 }
376 void onConnection(const muduo::net::TcpConnectionPtr &conn)
377 {
378 if (conn->connected())
379 {
381 }
382 else
383 {
384 _connection_manager->deleteConnection(conn);
385 }
386 }
387
388 private:
389 muduo::net::EventLoop _baseloop;
390 muduo::net::TcpServer _server;
391 ProtobufDispatcher _dispatcher;
397 };
398}
std::shared_ptr< Channel > ptr
信道句柄
Definition channel.hpp:32
std::shared_ptr< Connection > ptr
连接管理句柄
Definition connection.hpp:31
连接管理类
Definition connection.hpp:87
std::shared_ptr< ConnectionManager > ptr
连接管理句柄
Definition connection.hpp:89
消费者队列管理器
Definition consumer.hpp:150
std::shared_ptr< ConsumerManager > ptr
消息队列管理器指针
Definition consumer.hpp:152
服务器类,负责处理客户端请求、管理连接、分发消息。
Definition broker.hpp:37
void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr message, muduo::Timestamp)
处理消息订阅的请求
Definition broker.hpp:321
ProtobufCodecPtr _codec
protobuf协议处理器 -> 对收到的请求数据进行protobuf协议处理
Definition broker.hpp:392
void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr message, muduo::Timestamp)
处理取消订阅的请求
Definition broker.hpp:344
void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr message, muduo::Timestamp)
处理声明交换机的请求
Definition broker.hpp:137
void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr message, muduo::Timestamp)
处理关闭信道的请求
Definition broker.hpp:120
void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr message, muduo::Timestamp)
处理队列绑定的请求
Definition broker.hpp:229
ProtobufDispatcher _dispatcher
请求分发器对象 -> 注册请求处理函数
Definition broker.hpp:391
void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr message, muduo::Timestamp)
处理删除队列的请求
Definition broker.hpp:206
muduo::net::TcpServer _server
服务器对象
Definition broker.hpp:390
void onDeClareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr message, muduo::Timestamp)
处理声明队列的请求
Definition broker.hpp:183
void onConnection(const muduo::net::TcpConnectionPtr &conn)
处理新连接的回调函数
Definition broker.hpp:376
muduo::net::EventLoop _baseloop
基础时间循环
Definition broker.hpp:389
void onUnknowMessage(const muduo::net::TcpConnectionPtr &conn, const Server::MessagePtr message, muduo::Timestamp)
处理未知消息类型的请求
Definition broker.hpp:367
ConnectionManager::ptr _connection_manager
连接管理句柄
Definition broker.hpp:395
void start()
启动服务器,开始监听并处理客户端请求
Definition broker.hpp:89
ConsumerManager::ptr _consumer_manager
消费者管理句柄
Definition broker.hpp:394
void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr message, muduo::Timestamp)
处理消息应答的请求
Definition broker.hpp:298
std::shared_ptr< google::protobuf::Message > MessagePtr
protobuf消息的智能指针类型定义
Definition broker.hpp:39
void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr message, muduo::Timestamp)
处理删除交换机的请求
Definition broker.hpp:160
void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr message, muduo::Timestamp)
处理队列解绑的请求
Definition broker.hpp:252
void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr message, muduo::Timestamp)
处理消息发布的请求
Definition broker.hpp:275
bool onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr message, muduo::Timestamp)
处理打开信道的请求
Definition broker.hpp:103
Server(int port, const std::string &basedir)
Server类的构造函数
Definition broker.hpp:44
VirtualHost::ptr _virtual_host
虚拟机句柄
Definition broker.hpp:393
threadpool::ptr _threadpool
线程池管理句柄
Definition broker.hpp:396
虚拟机模块
Definition host.hpp:23
std::shared_ptr< VirtualHost > ptr
Definition host.hpp:25
Definition protocol.pb.h:2015
Definition protocol.pb.h:2428
Definition protocol.pb.h:2216
Definition protocol.pb.h:1794
Definition protocol.pb.h:293
Definition protocol.pb.h:490
Definition protocol.pb.h:947
Definition protocol.pb.h:734
Definition protocol.pb.h:1191
Definition protocol.pb.h:124
Definition protocol.pb.h:1376
Definition protocol.pb.h:1593
Definition threadpool.hpp:16
std::shared_ptr< threadpool > ptr
Definition threadpool.hpp:18
XuMQ 虚拟机模块定义
Definition channel.hpp:22
std::shared_ptr< openChannelRequest > openChannelRequestPtr
打开信道请求
Definition channel.hpp:33
const char * HOSTNAME
虚拟机名称
Definition broker.hpp:32
std::shared_ptr< deleteQueueRequest > deleteQueueRequestPtr
删除队列请求
Definition channel.hpp:38
std::shared_ptr< basicPublishRequest > basicPublishRequestPtr
消息发布请求
Definition channel.hpp:41
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< declareQueueRequest > declareQueueRequestPtr
声明队列请求
Definition channel.hpp:37
std::shared_ptr< queueUnBindRequest > queueUnBindRequestPtr
解除绑定请求
Definition channel.hpp:40
std::shared_ptr< declareExchangeRequest > declareExchangeRequestPtr
声明交换机请求
Definition channel.hpp:35
std::shared_ptr< basicConsumeRequest > basicConsumeRequestPtr
取消订阅请求
Definition channel.hpp:44
std::shared_ptr< closeChannelRequest > closeChannelRequestPtr
关闭信道请求
Definition channel.hpp:34
std::shared_ptr< ProtobufCodec > ProtobufCodecPtr
协议处理句柄
Definition channel.hpp:24
std::shared_ptr< basicAckRequest > basicAckRequestPtr
消息应答请求
Definition channel.hpp:42
std::shared_ptr< basicCancelRequest > basicCancelRequestPtr
取消订阅请求
Definition channel.hpp:43
std::shared_ptr< queueBindRequest > queueBindRequestPtr
绑定请求
Definition channel.hpp:39
const char * DBFILE
数据库名称
Definition broker.hpp:31
std::unordered_map< std::string, MsgQueue::ptr > QueueMap
消息队列映射表 消息队列名称->消息队列指针
Definition queue.hpp:82
std::shared_ptr< deleteExchangeRequest > deleteExchangeRequestPtr
删除交换机请求
Definition channel.hpp:36
声明了连接类和连接管理类,主要用于管理连接的建立、维护与信道的操作。
消费者管理模块