Message-Queues beta 1.1
A Message-Queues based Cpp
 
载入中...
搜索中...
未找到
channel.hpp
浏览该文件的文档.
1
13#pragma once
14#include "muduo/net/TcpConnection.h"
15#include "muduo/protobuf/codec.h"
16#include "muduo/protobuf/dispatcher.h"
17#include "muduo/base/Atomic.h"
18#include "muduo/base/Types.h"
19
20#include "../common/logger.hpp"
21#include "../common/helper.hpp"
22#include "../common/msg.pb.h"
23#include "../common/protocol.pb.h"
24#include "../common/threadpool.hpp"
25#include <google/protobuf/map.h>
26#include "consumer.hpp"
27#include "host.hpp"
28#include "route.hpp"
29
30namespace XuMQ
31{
32 using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
33 using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
34 using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
35 using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
36 using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
37 using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
38 using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
39 using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;
40 using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;
41 using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
42 using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
43 using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;
44 using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
47 class Channel
48 {
49 public:
50 using ptr = std::shared_ptr<Channel>;
58 Channel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp,
59 const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
60 : _cid(id), _conn(conn), _codec(codec), _cmp(cmp), _host(host), _pool(pool) {}
63 {
64 if (_consumer.get() != nullptr)
65 _cmp->remove(_consumer->tag, _consumer->qname);
66 }
70 {
71 bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(),
72 req->durable(), req->auto_delete(), req->args());
73 basicRespFunc(ret, req->rid(), req->cid());
74 }
78 {
79 _host->deleteExchange(req->exchange_name());
80 basicRespFunc(true, req->rid(), req->cid());
81 }
85 {
86 bool ret = _host->declareQueue(req->queue_name(), req->durable(),
87 req->exclusive(), req->auto_delete(), req->args());
88 if (ret == false)
89 basicRespFunc(ret, req->rid(), req->cid());
90 debug(logger, "声明队列成功 队列名称为%s", req->queue_name().c_str());
91 _cmp->initQueueConsumer(req->queue_name()); // 初始化队列消费者管理句柄
92 basicRespFunc(ret, req->rid(), req->cid());
93 }
97 {
98 _cmp->destroyQueueConsumer(req->queue_name());
99 _host->deleteQueue(req->queue_name());
100 basicRespFunc(true, req->rid(), req->cid());
101 }
105 {
106 bool ret = _host->bind(req->exchange_name(), req->queue_name(), req->binding_key());
107 basicRespFunc(ret, req->rid(), req->cid());
108 }
112 {
113 _host->unbind(req->exchange_name(), req->queue_name());
114 basicRespFunc(true, req->rid(), req->cid());
115 }
119 {
120 // 获取交换机
121 Exchange::ptr ep = _host->selectExchange(req->exchange_name());
122 if (ep.get() == nullptr)
123 basicRespFunc(false, req->rid(), req->cid());
124 // 获取指定交换机的绑定信息
125 MsgQueueBindingMap mqbm = _host->exchangeBindings(req->exchange_name());
126 BasicProperties *properties = nullptr;
127 std::string routing_key;
128
129 if (req->has_properties())
130 {
131 properties = req->mutable_properties();
132 routing_key = properties->routing_key();
133 }
134 for (auto &binding : mqbm)
135 {
136 // 交换路由 找到对应的队列
137 if (Router::route(ep->type, routing_key, binding.second->binding_key))
138 {
139 // 将消息添加到队列中
140 _host->basicPublish(binding.first, properties, req->body());
141 // 向线程池中添加一个消息消费任务(向指定队列的订阅者推送消息)
142 auto task = std::bind(&Channel::consume, this, binding.first);
143 _pool->push(task);
144 }
145 basicRespFunc(true, req->rid(), req->cid());
146 }
147 }
151 {
152 _host->basicAck(req->queue_name(), req->msg_id());
153 basicRespFunc(true, req->rid(), req->cid());
154 }
158 {
159 // 判断队列是否存在
160 bool ret = _host->existsQueue(req->queue_name());
161 if (ret == false)
162 basicRespFunc(false, req->rid(), req->cid());
163 // 创建队列消费者
164 auto cb = std::bind(&Channel::callback, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
165 // 创建消费者之后 信道的角色就是消费者
166 _consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);
167 if(_consumer==nullptr)
168 {
169 fatal(logger,"消费者创建失败!");
170 }
171 else
172 {
173 info(logger,"消费者创建成功!");
174 }
175 basicRespFunc(true, req->rid(), req->cid());
176 }
180 {
181 _cmp->remove(req->consumer_tag(), req->queue_name());
182 basicRespFunc(true, req->rid(), req->cid());
183 }
184
185 private:
190 void basicRespFunc(bool ok, const std::string &rid, const std::string &cid)
191 {
192 basicResponse resp;
193 resp.set_rid(rid);
194 resp.set_cid(cid);
195 resp.set_ok(ok);
196 _codec->send(_conn, resp);
197 }
200 void consume(const std::string &qname)
201 {
202 MessagePtr mp = _host->basicConsume(qname);
203 if (mp.get() == nullptr)
204 {
205 error(logger, "消费任务失败, 指定队列中没有消息: %s", qname.c_str());
206 return;
207 }
208 Consumer::ptr cp = _cmp->choose(qname);
209 if (cp.get() == nullptr)
210 {
211 error(logger, "消费任务失败, 指定队列中没有消费者: %s", qname.c_str());
212 return;
213 }
214 cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());
215 if (cp->auto_ack == true)
216 _host->basicAck(qname, mp->payload().properties().id());
217 }
222 void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
223 {
225 resp.set_cid(_cid);
226 resp.set_body(body);
227 resp.set_consumer_tag(tag);
228 if (bp)
229 {
230 resp.mutable_properties()->set_id(bp->id());
233 }
234 _codec->send(_conn, resp);
235 }
236
237 private:
238 std::string _cid;
240 muduo::net::TcpConnectionPtr _conn;
245 };
248 class ChannelManager
249 {
250 public:
251 using ptr = std::shared_ptr<ChannelManager>;
252
263 bool openChannel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp,
264 const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
265 {
266 std::unique_lock<std::mutex> lock(_mutex);
267 auto it = _channels.find(id);
268 if (it != _channels.end())
269 return false;
270 auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);
271 _channels.insert(std::make_pair(id, channel));
272 return true;
273 }
276 void closeChannel(const std::string &id)
277 {
278 std::unique_lock<std::mutex> lock(_mutex);
279 _channels.erase(id);
280 }
284 Channel::ptr getChannel(const std::string &id)
285 {
286 std::unique_lock<std::mutex> lock(_mutex);
287 auto it = _channels.find(id);
288 if (it == _channels.end())
289 return Channel::ptr();
290 return it->second;
291 }
292
293 private:
294 std::mutex _mutex;
295 std::unordered_map<std::string, Channel::ptr> _channels;
296 };
297}
Definition msg.pb.h:122
void set_routing_key(ArgT0 &&arg0, ArgT... args)
void set_id(ArgT0 &&arg0, ArgT... args)
::XuMQ::DeliveryMode delivery_mode() const
Definition msg.pb.h:737
const std::string & id() const
Definition msg.pb.h:684
const std::string & routing_key() const
Definition msg.pb.h:754
void set_delivery_mode(::XuMQ::DeliveryMode value)
Definition msg.pb.h:745
客户端信道类
Definition channel.hpp:30
Channel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
信道构造函数
Definition channel.hpp:58
void basicRespFunc(bool ok, const std::string &rid, const std::string &cid)
基础响应发送函数
Definition channel.hpp:190
void deleteQueue(const deleteQueueRequestPtr &req)
删除队列请求处理函数
Definition channel.hpp:96
std::string _cid
信道id
Definition channel.hpp:306
ConsumerManager::ptr _cmp
消费者管理句柄
Definition channel.hpp:242
std::shared_ptr< Channel > ptr
信道句柄
Definition channel.hpp:32
ProtobufCodecPtr _codec
协议处理句柄
Definition channel.hpp:308
void consume(const basicConsumeResponsePtr &resp)
连接收到推送消息 找到对应的消费者对象 通过回调函数进行消息处理
Definition channel.hpp:271
Consumer::ptr _consumer
信道角色描述
Definition channel.hpp:309
void basicCancel(const basicCancelRequestPtr &req)
取消订阅请求处理函数
Definition channel.hpp:179
void deleteExchange(const deleteExchangeRequestPtr &req)
删除交换机请求处理函数
Definition channel.hpp:77
void declareExchange(const declareExchangeRequestPtr &req)
声明交换机请求处理函数
Definition channel.hpp:69
std::string cid()
Definition channel.hpp:286
void declareQueue(const declareQueueRequestPtr &req)
声明队列请求处理函数
Definition channel.hpp:84
void queueBind(const queueBindRequestPtr &req)
队列绑定请求处理函数
Definition channel.hpp:104
threadpool::ptr _pool
线程池
Definition channel.hpp:244
void callback(const std::string &tag, const BasicProperties *bp, const std::string &body)
消费者回调函数
Definition channel.hpp:222
void basicConsume(const basicConsumeRequestPtr &req)
订阅消息请求处理函数
Definition channel.hpp:157
~Channel()
析构函数
Definition channel.hpp:62
VirtualHost::ptr _host
虚拟机
Definition channel.hpp:243
void queueUnBind(const queueUnBindRequestPtr &req)
队列解绑请求处理函数
Definition channel.hpp:111
void consume(const std::string &qname)
消费调用函数
Definition channel.hpp:200
muduo::net::TcpConnectionPtr _conn
连接
Definition channel.hpp:307
void basicPublish(const basicPublishRequestPtr &req)
消息发布请求处理函数
Definition channel.hpp:118
void basicAck(const basicAckRequestPtr &req)
确认消息请求处理函数
Definition channel.hpp:150
信道管理类
Definition channel.hpp:317
std::unordered_map< std::string, Channel::ptr > _channels
信道id和信道句柄的映射表
Definition channel.hpp:356
ChannelManager()
构造函数
Definition channel.hpp:254
std::shared_ptr< ChannelManager > ptr
信道管理句柄
Definition channel.hpp:319
void closeChannel(const std::string &id)
关闭信道
Definition channel.hpp:276
std::mutex _mutex
互斥锁
Definition channel.hpp:355
Channel::ptr getChannel(const std::string &id)
获取信道句柄
Definition channel.hpp:284
bool openChannel(const std::string &id, const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn, const threadpool::ptr &pool)
打开一个信道
Definition channel.hpp:263
std::shared_ptr< ConsumerManager > ptr
消息队列管理器指针
Definition consumer.hpp:152
static bool route(ExchangeType type, const std::string &routing_key, const std::string &binding_key)
路由选择
Definition route.hpp:105
std::shared_ptr< VirtualHost > ptr
Definition host.hpp:25
Definition protocol.pb.h:2629
::XuMQ::BasicProperties * mutable_properties()
Definition protocol.pb.h:5616
void set_body(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_consumer_tag(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:2834
void set_ok(bool value)
Definition protocol.pb.h:5761
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
std::shared_ptr< threadpool > ptr
Definition threadpool.hpp:18
XuMQ 虚拟机模块定义
XuMQ::ConsumerManager::ptr cmp
Definition mqconsumer.cpp:4
Definition channel.hpp:22
std::shared_ptr< openChannelRequest > openChannelRequestPtr
打开信道请求
Definition channel.hpp:33
std::shared_ptr< deleteQueueRequest > deleteQueueRequestPtr
删除队列请求
Definition channel.hpp:38
std::shared_ptr< basicPublishRequest > basicPublishRequestPtr
消息发布请求
Definition channel.hpp:41
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< google::protobuf::Message > MessagePtr
消息句柄
Definition channel.hpp:23
std::shared_ptr< declareQueueRequest > declareQueueRequestPtr
声明队列请求
Definition channel.hpp:37
std::shared_ptr< queueUnBindRequest > queueUnBindRequestPtr
解除绑定请求
Definition channel.hpp:40
std::shared_ptr< declareExchangeRequest > declareExchangeRequestPtr
声明交换机请求
Definition channel.hpp:35
std::shared_ptr< basicConsumeRequest > basicConsumeRequestPtr
取消订阅请求
Definition channel.hpp:44
std::unordered_map< std::string, Binding::ptr > MsgQueueBindingMap
消息队列绑定映射表-—消息队列->绑定信息的映射表
Definition binding.hpp:43
std::shared_ptr< closeChannelRequest > closeChannelRequestPtr
关闭信道请求
Definition channel.hpp:34
std::shared_ptr< ProtobufCodec > ProtobufCodecPtr
协议处理句柄
Definition channel.hpp:24
std::shared_ptr< basicAckRequest > basicAckRequestPtr
消息应答请求
Definition channel.hpp:42
std::shared_ptr< basicCancelRequest > basicCancelRequestPtr
取消订阅请求
Definition channel.hpp:43
std::shared_ptr< queueBindRequest > queueBindRequestPtr
绑定请求
Definition channel.hpp:39
std::shared_ptr< deleteExchangeRequest > deleteExchangeRequestPtr
删除交换机请求
Definition channel.hpp:36
消费者管理模块
std::shared_ptr< Consumer > ptr
消费者结构管理指针
Definition consumer.hpp:24
std::shared_ptr< Exchange > ptr
使用智能指针管理交换机对象
Definition exchange.hpp:24