17#include "../common/logger.hpp"
18#include "../common/helper.hpp"
19#include "../common/msg.pb.h"
21#include <unordered_map>
32 using MessagePtr = std::shared_ptr<XuMQ::Message>;
44 if (basedir.back() !=
'/' && basedir.back() !=
'\\')
45 basedir.push_back(
'/');
96 std::string body = msg->payload().SerializeAsString();
97 if (body.size() != msg->length())
99 error(
logger,
"不能修改文件中的数据信息, 新生成的数据与原数据长度不一致!");
104 bool ret = helper.
write(body.c_str(), msg->offset(), body.size());
116 std::list<MessagePtr> result;
118 bool ret =
load(result);
121 error(
logger,
"加载有效数据失败!");
126 for (
auto &msg : result)
163 std::string body = msg->payload().SerializeAsString();
166 size_t fsize = helper.
size();
168 size_t message_size = body.size();
169 bool ret = helper.
write((
char *)&message_size, fsize,
sizeof(
size_t));
172 error(
logger,
" %s :队列数据文件写入长度失败!", filename.c_str());
177 ret = helper.
write(body.c_str(), fsize +
sizeof(
size_t), body.size());
180 error(
logger,
" %s :队列数据文件写入内容失败!", filename.c_str());
184 msg->set_offset(fsize +
sizeof(
size_t));
185 msg->set_length(body.size());
191 bool load(std::list<MessagePtr> &result)
194 size_t offset = 0, msg_size;
195 size_t fsize = helper.
size();
197 while (offset < fsize)
199 ret = helper.
read((
char *)&msg_size, offset,
sizeof(
size_t));
205 offset +=
sizeof(size_t);
206 std::string msg_body(msg_size,
'\0');
207 ret = helper.
read(&msg_body[0], offset, msg_size);
214 MessagePtr msgp = std::make_shared<Message>();
215 msgp->mutable_payload()->ParseFromString(msg_body);
218 result.push_back(msgp);
234 using ptr = std::shared_ptr<QueueMessage>;
245 std::unique_lock<std::mutex> lock(
_mutex);
247 for (
auto &msg :
_msgs)
248 _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
260 msg->mutable_payload()->set_body(body);
264 msg->mutable_payload()->mutable_properties()->set_id(bp->
id());
265 msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
266 msg->mutable_payload()->mutable_properties()->set_routing_key(bp->
routing_key());
272 msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);
273 msg->mutable_payload()->mutable_properties()->set_routing_key(
"");
275 std::unique_lock<std::mutex> lock(
_mutex);
279 msg->mutable_payload()->set_valid(
MSG_VALID);
284 error(
logger,
" %s :持久化存储消息失败!", body.c_str());
289 _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
292 _msgs.push_back(msg);
299 if (
_msgs.size() == 0)
301 std::unique_lock<std::mutex> lock(
_mutex);
306 _waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
314 std::unique_lock<std::mutex> lock(
_mutex);
319 warn(
logger,
"没有找到要删除的消息! 消息id: %s", msg_id.c_str());
339 std::unique_lock<std::mutex> lock(
_mutex);
346 std::unique_lock<std::mutex> lock(
_mutex);
353 std::unique_lock<std::mutex> lock(
_mutex);
360 std::unique_lock<std::mutex> lock(
_mutex);
366 std::unique_lock<std::mutex> lock(
_mutex);
392 for (
auto &msg : msgs)
394 auto it =
_durable_msgs.find(msg->payload().properties().id());
397 _msgs.push_back(msg);
398 _durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));
399 info(
logger,
"垃圾回收后 有一条消息在内存尚未被管理 已插入待推送消息列表");
402 it->second->set_offset(msg->offset());
403 it->second->set_length(msg->length());
424 using ptr = std::shared_ptr<MessageManager>;
435 std::unique_lock<std::mutex> lock(
_mutex);
439 qmp = std::make_shared<QueueMessage>(
_basedir, qname);
450 std::unique_lock<std::mutex> lock(
_mutex);
469 std::unique_lock<std::mutex> lock(
_mutex);
473 error(
logger,
"插入消息失败, 没有找到 %s 队列", qname.c_str());
478 return qmp->insert(bp, body, mode);
487 std::unique_lock<std::mutex> lock(
_mutex);
491 error(
logger,
"获取队头消息失败, 没有找到 %s 队列", qname.c_str());
501 void ack(
const std::string &qname,
const std::string &msg_id)
505 std::unique_lock<std::mutex> lock(
_mutex);
509 error(
logger,
"确认消息失败, 没有找到 %s 队列", qname.c_str());
523 std::unique_lock<std::mutex> lock(
_mutex);
527 error(
logger,
"获取可获取消息数量失败, 没有找到 %s 队列", qname.c_str());
532 return qmp->availableCount();
540 std::unique_lock<std::mutex> lock(
_mutex);
544 error(
logger,
"获取总消息数量失败, 没有找到 %s 队列", qname.c_str());
549 return qmp->totalCount();
557 std::unique_lock<std::mutex> lock(
_mutex);
561 error(
logger,
"获取待确认消息数量失败, 没有找到 %s 队列", qname.c_str());
566 return qmp->waitAckCount();
574 std::unique_lock<std::mutex> lock(
_mutex);
578 error(
logger,
"获取持久化消息数量失败, 没有找到 %s 队列", qname.c_str());
583 return qmp->durableCount();
591 qmsg.second->clear();
::XuMQ::DeliveryMode delivery_mode() const
Definition msg.pb.h:737
const std::string & id() const
Definition msg.pb.h:684
const std::string & routing_key() const
Definition msg.pb.h:754
文件操作帮助类
Definition helper.hpp:226
static bool removeFile(const std::string filename)
删除文件
Definition helper.hpp:403
static bool createDirectory(const std::string &pathname)
创建目录
Definition helper.hpp:414
static bool createFile(const std::string filename)
创建新文件
Definition helper.hpp:385
bool rename(const std::string &nname)
重命名文件
Definition helper.hpp:374
size_t size()
获取文件大小
Definition helper.hpp:255
bool write(const std::string &body)
写入字符串到文件
Definition helper.hpp:315
bool read(std::string &body)
读取文件内容
Definition helper.hpp:270
消息管理类
Definition message.hpp:422
size_t availableCount(const std::string &qname)
获取可获取消息数量
Definition message.hpp:519
std::shared_ptr< MessageManager > ptr
消息管理类指针
Definition message.hpp:424
std::mutex _mutex
互斥锁
Definition message.hpp:596
bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool mode)
向指定队列插入新消息
Definition message.hpp:465
void initQueueMessage(const std::string &qname)
初始化推送消息队列管理类
Definition message.hpp:431
void destroyQueueMessage(const std::string &qname)
销毁推送消息队列管理类
Definition message.hpp:446
void ack(const std::string &qname, const std::string &msg_id)
应答消息
Definition message.hpp:501
MessageManager(const std::string &basedir)
构造函数
Definition message.hpp:428
void clear()
清空
Definition message.hpp:586
MessagePtr front(const std::string &qname)
获取队头消息
Definition message.hpp:483
size_t waitAckCount(const std::string &qname)
获取待确认消息数量
Definition message.hpp:553
size_t totalCount(const std::string &qname)
获取总消息数量
Definition message.hpp:536
std::unordered_map< std::string, QueueMessage::ptr > _queue_msgs
消息队列
Definition message.hpp:598
size_t durableCount(const std::string &qname)
获取持久化消息数量
Definition message.hpp:570
std::string _basedir
基础目录
Definition message.hpp:597
处理消息队列的文件存储和管理类
Definition message.hpp:36
bool load(std::list< MessagePtr > &result)
加载有效消息 从数据文件中读取所有消息并存为有效的消息对象
Definition message.hpp:191
std::list< MessagePtr > garbageCollection()
垃圾回收 加载所有有效消息 存储到临时文件后更新数据文件
Definition message.hpp:114
void removeMsgFile()
移除消息文件 包括移除数据文件和临时文件
Definition message.hpp:76
bool remove(MessagePtr &msg)
移除消息 将消息中的有效标记置为false 更新到数据文件中
Definition message.hpp:91
std::string _qname
队列名称
Definition message.hpp:224
bool insert(const MessagePtr &msg)
插入消息 将消息添加到数据文件中
Definition message.hpp:84
MessageMapper(std::string &basedir, const std::string &qname)
构造函数 创建必要的目录和数据文件
Definition message.hpp:41
std::string _tmpfile
临时文件
Definition message.hpp:226
bool createMsgFile()
创建消息文件
Definition message.hpp:62
bool insert(const std::string &filename, const MessagePtr &msg)
插入消息到指定文件 负责数据文件和临时文件的写入工作
Definition message.hpp:159
std::string _datafile
数据文件
Definition message.hpp:225
推送消息队列管理
Definition message.hpp:232
std::unordered_map< std::string, MessagePtr > _waitack_msgs
待确认消息映射表
Definition message.hpp:417
std::mutex _mutex
互斥锁
Definition message.hpp:410
MessageMapper _mapper
消息队列持久化管理类
Definition message.hpp:414
bool garbageCollectionCheck()
垃圾回收条件检测
Definition message.hpp:378
std::string _qname
队列名称
Definition message.hpp:411
void recovery()
恢复历史消息
Definition message.hpp:243
MessagePtr front()
获取队头消息
Definition message.hpp:297
size_t availableCount()
获取可获取消息数量
Definition message.hpp:337
std::shared_ptr< QueueMessage > ptr
Definition message.hpp:234
bool insert(const BasicProperties *bp, const std::string &body, bool queue_id_durable)
插入推送消息队列
Definition message.hpp:256
size_t totalCount()
获取总消息数量
Definition message.hpp:344
size_t durableCount()
获取持久化消息数量
Definition message.hpp:358
size_t waitAckCount()
获取待确认消息数量
Definition message.hpp:351
std::list< MessagePtr > _msgs
待推送消息列表
Definition message.hpp:415
std::unordered_map< std::string, MessagePtr > _durable_msgs
持久化消息映射表
Definition message.hpp:416
size_t _valid_count
有效消息数量
Definition message.hpp:412
void garbageCollection()
垃圾回收
Definition message.hpp:385
bool remove(const std::string &msg_id)
移除接收到确认ack的消息
Definition message.hpp:312
size_t _total_count
总消息数量
Definition message.hpp:413
QueueMessage(std::string &basedir, const std::string &qname)
推送消息队列构造函数 恢复历史消息
Definition message.hpp:238
void clear()
清空数据
Definition message.hpp:364
static std::string uuid()
生成一个随机 UUID。
Definition helper.hpp:194
Definition channel.hpp:22
DeliveryMode
Definition msg.pb.h:93
@ UNDURABLE
Definition msg.pb.h:95
@ DURABLE
Definition msg.pb.h:96
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< google::protobuf::Message > MessagePtr
消息句柄
Definition channel.hpp:23
const char * DATAFILE_SUBFIX
数据文件后缀名
Definition message.hpp:28
const char * MSG_INVALID
消息无效标志
Definition message.hpp:31
const char * MSG_VALID
消息有效标志
Definition message.hpp:30
const char * TMPFILE_SUBFIX
临时文件后缀名
Definition message.hpp:29