Message-Queues beta 1.1
A message queue implemented in C++
 
载入中...
搜索中...
未找到
channel.hpp
浏览该文件的文档.
1
9#pragma once
10#include "muduo/net/TcpConnection.h"
11#include "muduo/protobuf/codec.h"
12#include "muduo/protobuf/dispatcher.h"
13#include "../common/logger.hpp"
14#include "../common/helper.hpp"
15#include "../common/msg.pb.h"
16#include "../common/protocol.pb.h"
17#include "consumer.hpp"
18#include <mutex>
19#include <condition_variable>
20
21namespace XuMQ
22{
23 using MessagePtr = std::shared_ptr<google::protobuf::Message>;
24 using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
25 using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;
26 using basicResponsePtr = std::shared_ptr<basicResponse>;
29 class Channel
30 {
31 public:
32 using ptr = std::shared_ptr<Channel>;
36 Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
37 : _cid(UUIDHelper::uuid()), _conn(conn), _codec(codec) {}
39 {
41 }
45 {
46 std::string rid = UUIDHelper::uuid();
48 req.set_rid(rid);
49 req.set_cid(_cid);
50 _codec->send(_conn, req);
52 return resp->ok();
53 }
56 {
57 std::string rid = UUIDHelper::uuid();
59 req.set_rid(rid);
60 req.set_cid(_cid);
61 _codec->send(_conn, req);
62 waitResponse(rid);
63 }
/// @brief Declare an exchange: fill a request with a fresh request id, this channel's id and the
///        exchange settings, send it through the codec, and return the response's ok() flag.
/// @param name        exchange name written into the request
/// @param type        exchange type written into the request
/// @param durable     durable flag written into the request
/// @param auto_delete auto-delete flag written into the request
/// @param args        extra arguments; they are swapped (not copied) into the request, so after
///                    the call the caller's map holds whatever the request's map held before
///                    (presumably empty for a fresh request — confirm)
/// @return resp->ok()
/// NOTE(review): the declarations of `req` and `resp` (listing lines 78 and 90) are not visible in
/// this listing; `resp` presumably comes from waitResponse(rid) as in declareQueue — confirm.
71 bool declareExchange(const std::string &name,
72 ExchangeType type,
73 bool durable,
74 bool auto_delete,
75 google::protobuf::Map<std::string, std::string> &args)
76 {
77 // Build the declare-exchange request object
79 std::string rid = UUIDHelper::uuid();
80 req.set_rid(rid);
81 req.set_cid(_cid);
82 req.set_exchange_name(name);
83 req.set_exchange_type(type);
84 req.set_durable(durable);
85 req.set_auto_delete(auto_delete);
86 req.mutable_args()->swap(args);
87 // Send the request
88 _codec->send(_conn, req);
89 // Wait for the server's response
91 return resp->ok();
92 }
/// @brief Delete the named exchange: send a request carrying a fresh request id, this channel's
///        id and the exchange name, then block in waitResponse() until the matching response
///        has arrived. The response content is not inspected.
/// @param name exchange name written into the request
/// NOTE(review): the declaration of `req` (listing line 97) is not visible in this listing.
95 void deleteExchange(const std::string &name)
96 {
98 std::string rid = UUIDHelper::uuid();
99 req.set_rid(rid);
100 req.set_cid(_cid);
101 req.set_exchange_name(name);
102 // Send the request
103 _codec->send(_conn, req);
104 waitResponse(rid);
105 }
/// @brief Declare a message queue: fill a request with a fresh request id, this channel's id and
///        the queue settings, send it through the codec, wait for the matching response and
///        return its ok() flag.
/// @param qname        queue name written into the request
/// @param qdurable     durable flag written into the request
/// @param qexclusive   exclusive flag written into the request
/// @param qauto_delete auto-delete flag written into the request
/// @param qargs        extra arguments; they are swapped (not copied) into the request, so after
///                     the call the caller's map holds whatever the request's map held before
///                     (presumably empty for a fresh request — confirm)
/// @return resp->ok() of the response obtained from waitResponse(rid)
/// NOTE(review): the declaration of `req` (listing line 119) is not visible in this listing.
113 bool declareQueue(const std::string &qname,
114 bool qdurable,
115 bool qexclusive,
116 bool qauto_delete,
117 google::protobuf::Map<std::string, std::string> &qargs)
118 {
120 std::string rid = UUIDHelper::uuid();
121 req.set_rid(rid);
122 req.set_cid(_cid);
123 req.set_queue_name(qname);
124 req.set_exclusive(qexclusive);
125 req.set_durable(qdurable);
126 req.set_auto_delete(qauto_delete);
127 req.mutable_args()->swap(qargs);
128 _codec->send(_conn, req);
// Blocks until a response carrying this rid has been stored for the channel.
129 basicResponsePtr resp = waitResponse(rid);
130 return resp->ok();
131 }
/// @brief Delete the named queue: send a request carrying a fresh request id, this channel's id
///        and the queue name, then block in waitResponse() until the matching response has
///        arrived. The response content is not inspected.
/// @param qname queue name written into the request
/// NOTE(review): the declaration of `req` (listing line 136) is not visible in this listing.
134 void deleteQueue(const std::string &qname)
135 {
137 std::string rid = UUIDHelper::uuid();
138 req.set_rid(rid);
139 req.set_cid(_cid);
140 req.set_queue_name(qname);
141 _codec->send(_conn, req);
142 waitResponse(rid);
143 }
/// @brief Bind a queue to an exchange: send a request carrying a fresh request id, this channel's
///        id, the queue name, the exchange name and the binding key, wait for the matching
///        response and return its ok() flag.
/// @param ename exchange name written into the request
/// @param qname queue name written into the request
/// @param key   binding key written into the request
/// @return resp->ok() of the response obtained from waitResponse(rid)
/// NOTE(review): the declaration of `req` (listing line 152) is not visible in this listing.
150 bool queueBind(const std::string &ename, const std::string &qname, const std::string &key)
151 {
153 std::string rid = UUIDHelper::uuid();
154 req.set_rid(rid);
155 req.set_cid(_cid);
156 req.set_queue_name(qname);
157 req.set_exchange_name(ename);
158 req.set_binding_key(key);
159 _codec->send(_conn, req);
160 basicResponsePtr resp = waitResponse(rid);
161 return resp->ok();
162 }
/// @brief Remove the binding between a queue and an exchange: send a request carrying a fresh
///        request id, this channel's id, the queue name and the exchange name, then block in
///        waitResponse() until the matching response has arrived. The response content is not
///        inspected.
/// @param ename exchange name written into the request
/// @param qname queue name written into the request
/// NOTE(review): the declaration of `req` (listing line 168) is not visible in this listing.
166 void queueUnBind(const std::string &ename, const std::string &qname)
167 {
169 std::string rid = UUIDHelper::uuid();
170 req.set_rid(rid);
171 req.set_cid(_cid);
172 req.set_queue_name(qname);
173 req.set_exchange_name(ename);
174 _codec->send(_conn, req);
175 waitResponse(rid);
176 }
/// @brief Publish a message body to the named exchange: send a request carrying a fresh request
///        id, this channel's id, the exchange name and the body, then block in waitResponse()
///        until the matching response has arrived. The response content is not inspected.
/// @param ename exchange name written into the request
/// @param bp    optional message properties; may be nullptr, in which case no properties are
///              written into the request
/// @param body  message payload written into the request
/// NOTE(review): the declaration of `req` (listing line 183) and listing lines 192-193 inside the
/// `if` are not visible in this listing; only the id copy is shown here.
181 void basicPublish(const std::string &ename, const BasicProperties *bp, const std::string &body)
182 {
184 std::string rid = UUIDHelper::uuid();
185 req.set_rid(rid);
186 req.set_cid(_cid);
187 req.set_exchange_name(ename);
188 req.set_body(body);
// Properties are only filled in when the caller supplied them.
189 if (bp != nullptr)
190 {
191 req.mutable_properties()->set_id(bp->id());
194 }
195 _codec->send(_conn, req);
196 waitResponse(rid);
197 }
200 void basicAck(const std::string &msg_id)
201 {
202 if (_consumer.get() == nullptr) // 消费者不存在 无法进行确认
203 {
204 error(logger, "消息确认时 当前信道未设置消费者!");
205 return;
206 }
207 basicAckRequest req;
208 std::string rid = UUIDHelper::uuid();
209 req.set_rid(rid);
210 req.set_cid(_cid);
211 req.set_queue_name(_consumer->qname);
212 req.set_msg_id(msg_id);
213 _codec->send(_conn, req);
214 waitResponse(rid);
215 }
/// @brief Subscribe this channel to a queue. A channel holds at most one consumer: if one is
///        already set the call returns false without sending anything.
/// @param tag      consumer tag written into the request and stored in the new Consumer
/// @param qname    queue name written into the request and stored in the new Consumer
/// @param auto_ack auto-acknowledge flag written into the request and stored in the new Consumer
/// @param cb       callback stored in the new Consumer
/// @return true when the response's ok() is true and the consumer has been created; false when a
///         consumer already exists or the response's ok() is false
/// NOTE(review): the declaration of `req` (listing line 227) is not visible in this listing.
222 bool basicConsume(const std::string &tag, const std::string &qname, bool auto_ack, const ConsumerCallback &cb)
223 {
224 if (_consumer.get() != nullptr) // Non-null means a consumer already exists; do not create another
225 return false;
226
228 std::string rid = UUIDHelper::uuid();
229 req.set_rid(rid);
230 req.set_cid(_cid);
231 req.set_queue_name(qname);
232 req.set_consumer_tag(tag);
233 req.set_auto_ack(auto_ack);
234 _codec->send(_conn, req);
// Blocks until a response carrying this rid has been stored for the channel.
235 basicResponsePtr resp = waitResponse(rid);
236 debug(logger,"请求创建一个消费者 消费者tag为%s 消费者所在队列是%s",tag.c_str(), qname.c_str());
237 if (resp->ok() == false)
238 {
239 error(logger, "添加订阅失败!");
240 return false;
241 }
// The local consumer record is only created after the response reports success.
242 _consumer = std::make_shared<Consumer>(tag, qname, auto_ack, cb);
243 return true;
244 }
247 {
248 if (_consumer.get() == nullptr) // 消费者为空 无法取消订阅
249 return;
251 std::string rid = UUIDHelper::uuid();
252 req.set_rid(rid);
253 req.set_cid(_cid);
254 req.set_queue_name(_consumer->qname);
255 req.set_consumer_tag(_consumer->tag);
256 _codec->send(_conn, req);
257 waitResponse(rid);
258 _consumer.reset();
259 }
260
264 {
265 std::unique_lock<std::mutex> lock(_mutex);
266 _basic_resp.insert(std::make_pair(resp->rid(), resp));
267 _cv.notify_all();
268 }
272 {
273 if (_consumer.get() == nullptr)
274 {
275 warn(logger, "消息处理时未找到订阅者信息!");
276 return;
277 }
278 if (_consumer->tag != resp->consumer_tag())
279 {
280 error(logger, "推送消息中消费者标识与信道消费者标识不一致!");
281 return;
282 }
283 _consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());
284 }
285
286 std::string cid()
287 {
288 return _cid;
289 }
290
291 private:
295 basicResponsePtr waitResponse(const std::string &rid)
296 {
297 std::unique_lock<std::mutex> lock(_mutex);
298 _cv.wait(lock, [&rid, this]
299 { return _basic_resp.find(rid) != _basic_resp.end(); });
300 basicResponsePtr resp = _basic_resp[rid];
301 _basic_resp.erase(rid);
302 return resp;
303 }
304
305 private:
306 std::string _cid;
307 muduo::net::TcpConnectionPtr _conn;
310 std::mutex _mutex;
311 std::condition_variable _cv;
312 std::unordered_map<std::string, basicResponsePtr> _basic_resp;
313 };
317 {
318 public:
319 using ptr = std::shared_ptr<ChannelManager>;
326 Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
327 {
328 std::unique_lock<std::mutex> lock(_mutex);
329 auto channel = std::make_shared<Channel>(conn, codec);
330 _channels.insert(std::make_pair(channel->cid(), channel));
331 return channel;
332 }
335 void remove(const std::string &cid)
336 {
337 std::unique_lock<std::mutex> lock(_mutex);
338 _channels.erase(cid);
339 }
343 Channel::ptr get(const std::string &cid)
344 {
345 std::unique_lock<std::mutex> lock(_mutex);
346 auto it = _channels.find(cid);
347 if (it == _channels.end())
348 {
349 return Channel::ptr();
350 }
351 return it->second;
352 }
353
354 private:
355 std::mutex _mutex;
356 std::unordered_map<std::string, Channel::ptr> _channels;
357 };
358}
Definition msg.pb.h:122
void set_routing_key(ArgT0 &&arg0, ArgT... args)
void set_id(ArgT0 &&arg0, ArgT... args)
::XuMQ::DeliveryMode delivery_mode() const
Definition msg.pb.h:737
const std::string & id() const
Definition msg.pb.h:684
const std::string & routing_key() const
Definition msg.pb.h:754
void set_delivery_mode(::XuMQ::DeliveryMode value)
Definition msg.pb.h:745
客户端信道类
Definition channel.hpp:30
void closeChannel()
关闭信道请求
Definition channel.hpp:55
std::condition_variable _cv
条件变量
Definition channel.hpp:311
std::string _cid
信道id
Definition channel.hpp:306
Channel(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
构造函数
Definition channel.hpp:36
std::shared_ptr< Channel > ptr
信道句柄
Definition channel.hpp:32
ProtobufCodecPtr _codec
协议处理句柄
Definition channel.hpp:308
void basicCancel()
取消订阅
Definition channel.hpp:246
void consume(const basicConsumeResponsePtr &resp)
连接收到推送消息 找到对应的消费者对象 通过回调函数进行消息处理
Definition channel.hpp:271
basicResponsePtr waitResponse(const std::string &rid)
等待处理响应
Definition channel.hpp:295
void basicAck(const std::string &msg_id)
应答消息
Definition channel.hpp:200
Consumer::ptr _consumer
信道角色描述
Definition channel.hpp:309
void putBasicResponse(const basicResponsePtr &resp)
连接收到基础响应后向映射表添加
Definition channel.hpp:263
std::mutex _mutex
互斥锁
Definition channel.hpp:310
bool basicConsume(const std::string &tag, const std::string &qname, bool auto_ack, const ConsumerCallback &cb)
订阅消息
Definition channel.hpp:222
void queueUnBind(const std::string &ename, const std::string &qname)
解除绑定信息
Definition channel.hpp:166
std::string cid()
Definition channel.hpp:286
bool queueBind(const std::string &ename, const std::string &qname, const std::string &key)
添加绑定信息
Definition channel.hpp:150
std::unordered_map< std::string, basicResponsePtr > _basic_resp
基础响应映射表
Definition channel.hpp:312
bool openChannel()
创建信道请求
Definition channel.hpp:44
~Channel()
Definition channel.hpp:38
void deleteExchange(const std::string &name)
删除指定交换机
Definition channel.hpp:95
void basicPublish(const std::string &ename, const BasicProperties *bp, const std::string &body)
向指定交换机发布消息
Definition channel.hpp:181
bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, google::protobuf::Map< std::string, std::string > &qargs)
声明消息队列
Definition channel.hpp:113
muduo::net::TcpConnectionPtr _conn
连接
Definition channel.hpp:307
void deleteQueue(const std::string &qname)
删除消息队列
Definition channel.hpp:134
bool declareExchange(const std::string &name, ExchangeType type, bool durable, bool auto_delete, google::protobuf::Map< std::string, std::string > &args)
声明交换机
Definition channel.hpp:71
信道管理类
Definition channel.hpp:317
std::unordered_map< std::string, Channel::ptr > _channels
信道id和信道句柄的映射表
Definition channel.hpp:356
Channel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec)
新建一个信道
Definition channel.hpp:326
ChannelManager()
构造函数
Definition channel.hpp:321
Channel::ptr get(const std::string &cid)
获取一个信道
Definition channel.hpp:343
std::shared_ptr< ChannelManager > ptr
信道管理句柄
Definition channel.hpp:319
void remove(const std::string &cid)
移除一个信道
Definition channel.hpp:335
std::mutex _mutex
互斥锁
Definition channel.hpp:355
提供生成 UUID 的工具类。
Definition helper.hpp:184
static std::string uuid()
生成一个随机 UUID。
Definition helper.hpp:194
Definition protocol.pb.h:2015
void set_msg_id(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:2428
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_consumer_tag(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:2216
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_auto_ack(bool value)
Definition protocol.pb.h:5194
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_consumer_tag(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1794
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_body(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
::XuMQ::BasicProperties * mutable_properties()
Definition protocol.pb.h:4745
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:293
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:490
::PROTOBUF_NAMESPACE_ID::Map< std::string, std::string > * mutable_args()
Definition protocol.pb.h:3466
void set_auto_delete(bool value)
Definition protocol.pb.h:3437
void set_exchange_type(::XuMQ::ExchangeType value)
Definition protocol.pb.h:3397
void set_durable(bool value)
Definition protocol.pb.h:3417
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:947
void set_queue_name(ArgT0 &&arg0, ArgT... args)
::PROTOBUF_NAMESPACE_ID::Map< std::string, std::string > * mutable_args()
Definition protocol.pb.h:3865
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_durable(bool value)
Definition protocol.pb.h:3816
void set_exclusive(bool value)
Definition protocol.pb.h:3796
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_auto_delete(bool value)
Definition protocol.pb.h:3836
Definition protocol.pb.h:734
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1191
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:124
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1376
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_binding_key(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_queue_name(ArgT0 &&arg0, ArgT... args)
Definition protocol.pb.h:1593
void set_queue_name(ArgT0 &&arg0, ArgT... args)
void set_exchange_name(ArgT0 &&arg0, ArgT... args)
void set_rid(ArgT0 &&arg0, ArgT... args)
void set_cid(ArgT0 &&arg0, ArgT... args)
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< google::protobuf::Message > MessagePtr
消息句柄
Definition channel.hpp:23
std::shared_ptr< basicResponse > basicResponsePtr
其他响应句柄
Definition channel.hpp:26
ExchangeType
Definition msg.pb.h:66
std::shared_ptr< basicConsumeResponse > basicConsumeResponsePtr
消费响应句柄
Definition channel.hpp:25
std::shared_ptr< ProtobufCodec > ProtobufCodecPtr
协议处理句柄
Definition channel.hpp:24
std::function< void(const std::string &, const BasicProperties *, const std::string &)> ConsumerCallback
消费者回调函数
Definition consumer.hpp:19
消费者管理模块
std::shared_ptr< Consumer > ptr
消费者结构管理指针
Definition consumer.hpp:24