13#include "../common/logger.hpp"
14#include "../common/helper.hpp"
15#include "../common/msg.pb.h"
17#include <unordered_map>
27 using ptr = std::shared_ptr<MsgQueue>;
32 google::protobuf::Map<std::string, std::string>
args;
45 const google::protobuf::Map<std::string, std::string> &qargs) :
name(qname),
durable(qdurable),
exclusive(qexclusive),
56 std::vector<std::string> sub_args;
58 for (
auto &str : sub_args)
60 size_t pos = str.find(
'=');
61 std::string key = str.substr(0, pos);
62 std::string value = str.substr(pos + 1);
73 for (
auto &arg :
args)
75 result += (arg.first +
"=" + arg.second +
"&");
82 using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
102 const char *CREATE_TABLE =
"create table if not exists queue_table(name varchar(32) primary key,\
103 durable tinyint, exclusive tinyint,\
104 auto_delete tinyint, args varchar(128));";
108 fatal(
logger,
"创建消息队列数据库表失败!");
115 const char *DROP_TABLE =
"drop table if exists queue_table;";
119 fatal(
logger,
"删除消息队列数据库表失败!");
128 const char *INSERT_SQL =
"insert into queue_table values('%s', %d, %d, %d, '%s');";
129 char sql_str[4096] = {0};
130 sprintf(sql_str, INSERT_SQL, queue->name.c_str(), queue->durable, queue->exclusive,
131 queue->auto_delete, queue->getArgs().c_str());
135 error(
logger,
"数据库: 插入消息队列失败!");
145 const char *DELETE_SQL =
"delete from queue_table where name = '%s';";
146 char sql_str[4096] = {0};
147 sprintf(sql_str, DELETE_SQL, name.c_str());
151 error(
logger,
"数据库: 删除消息队列失败!");
162 const char *SELECT_SQL =
"select * from queue_table;";
177 auto mqp = std::make_shared<MsgQueue>();
179 mqp->durable = (bool)std::stoi(row[1]);
180 mqp->exclusive = (bool)std::stoi(row[2]);
181 mqp->auto_delete = (bool)std::stoi(row[3]);
183 mqp->setArgs(row[4]);
184 result->insert(std::make_pair(
mqp->name,
mqp));
196 using ptr = std::shared_ptr<MsgQueueManager>;
215 const google::protobuf::Map<std::string, std::string> &qargs)
217 std::unique_lock<std::mutex> lock(
_mutex);
221 auto mqp = std::make_shared<MsgQueue>(qname, qdurable, qexclusive, qauto_delete, qargs);
235 std::unique_lock<std::mutex> lock(
_mutex);
240 if (it->second->durable ==
true)
248 std::unique_lock<std::mutex> lock(
_mutex);
259 std::unique_lock<std::mutex> lock(
_mutex);
267 std::unique_lock<std::mutex> lock(
_mutex);
277 std::unique_lock<std::mutex> lock(
_mutex);
283 std::unique_lock<std::mutex> lock(
_mutex);
static bool createDirectory(const std::string &pathname)
创建目录
Definition helper.hpp:414
static std::string parentDirectory(const std::string &filename)
获取文件的父目录
Definition helper.hpp:357
消息队列数据内存管理类
Definition queue.hpp:194
std::shared_ptr< MsgQueueManager > ptr
消息队列数据内存管理指针
Definition queue.hpp:196
void clear()
清除消息队列
Definition queue.hpp:281
MsgQueue::ptr selectQueue(const std::string &name)
获取指定消息队列
Definition queue.hpp:246
bool exists(const std::string &name)
判断消息队列是否存在
Definition queue.hpp:265
bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map< std::string, std::string > &qargs)
声明消息队列
Definition queue.hpp:211
MsgQueueMapper _mapper
持久化消息队列管理类
Definition queue.hpp:290
QueueMap & allQueue()
获取所有队列
Definition queue.hpp:257
QueueMap _queues
全部消息队列信息
Definition queue.hpp:291
std::mutex _mutex
互斥锁
Definition queue.hpp:289
size_t size()
获取消息队列数量
Definition queue.hpp:275
MsgQueueManager(const std::string &dbfile)
消息队列数据内存管理类 构造函数 从数据库中恢复数据
Definition queue.hpp:199
void deleteQueue(const std::string &name)
删除消息队列
Definition queue.hpp:233
消息队列持久化管理类 将数据存储在sqlite数据库中
Definition queue.hpp:86
void removeTable()
移除一张表
Definition queue.hpp:113
static int selectCallback(void *arg, int numcol, char **row, char **fields)
select语句的回调函数 将获取到的数据存入参数中
Definition queue.hpp:174
void createTable()
创建一张表
Definition queue.hpp:100
bool insert(MsgQueue::ptr &queue)
新增一个消息队列
Definition queue.hpp:126
bool remove(const std::string &name)
移除一个消息队列
Definition queue.hpp:143
SqliteHelper _sql_helper
数据库操作对象
Definition queue.hpp:189
MsgQueueMapper(const std::string &dbfile)
消息队列持久化管理类 构造函数
Definition queue.hpp:91
QueueMap recovery()
获取所有消息队列 从数据库加载到内存
Definition queue.hpp:159
SQLite 数据库操作助手类
Definition helper.hpp:58
bool exec(const std::string &sql, SqliteCallback cb, void *arg)
执行 SQL 语句
Definition helper.hpp:109
bool open(int safe_level=SQLITE_OPEN_FULLMUTEX)
打开数据库
Definition helper.hpp:90
static size_t split(const std::string &str, const std::string &sep, std::vector< std::string > &result)
将字符串分割为多个子字符串
Definition helper.hpp:152
XuMQ::MsgQueueManager::ptr mqp
Definition mqqueuetest.cpp:4
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::unordered_map< std::string, MsgQueue::ptr > QueueMap
消息队列映射表 消息队列名称->消息队列指针
Definition queue.hpp:82
消息队列结构体
Definition queue.hpp:26
void setArgs(const std::string &str_args)
解析字符串并存储到映射成员中
Definition queue.hpp:54
bool durable
持久化标志
Definition queue.hpp:29
bool auto_delete
自动删除标志
Definition queue.hpp:31
MsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map< std::string, std::string > &qargs)
构造函数
Definition queue.hpp:41
std::string getArgs()
将映射成员转化为字符串
Definition queue.hpp:70
std::shared_ptr< MsgQueue > ptr
消息队列指针
Definition queue.hpp:27
bool exclusive
独占标志
Definition queue.hpp:30
google::protobuf::Map< std::string, std::string > args
其他参数
Definition queue.hpp:32
std::string name
消息队列名称
Definition queue.hpp:28
MsgQueue()
无参构造
Definition queue.hpp:34