Message-Queues beta 1.1
A message queue implemented in C++
 
载入中...
搜索中...
未找到
queue.hpp
浏览该文件的文档.
1
12#pragma once
13#include "../common/logger.hpp"
14#include "../common/helper.hpp"
15#include "../common/msg.pb.h"
16#include <iostream>
17#include <unordered_map>
18#include <mutex>
19#include <memory>
20
21namespace XuMQ
22{
// Describes one message queue: its name, its flags and its extra key/value arguments.
25 struct MsgQueue
26 {
// Shared-pointer alias for handing queue descriptors around.
27 using ptr = std::shared_ptr<MsgQueue>;
// Queue name; used as the lookup key in QueueMap.
28 std::string name;
// Durable flag; declareQueue() writes durable queues to the database.
29 bool durable;
// Exclusive flag.
30 bool exclusive;
// Extra arguments as string key/value pairs (see setArgs()/getArgs()).
32 google::protobuf::Map<std::string, std::string> args;
41 MsgQueue(const std::string &qname,
42 bool qdurable,
43 bool qexclusive,
44 bool qauto_delete,
45 const google::protobuf::Map<std::string, std::string> &qargs) : name(qname), durable(qdurable), exclusive(qexclusive),
46 auto_delete(qauto_delete), args(qargs)
47 {
48 }
54 void setArgs(const std::string &str_args)
55 {
56 std::vector<std::string> sub_args;
57 StrHelper::split(str_args, "&", sub_args);
58 for (auto &str : sub_args)
59 {
60 size_t pos = str.find('=');
61 std::string key = str.substr(0, pos);
62 std::string value = str.substr(pos + 1);
63 args[key] = value;
64 }
65 }
70 std::string getArgs()
71 {
72 std::string result;
73 for (auto &arg : args)
74 {
75 result += (arg.first + "=" + arg.second + "&");
76 }
77 if (!result.empty())
78 result.pop_back(); // 去除最后一个'&'
79 return result;
80 }
81 };
// Lookup table from queue name to its shared queue descriptor.
82 using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;
86 {
87 public:
// Constructs the mapper on top of the database file `dbfile`: the SQL helper is
// bound to that file and opened here.
91 MsgQueueMapper(const std::string &dbfile)
92 : _sql_helper(dbfile)
93 {
// Parent directory of the database file.
94 std::string path = FileHelper::parentDirectory(dbfile);
// NOTE(review): open() is evaluated inside assert(). A build with NDEBUG defined
// removes the whole expression, so the database would never be opened; the
// call should be hoisted out of the assertion and its result checked explicitly.
96 assert(_sql_helper.open());
98 }
101 {
// Creates queue_table (name, durable, exclusive, auto_delete, args) unless it
// already exists; name is the primary key.
102 const char *CREATE_TABLE = "create table if not exists queue_table(name varchar(32) primary key,\
103 durable tinyint, exclusive tinyint,\
104 auto_delete tinyint, args varchar(128));";
// No row callback is needed for DDL, hence the two nullptr arguments.
105 bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
106 if (ret == false)
107 {
// Failure is logged at fatal level and the process is aborted.
108 fatal(logger, "创建消息队列数据库表失败!");
109 abort();
110 }
111 }
114 {
// Drops queue_table if it exists.
115 const char *DROP_TABLE = "drop table if exists queue_table;";
116 bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);
117 if (ret == false)
118 {
// Failure is logged at fatal level and the process is aborted.
119 fatal(logger, "删除消息队列数据库表失败!");
120 abort();
121 }
122 }
127 {
// Formats one INSERT statement from the queue's name, its three flags and its
// serialized arguments (getArgs()), then executes it.
// NOTE(review): name and args are pasted into the SQL text unescaped, and
// sprintf writes into a fixed 4096-byte buffer without a length check. A single
// quote in either string breaks the statement and an over-long value overflows
// the buffer; consider escaping plus snprintf, or bound parameters.
128 const char *INSERT_SQL = "insert into queue_table values('%s', %d, %d, %d, '%s');";
129 char sql_str[4096] = {0};
130 sprintf(sql_str, INSERT_SQL, queue->name.c_str(), queue->durable, queue->exclusive,
131 queue->auto_delete, queue->getArgs().c_str());
132 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
133 if (ret == false)
134 {
// Failure is logged and reported to the caller; the process keeps running.
135 error(logger, "数据库: 插入消息队列失败!");
136 return false;
137 }
138 return true;
139 }
143 bool remove(const std::string &name)
144 {
145 const char *DELETE_SQL = "delete from queue_table where name = '%s';";
146 char sql_str[4096] = {0};
147 sprintf(sql_str, DELETE_SQL, name.c_str());
148 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
149 if (ret == false)
150 {
151 error(logger, "数据库: 删除消息队列失败!");
152 return false;
153 }
154 return true;
155 }
160 {
// Collects every row of queue_table into a QueueMap; selectCallback is invoked
// with &result and adds one entry per row.
161 QueueMap result;
162 const char *SELECT_SQL = "select * from queue_table;";
// NOTE(review): the return value of exec() is ignored, so a failed query yields
// an empty or partial map without any error being reported to the caller.
163 _sql_helper.exec(SELECT_SQL, selectCallback, &result);
164 return result;
165 }
166
167 private:
174 static int selectCallback(void *arg, int numcol, char **row, char **fields)
175 {
176 QueueMap *result = (QueueMap *)arg;
177 auto mqp = std::make_shared<MsgQueue>();
178 mqp->name = row[0];
179 mqp->durable = (bool)std::stoi(row[1]);
180 mqp->exclusive = (bool)std::stoi(row[2]);
181 mqp->auto_delete = (bool)std::stoi(row[3]);
182 if (row[4])
183 mqp->setArgs(row[4]);
184 result->insert(std::make_pair(mqp->name, mqp));
185 return 0;
186 }
187
188 private:
190 };
194 {
195 public:
196 using ptr = std::shared_ptr<MsgQueueManager>;
// Constructs the manager; the persistence mapper is bound to the database file `dbfile`.
// NOTE(review): the generated index describes this constructor as restoring queues
// from the database, but no such statement is visible in this listing — confirm
// against the real header.
199 MsgQueueManager(const std::string &dbfile)
200 : _mapper(dbfile)
201 {
203 }
211 bool declareQueue(const std::string &qname,
212 bool qdurable,
213 bool qexclusive,
214 bool qauto_delete,
215 const google::protobuf::Map<std::string, std::string> &qargs)
216 {
217 std::unique_lock<std::mutex> lock(_mutex);
218 auto it = _queues.find(qname);
219 if (it != _queues.end())
220 return true;
221 auto mqp = std::make_shared<MsgQueue>(qname, qdurable, qexclusive, qauto_delete, qargs);
222 if (qdurable)
223 {
224 bool ret = _mapper.insert(mqp);
225 if (ret == false)
226 return false;
227 }
228 _queues.insert(std::make_pair(qname, mqp));
229 return true;
230 }
233 void deleteQueue(const std::string &name)
234 {
235 std::unique_lock<std::mutex> lock(_mutex);
236 auto it = _queues.find(name);
237 if (it == _queues.end())
238 return;
239 _queues.erase(name);
240 if (it->second->durable == true)
241 _mapper.remove(name);
242 }
246 MsgQueue::ptr selectQueue(const std::string &name)
247 {
248 std::unique_lock<std::mutex> lock(_mutex);
249 auto it = _queues.find(name);
250 if (it == _queues.end())
251 return MsgQueue::ptr();
252 return it->second;
253 }
258 {
// Holds the mutex only while this function runs, then hands back _queues.
// NOTE(review): the generated index lists the return type as QueueMap&; if so,
// the caller reads the map after the lock has been released — confirm this is
// safe when other threads declare or delete queues concurrently.
259 std::unique_lock<std::mutex> lock(_mutex);
260 return _queues;
261 }
265 bool exists(const std::string &name)
266 {
267 std::unique_lock<std::mutex> lock(_mutex);
268 auto it = _queues.find(name);
269 if (it == _queues.end())
270 return false;
271 return true;
272 }
275 size_t size()
276 {
277 std::unique_lock<std::mutex> lock(_mutex);
278 return _queues.size();
279 }
// Empties the in-memory queue table while holding the mutex.
281 void clear()
282 {
283 std::unique_lock<std::mutex> lock(_mutex);
285 _queues.clear();
286 }
287
288 private:
289 std::mutex _mutex;
292 };
293}
static bool createDirectory(const std::string &pathname)
创建目录
Definition helper.hpp:414
static std::string parentDirectory(const std::string &filename)
获取文件的父目录
Definition helper.hpp:357
消息队列数据内存管理类
Definition queue.hpp:194
std::shared_ptr< MsgQueueManager > ptr
消息队列数据内存管理指针
Definition queue.hpp:196
void clear()
清除消息队列
Definition queue.hpp:281
MsgQueue::ptr selectQueue(const std::string &name)
获取指定消息队列
Definition queue.hpp:246
bool exists(const std::string &name)
判断消息队列是否存在
Definition queue.hpp:265
bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map< std::string, std::string > &qargs)
声明消息队列
Definition queue.hpp:211
MsgQueueMapper _mapper
持久化消息队列管理类
Definition queue.hpp:290
QueueMap & allQueue()
获取所有队列
Definition queue.hpp:257
QueueMap _queues
全部消息队列信息
Definition queue.hpp:291
std::mutex _mutex
互斥锁
Definition queue.hpp:289
size_t size()
获取消息队列数量
Definition queue.hpp:275
MsgQueueManager(const std::string &dbfile)
消息队列数据内存管理类 构造函数 从数据库中恢复数据
Definition queue.hpp:199
void deleteQueue(const std::string &name)
删除消息队列
Definition queue.hpp:233
消息队列持久化管理类 将数据存储在sqlite数据库中
Definition queue.hpp:86
void removeTable()
移除一张表
Definition queue.hpp:113
static int selectCallback(void *arg, int numcol, char **row, char **fields)
select语句的回调函数 将获取到的数据存入参数中
Definition queue.hpp:174
void createTable()
创建一张表
Definition queue.hpp:100
bool insert(MsgQueue::ptr &queue)
新增一个消息队列
Definition queue.hpp:126
bool remove(const std::string &name)
移除一个消息队列
Definition queue.hpp:143
SqliteHelper _sql_helper
数据库操作对象
Definition queue.hpp:189
MsgQueueMapper(const std::string &dbfile)
消息队列持久化管理类 构造函数
Definition queue.hpp:91
QueueMap recovery()
获取所有消息队列 从数据库加载到内存
Definition queue.hpp:159
SQLite 数据库操作助手类
Definition helper.hpp:58
bool exec(const std::string &sql, SqliteCallback cb, void *arg)
执行 SQL 语句
Definition helper.hpp:109
bool open(int safe_level=SQLITE_OPEN_FULLMUTEX)
打开数据库
Definition helper.hpp:90
static size_t split(const std::string &str, const std::string &sep, std::vector< std::string > &result)
将字符串分割为多个子字符串
Definition helper.hpp:152
XuMQ::MsgQueueManager::ptr mqp
Definition mqqueuetest.cpp:4
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::unordered_map< std::string, MsgQueue::ptr > QueueMap
消息队列映射表 消息队列名称->消息队列指针
Definition queue.hpp:82
消息队列结构体
Definition queue.hpp:26
void setArgs(const std::string &str_args)
解析字符串并存储到映射成员中
Definition queue.hpp:54
bool durable
持久化标志
Definition queue.hpp:29
bool auto_delete
自动删除标志
Definition queue.hpp:31
MsgQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qauto_delete, const google::protobuf::Map< std::string, std::string > &qargs)
构造函数
Definition queue.hpp:41
std::string getArgs()
将映射成员转化为字符串
Definition queue.hpp:70
std::shared_ptr< MsgQueue > ptr
消息队列指针
Definition queue.hpp:27
bool exclusive
独占标志
Definition queue.hpp:30
google::protobuf::Map< std::string, std::string > args
其他参数
Definition queue.hpp:32
std::string name
消息队列名称
Definition queue.hpp:28
MsgQueue()
无参构造
Definition queue.hpp:34