Message-Queues beta 1.1
A Message-Queues based Cpp
 
载入中...
搜索中...
未找到
message.hpp
浏览该文件的文档.
1
16#pragma once
17#include "../common/logger.hpp"
18#include "../common/helper.hpp"
19#include "../common/msg.pb.h"
20#include <iostream>
21#include <unordered_map>
22#include <mutex>
23#include <memory>
24#include <list>
25
26namespace XuMQ
27{
28 const char *DATAFILE_SUBFIX = ".mqd";
29 const char *TMPFILE_SUBFIX = ".mqd.tmp";
30 const char *MSG_VALID = "1";
31 const char *MSG_INVALID = "0";
32 using MessagePtr = std::shared_ptr<XuMQ::Message>;
36 {
37 public:
41 MessageMapper(std::string &basedir, const std::string &qname)
42 : _qname(qname)
43 {
44 if (basedir.back() != '/' && basedir.back() != '\\')
45 basedir.push_back('/');
46 _datafile = basedir + qname + DATAFILE_SUBFIX;
47 _tmpfile = basedir + qname + TMPFILE_SUBFIX;
48
49 if (FileHelper(basedir).exists() == false)
50 {
51 int ret = FileHelper::createDirectory(basedir);
52 if (ret == false)
53 {
54 fatal(logger, "创建文件夹失败!");
55 abort();
56 }
57 }
59 }
63 {
64 if (FileHelper(_datafile).exists() == false)
65 {
67 if (ret == false)
68 {
69 error(logger, " %s :创建队列数据文件失败!", _datafile.c_str());
70 return false;
71 }
72 }
73 return true;
74 }
84 bool insert(const MessagePtr &msg)
85 {
86 return insert(_datafile, msg);
87 }
91 bool remove(MessagePtr &msg)
92 {
93 // 将msg中的有效标志为设置为'0'(false)
94 msg->mutable_payload()->set_valid(MSG_INVALID);
95 // 对msg进行序列化
96 std::string body = msg->payload().SerializeAsString();
97 if (body.size() != msg->length())
98 {
99 error(logger, "不能修改文件中的数据信息, 新生成的数据与原数据长度不一致!");
100 return false;
101 }
102 // 将序列化的消息 写入到数据中的指定位置(覆盖原有的数据)
103 FileHelper helper(_datafile);
104 bool ret = helper.write(body.c_str(), msg->offset(), body.size());
105 if (ret == false)
106 {
107 error(logger, " %s :队列数据文件写入失败!", _datafile.c_str());
108 return false;
109 }
110 return true;
111 }
114 std::list<MessagePtr> garbageCollection()
115 {
116 std::list<MessagePtr> result;
117 // 加载文件中所有的有效数据 存储格式 4字节长度|数据|4字节长度|数据...
118 bool ret = load(result);
119 if (ret == false)
120 {
121 error(logger, "加载有效数据失败!");
122 return result;
123 }
124 // 有效数据进行序列化存储到临时文件中
126 for (auto &msg : result)
127 {
128 ret = insert(_tmpfile, msg);
129 if (ret == false)
130 {
131 error(logger, " %s :临时文件写入消息数据失败!", _tmpfile);
132 return result;
133 }
134 }
135 // 删除原文件
137 if (ret == false)
138 {
139 error(logger, " %s :删除原文件失败!", _datafile);
140 return result;
141 }
142
143 // 修改临时文件名为原文件名称
145 if (ret == false)
146 {
147 error(logger, " %s :修改临时文件名称失败!", _tmpfile.c_str());
148 return result;
149 }
150 // 返回新的有效数据
151 return result;
152 }
153
154 private:
159 bool insert(const std::string &filename, const MessagePtr &msg)
160 {
161 // 新增数据添加在文件末尾
162 // 消息序列化
163 std::string body = msg->payload().SerializeAsString();
164 // 获取文件长度
165 FileHelper helper(filename);
166 size_t fsize = helper.size();
167 // 先写入4字节数据长度
168 size_t message_size = body.size();
169 bool ret = helper.write((char *)&message_size, fsize, sizeof(size_t));
170 if (ret == false)
171 {
172 error(logger, " %s :队列数据文件写入长度失败!", filename.c_str());
173 return false;
174 }
175
176 // 写入数据到指定位置
177 ret = helper.write(body.c_str(), fsize + sizeof(size_t), body.size());
178 if (ret == false)
179 {
180 error(logger, " %s :队列数据文件写入内容失败!", filename.c_str());
181 return false;
182 }
183 // 更新msg中的存储信息
184 msg->set_offset(fsize + sizeof(size_t));
185 msg->set_length(body.size());
186 return true;
187 }
191 bool load(std::list<MessagePtr> &result)
192 {
193 FileHelper helper(_datafile);
194 size_t offset = 0, msg_size;
195 size_t fsize = helper.size();
196 bool ret;
197 while (offset < fsize)
198 {
199 ret = helper.read((char *)&msg_size, offset, sizeof(size_t));
200 if (ret == false)
201 {
202 error(logger, " %s :读取消息长度失败!", _datafile);
203 return false;
204 }
205 offset += sizeof(size_t);
206 std::string msg_body(msg_size, '\0');
207 ret = helper.read(&msg_body[0], offset, msg_size);
208 if (ret == false)
209 {
210 error(logger, " %s :读取消息数据失败!", _datafile);
211 return false;
212 }
213 offset += msg_size;
214 MessagePtr msgp = std::make_shared<Message>();
215 msgp->mutable_payload()->ParseFromString(msg_body);
216 if (msgp->payload().valid() == MSG_INVALID) // 无效消息则处理下一个
217 continue;
218 result.push_back(msgp); // 有效消息保存
219 }
220 return true;
221 }
222
223 private:
224 std::string _qname;
225 std::string _datafile;
226 std::string _tmpfile;
227 };
228
232 {
233 public:
234 using ptr = std::shared_ptr<QueueMessage>;
238 QueueMessage(std::string &basedir, const std::string &qname)
239 : _mapper(basedir, qname), _qname(qname), _valid_count(0), _total_count(0)
240 {
241 }
243 void recovery()
244 {
245 std::unique_lock<std::mutex> lock(_mutex);
247 for (auto &msg : _msgs)
248 _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
250 }
256 bool insert(const BasicProperties *bp, const std::string &body, bool queue_id_durable)
257 {
258 // 构造消息对象
259 MessagePtr msg = std::make_shared<Message>();
260 msg->mutable_payload()->set_body(body);
261 if (bp != nullptr)
262 {
263 DeliveryMode mode = queue_id_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;
264 msg->mutable_payload()->mutable_properties()->set_id(bp->id());
265 msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
266 msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());
267 }
268 else
269 {
271 msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
272 msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
273 msg->mutable_payload()->mutable_properties()->set_routing_key("");
274 }
275 std::unique_lock<std::mutex> lock(_mutex);
276 // 判断消息是否需要持久化
277 if (msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
278 {
279 msg->mutable_payload()->set_valid(MSG_VALID); // 持久化存储中表示数据有效
280 // 持久化存储
281 bool ret = _mapper.insert(msg);
282 if (ret == false)
283 {
284 error(logger, " %s :持久化存储消息失败!", body.c_str());
285 return false;
286 }
287 _valid_count++;
288 _total_count++;
289 _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
290 }
291 // 内存管理
292 _msgs.push_back(msg);
293 return true;
294 }
298 {
299 if (_msgs.size() == 0)
300 return MessagePtr();
301 std::unique_lock<std::mutex> lock(_mutex);
302 // 获取队头消息 从msgs取出数据
303 MessagePtr msg = _msgs.front();
304 _msgs.pop_front();
305 // 将消息对象插入待确认映射表 等到收到确认ack后删除
306 _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
307 return msg;
308 }
312 bool remove(const std::string &msg_id)
313 {
314 std::unique_lock<std::mutex> lock(_mutex);
315 // 从待确认映射表中查找消息
316 auto it = _waitack_msgs.find(msg_id);
317 if (it == _waitack_msgs.end())
318 {
319 warn(logger, "没有找到要删除的消息! 消息id: %s", msg_id.c_str());
320 return true;
321 }
322 // 查看持久化模式
323 if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE)
324 {
325 // 删除持久化信息
326 _mapper.remove(it->second);
327 _durable_msgs.erase(msg_id);
328 _valid_count--;
330 }
331 // 删除内存中的信息
332 _waitack_msgs.erase(msg_id);
333 return true;
334 }
338 {
339 std::unique_lock<std::mutex> lock(_mutex);
340 return _msgs.size();
341 }
344 size_t totalCount()
345 {
346 std::unique_lock<std::mutex> lock(_mutex);
347 return _total_count;
348 }
352 {
353 std::unique_lock<std::mutex> lock(_mutex);
354 return _waitack_msgs.size();
355 }
359 {
360 std::unique_lock<std::mutex> lock(_mutex);
361 return _durable_msgs.size();
362 }
364 void clear()
365 {
366 std::unique_lock<std::mutex> lock(_mutex);
368 _msgs.clear();
369 _durable_msgs.clear();
370 _waitack_msgs.clear();
371 _valid_count = 0;
372 _total_count = 0;
373 }
374
375 private:
379 {
380 if (_total_count > 2000 && _valid_count * 10 / _total_count < 5)
381 return true;
382 return false;
383 }
386 {
387 // 垃圾回收 获取有效消息信息链表
388 if (garbageCollectionCheck() == false)
389 return;
390 // 更新消息的实际存储位置
391 std::list<MessagePtr> msgs = _mapper.garbageCollection();
392 for (auto &msg : msgs)
393 {
394 auto it = _durable_msgs.find(msg->payload().properties().id());
395 if (it == _durable_msgs.end())
396 {
397 _msgs.push_back(msg);
398 _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
399 info(logger, "垃圾回收后 有一条消息在内存尚未被管理 已插入待推送消息列表");
400 continue;
401 }
402 it->second->set_offset(msg->offset());
403 it->second->set_length(msg->length());
404 }
405 // 更新有效消息数量 总持久化消息数量
406 _valid_count = _total_count = msgs.size();
407 }
408
409 private:
410 std::mutex _mutex;
411 std::string _qname;
415 std::list<MessagePtr> _msgs;
416 std::unordered_map<std::string, MessagePtr> _durable_msgs;
417 std::unordered_map<std::string, MessagePtr> _waitack_msgs;
418 };
419
422 {
423 public:
424 using ptr = std::shared_ptr<MessageManager>;
428 MessageManager(const std::string &basedir) : _basedir(basedir) {}
431 void initQueueMessage(const std::string &qname)
432 {
434 {
435 std::unique_lock<std::mutex> lock(_mutex);
436 auto it = _queue_msgs.find(qname);
437 if (it != _queue_msgs.end())
438 return;
439 qmp = std::make_shared<QueueMessage>(_basedir, qname);
440 _queue_msgs.insert(std::make_pair(qname, qmp));
441 }
442 qmp->recovery();
443 }
446 void destroyQueueMessage(const std::string &qname)
447 {
449 {
450 std::unique_lock<std::mutex> lock(_mutex);
451 auto it = _queue_msgs.find(qname);
452 if (it == _queue_msgs.end())
453 return;
454 qmp = it->second;
455 _queue_msgs.erase(qname);
456 }
457 qmp->clear();
458 }
465 bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool mode)
466 {
468 {
469 std::unique_lock<std::mutex> lock(_mutex);
470 auto it = _queue_msgs.find(qname);
471 if (it == _queue_msgs.end())
472 {
473 error(logger, "插入消息失败, 没有找到 %s 队列", qname.c_str());
474 return false;
475 }
476 qmp = it->second;
477 }
478 return qmp->insert(bp, body, mode);
479 }
483 MessagePtr front(const std::string &qname)
484 {
486 {
487 std::unique_lock<std::mutex> lock(_mutex);
488 auto it = _queue_msgs.find(qname);
489 if (it == _queue_msgs.end())
490 {
491 error(logger, "获取队头消息失败, 没有找到 %s 队列", qname.c_str());
492 return MessagePtr();
493 }
494 qmp = it->second;
495 }
496 return qmp->front();
497 }
501 void ack(const std::string &qname, const std::string &msg_id)
502 {
504 {
505 std::unique_lock<std::mutex> lock(_mutex);
506 auto it = _queue_msgs.find(qname);
507 if (it == _queue_msgs.end())
508 {
509 error(logger, "确认消息失败, 没有找到 %s 队列", qname.c_str());
510 return;
511 }
512 qmp = it->second;
513 }
514 qmp->remove(msg_id);
515 }
516
519 size_t availableCount(const std::string &qname)
520 {
522 {
523 std::unique_lock<std::mutex> lock(_mutex);
524 auto it = _queue_msgs.find(qname);
525 if (it == _queue_msgs.end())
526 {
527 error(logger, "获取可获取消息数量失败, 没有找到 %s 队列", qname.c_str());
528 return 0;
529 }
530 qmp = it->second;
531 }
532 return qmp->availableCount();
533 }
536 size_t totalCount(const std::string &qname)
537 {
539 {
540 std::unique_lock<std::mutex> lock(_mutex);
541 auto it = _queue_msgs.find(qname);
542 if (it == _queue_msgs.end())
543 {
544 error(logger, "获取总消息数量失败, 没有找到 %s 队列", qname.c_str());
545 return 0;
546 }
547 qmp = it->second;
548 }
549 return qmp->totalCount();
550 }
553 size_t waitAckCount(const std::string &qname)
554 {
556 {
557 std::unique_lock<std::mutex> lock(_mutex);
558 auto it = _queue_msgs.find(qname);
559 if (it == _queue_msgs.end())
560 {
561 error(logger, "获取待确认消息数量失败, 没有找到 %s 队列", qname.c_str());
562 return 0;
563 }
564 qmp = it->second;
565 }
566 return qmp->waitAckCount();
567 }
570 size_t durableCount(const std::string &qname)
571 {
573 {
574 std::unique_lock<std::mutex> lock(_mutex);
575 auto it = _queue_msgs.find(qname);
576 if (it == _queue_msgs.end())
577 {
578 error(logger, "获取持久化消息数量失败, 没有找到 %s 队列", qname.c_str());
579 return 0;
580 }
581 qmp = it->second;
582 }
583 return qmp->durableCount();
584 }
586 void clear()
587 {
589 for (auto &qmsg : _queue_msgs)
590 {
591 qmsg.second->clear();
592 }
593 }
594
595 private:
596 std::mutex _mutex;
597 std::string _basedir;
598 std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs;
599 };
600}
Definition msg.pb.h:122
::XuMQ::DeliveryMode delivery_mode() const
Definition msg.pb.h:737
const std::string & id() const
Definition msg.pb.h:684
const std::string & routing_key() const
Definition msg.pb.h:754
文件操作帮助类
Definition helper.hpp:226
static bool removeFile(const std::string filename)
删除文件
Definition helper.hpp:403
static bool createDirectory(const std::string &pathname)
创建目录
Definition helper.hpp:414
static bool createFile(const std::string filename)
创建新文件
Definition helper.hpp:385
bool rename(const std::string &nname)
重命名文件
Definition helper.hpp:374
size_t size()
获取文件大小
Definition helper.hpp:255
bool write(const std::string &body)
写入字符串到文件
Definition helper.hpp:315
bool read(std::string &body)
读取文件内容
Definition helper.hpp:270
消息管理类
Definition message.hpp:422
size_t availableCount(const std::string &qname)
获取可获取消息数量
Definition message.hpp:519
std::shared_ptr< MessageManager > ptr
消息管理类指针
Definition message.hpp:424
std::mutex _mutex
互斥锁
Definition message.hpp:596
bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool mode)
向指定队列插入新消息
Definition message.hpp:465
void initQueueMessage(const std::string &qname)
初始化推送消息队列管理类
Definition message.hpp:431
void destroyQueueMessage(const std::string &qname)
销毁推送消息队列管理类
Definition message.hpp:446
void ack(const std::string &qname, const std::string &msg_id)
应答消息
Definition message.hpp:501
MessageManager(const std::string &basedir)
构造函数
Definition message.hpp:428
void clear()
清空
Definition message.hpp:586
MessagePtr front(const std::string &qname)
获取队头消息
Definition message.hpp:483
size_t waitAckCount(const std::string &qname)
获取待确认消息数量
Definition message.hpp:553
size_t totalCount(const std::string &qname)
获取总消息数量
Definition message.hpp:536
std::unordered_map< std::string, QueueMessage::ptr > _queue_msgs
消息队列
Definition message.hpp:598
size_t durableCount(const std::string &qname)
获取持久化消息数量
Definition message.hpp:570
std::string _basedir
基础目录
Definition message.hpp:597
处理消息队列的文件存储和管理类
Definition message.hpp:36
bool load(std::list< MessagePtr > &result)
加载有效消息 从数据文件中读取所有消息并存为有效的消息对象
Definition message.hpp:191
std::list< MessagePtr > garbageCollection()
垃圾回收 加载所有有效消息 存储到临时文件后更新数据文件
Definition message.hpp:114
void removeMsgFile()
移除消息文件 包括移除数据文件和临时文件
Definition message.hpp:76
bool remove(MessagePtr &msg)
移除消息 将消息中的有效标记置为false 更新到数据文件中
Definition message.hpp:91
std::string _qname
队列名称
Definition message.hpp:224
bool insert(const MessagePtr &msg)
插入消息 将消息添加到数据文件中
Definition message.hpp:84
MessageMapper(std::string &basedir, const std::string &qname)
构造函数 创建必要的目录和数据文件
Definition message.hpp:41
std::string _tmpfile
临时文件
Definition message.hpp:226
bool createMsgFile()
创建消息文件
Definition message.hpp:62
bool insert(const std::string &filename, const MessagePtr &msg)
插入消息到指定文件 负责数据文件和临时文件的写入工作
Definition message.hpp:159
std::string _datafile
数据文件
Definition message.hpp:225
推送消息队列管理
Definition message.hpp:232
std::unordered_map< std::string, MessagePtr > _waitack_msgs
待确认消息映射表
Definition message.hpp:417
std::mutex _mutex
互斥锁
Definition message.hpp:410
MessageMapper _mapper
消息队列持久化管理类
Definition message.hpp:414
bool garbageCollectionCheck()
垃圾回收条件检测
Definition message.hpp:378
std::string _qname
队列名称
Definition message.hpp:411
void recovery()
恢复历史消息
Definition message.hpp:243
MessagePtr front()
获取队头消息
Definition message.hpp:297
size_t availableCount()
获取可获取消息数量
Definition message.hpp:337
std::shared_ptr< QueueMessage > ptr
Definition message.hpp:234
bool insert(const BasicProperties *bp, const std::string &body, bool queue_id_durable)
插入推送消息队列
Definition message.hpp:256
size_t totalCount()
获取总消息数量
Definition message.hpp:344
size_t durableCount()
获取持久化消息数量
Definition message.hpp:358
size_t waitAckCount()
获取待确认消息数量
Definition message.hpp:351
std::list< MessagePtr > _msgs
待推送消息列表
Definition message.hpp:415
std::unordered_map< std::string, MessagePtr > _durable_msgs
持久化消息映射表
Definition message.hpp:416
size_t _valid_count
有效消息数量
Definition message.hpp:412
void garbageCollection()
垃圾回收
Definition message.hpp:385
bool remove(const std::string &msg_id)
移除接收到确认ack的消息
Definition message.hpp:312
size_t _total_count
总消息数量
Definition message.hpp:413
QueueMessage(std::string &basedir, const std::string &qname)
推送消息队列构造函数 恢复历史消息
Definition message.hpp:238
void clear()
清空数据
Definition message.hpp:364
static std::string uuid()
生成一个随机 UUID。
Definition helper.hpp:194
Definition channel.hpp:22
DeliveryMode
Definition msg.pb.h:93
@ UNDURABLE
Definition msg.pb.h:95
@ DURABLE
Definition msg.pb.h:96
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< google::protobuf::Message > MessagePtr
消息句柄
Definition channel.hpp:23
const char * DATAFILE_SUBFIX
数据文件后缀名
Definition message.hpp:28
const char * MSG_INVALID
消息无效标志
Definition message.hpp:31
const char * MSG_VALID
消息有效标志
Definition message.hpp:30
const char * TMPFILE_SUBFIX
临时文件后缀名
Definition message.hpp:29