Message-Queues beta 1.1
A Message-Queues based Cpp
 
载入中...
搜索中...
未找到
exchange.hpp
浏览该文件的文档.
1
9#pragma once
10#include "../common/logger.hpp"
11#include "../common/helper.hpp"
12#include "../common/msg.pb.h"
13#include <iostream>
14#include <unordered_map>
15#include <mutex>
16#include <memory>
17
18namespace XuMQ
19{
22 struct Exchange
23 {
24 using ptr = std::shared_ptr<Exchange>;
25 std::string name;
27 bool durable;
29 google::protobuf::Map<std::string, std::string> args;
30
39 Exchange(const std::string &ename,
40 ExchangeType etype,
41 bool edurable,
42 bool eauto_delete,
43 const google::protobuf::Map<std::string, std::string> &eargs)
44 : name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs)
45 {
46 }
52 void setArgs(const std::string &str_args)
53 {
54 std::vector<std::string> sub_args;
55 StrHelper::split(str_args, "&", sub_args);
56 for (auto &str : sub_args)
57 {
58 size_t pos = str.find('=');
59 std::string key = str.substr(0, pos);
60 std::string value = str.substr(pos + 1);
61 args[key] = value;
62 }
63 }
68 std::string getArgs()
69 {
70 std::string result;
71 for (auto &arg : args)
72 {
73 result += (arg.first + "=" + arg.second + "&");
74 }
75 if (!result.empty())
76 result.pop_back(); // 去除最后一个'&'
77 return result;
78 }
79 };
80 using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;
84 {
85 public:
89 ExchangeMapper(const std::string &dbfile)
90 : _sql_helper(dbfile)
91 {
92 std::string path = FileHelper::parentDirectory(dbfile);
94 assert(_sql_helper.open());
96 }
99 {
100 const char *CREATE_TABLE = "create table if not exists exchange_table(name varchar(32) primary key,\
101 type tinyint, durable tinyint,\
102 auto_delete tinyint, args varchar(128));";
103 bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
104 if (ret == false)
105 {
106 fatal(logger, "创建交换机数据库表失败!");
107 abort();
108 }
109 }
112 {
113 const char *DROP_TABLE = "drop table if exists exchange_table;";
114 bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);
115 if (ret == false)
116 {
117 fatal(logger, "删除交换机数据库表失败!");
118 abort();
119 }
120 }
125 bool insert(Exchange::ptr &exchange)
126 {
127 const char *INSERT_SQL = "insert into exchange_table values('%s', %d, %d, %d, '%s');";
128 char sql_str[4096] = {0};
129 sprintf(sql_str, INSERT_SQL, exchange->name.c_str(), exchange->type, exchange->durable,
130 exchange->auto_delete, exchange->getArgs().c_str());
131 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
132 if (ret == false)
133 {
134 error(logger, "数据库: 插入交换机失败!");
135 return false;
136 }
137 return true;
138 }
142 bool remove(const std::string &name)
143 {
144 const char *DELETE_SQL = "delete from exchange_table where name = '%s';";
145 char sql_str[4096] = {0};
146 sprintf(sql_str, DELETE_SQL, name.c_str());
147 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
148 if (ret == false)
149 {
150 error(logger, "数据库: 删除交换机失败!");
151 return false;
152 }
153 return true;
154 }
159 {
160 ExchangeMap result;
161 const char *SELECT_SQL = "select * from exchange_table;";
162 _sql_helper.exec(SELECT_SQL, selectCallback, &result);
163 return result;
164 }
165
166 private:
173 static int selectCallback(void *arg, int numcol, char **row, char **fields)
174 {
175 ExchangeMap *result = (ExchangeMap *)arg;
176 auto exp = std::make_shared<Exchange>();
177 exp->name = row[0];
178 exp->type = (XuMQ::ExchangeType)std::stoi(row[1]);
179 exp->durable = (bool)std::stoi(row[2]);
180 exp->auto_delete = (bool)std::stoi(row[3]);
181 if (row[4])
182 exp->setArgs(row[4]);
183 result->insert(std::make_pair(exp->name, exp));
184 return 0;
185 }
186
187 private:
189 };
193 {
194 public:
195 using ptr = std::shared_ptr<ExchangeManager>;
198 ExchangeManager(const std::string &dbfile)
199 : _mapper(dbfile)
200 {
202 }
210 bool declareExchange(const std::string &name,
211 ExchangeType type,
212 bool durable,
213 bool auto_delete,
214 const google::protobuf::Map<std::string, std::string> &args)
215 {
216 std::unique_lock<std::mutex> lock(_mutex);
217 auto it = _exchanges.find(name);
218 if (it != _exchanges.end())
219 return true;
220 auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);
221 if (durable)
222 {
223 bool ret = _mapper.insert(exp);
224 if (ret == false)
225 return false;
226 }
227 _exchanges.insert(std::make_pair(name, exp));
228 return true;
229 }
232 void deleteExchange(const std::string &name)
233 {
234 std::unique_lock<std::mutex> lock(_mutex);
235 auto it = _exchanges.find(name);
236 if (it == _exchanges.end())
237 return;
238 _exchanges.erase(name);
239 if (it->second->durable == true)
240 _mapper.remove(name);
241 }
245 Exchange::ptr selectExchange(const std::string &name)
246 {
247 std::unique_lock<std::mutex> lock(_mutex);
248 auto it = _exchanges.find(name);
249 if (it == _exchanges.end())
250 return Exchange::ptr();
251 return it->second;
252 }
256 bool exists(const std::string &name)
257 {
258 std::unique_lock<std::mutex> lock(_mutex);
259 auto it = _exchanges.find(name);
260 if (it == _exchanges.end())
261 return false;
262 return true;
263 }
265 void clear()
266 {
267 std::unique_lock<std::mutex> lock(_mutex);
269 _exchanges.clear();
270 }
273 size_t size()
274 {
275 std::unique_lock<std::mutex> lock(_mutex);
276 return _exchanges.size();
277 }
278
279 private:
280 std::mutex _mutex;
283 };
284}
交换机数据内存管理类
Definition exchange.hpp:193
bool declareExchange(const std::string &name, ExchangeType type, bool durable, bool auto_delete, const google::protobuf::Map< std::string, std::string > &args)
声明交换机
Definition exchange.hpp:210
ExchangeMapper _mapper
持久化交换机管理类
Definition exchange.hpp:281
std::mutex _mutex
互斥锁
Definition exchange.hpp:280
void clear()
清除所有交换机数据
Definition exchange.hpp:265
ExchangeManager(const std::string &dbfile)
交换机数据内存管理类 构造函数 从数据库中恢复数据
Definition exchange.hpp:198
bool exists(const std::string &name)
判断交换机是否存在
Definition exchange.hpp:256
void deleteExchange(const std::string &name)
删除交换机
Definition exchange.hpp:232
Exchange::ptr selectExchange(const std::string &name)
获取指定交换机
Definition exchange.hpp:245
std::shared_ptr< ExchangeManager > ptr
交换机数据内存管理指针
Definition exchange.hpp:195
ExchangeMap _exchanges
全部交换机信息
Definition exchange.hpp:282
size_t size()
获取交换机数量
Definition exchange.hpp:273
Definition exchange.hpp:84
void createTable()
创建一张表
Definition exchange.hpp:98
void removeTable()
移除一张表
Definition exchange.hpp:111
SqliteHelper _sql_helper
数据库操作对象
Definition exchange.hpp:188
static int selectCallback(void *arg, int numcol, char **row, char **fields)
select语句的回调函数 将获取到的数据存入参数中
Definition exchange.hpp:173
bool remove(const std::string &name)
移除一个交换机
Definition exchange.hpp:142
bool insert(Exchange::ptr &exchange)
新增一个交换机
Definition exchange.hpp:125
ExchangeMap recovery()
获取所有交换机 从数据库加载到内存
Definition exchange.hpp:158
ExchangeMapper(const std::string &dbfile)
交换机持久化管理类 构造函数
Definition exchange.hpp:89
static bool createDirectory(const std::string &pathname)
创建目录
Definition helper.hpp:414
static std::string parentDirectory(const std::string &filename)
获取文件的父目录
Definition helper.hpp:357
SQLite 数据库操作助手类
Definition helper.hpp:58
bool exec(const std::string &sql, SqliteCallback cb, void *arg)
执行 SQL 语句
Definition helper.hpp:109
bool open(int safe_level=SQLITE_OPEN_FULLMUTEX)
打开数据库
Definition helper.hpp:90
static size_t split(const std::string &str, const std::string &sep, std::vector< std::string > &result)
将字符串分割为多个子字符串
Definition helper.hpp:152
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::unordered_map< std::string, Exchange::ptr > ExchangeMap
交换机映射表 交换机名称->交换机对象指针
Definition exchange.hpp:80
ExchangeType
Definition msg.pb.h:66
交换机结构体对象
Definition exchange.hpp:23
bool auto_delete
自动删除标志
Definition exchange.hpp:28
void setArgs(const std::string &str_args)
解析字符串并存储到映射成员中
Definition exchange.hpp:52
std::shared_ptr< Exchange > ptr
使用智能指针管理交换机对象
Definition exchange.hpp:24
Exchange(const std::string &ename, ExchangeType etype, bool edurable, bool eauto_delete, const google::protobuf::Map< std::string, std::string > &eargs)
交换机结构构造函数
Definition exchange.hpp:39
std::string name
交换机名称
Definition exchange.hpp:25
ExchangeType type
交换机类型
Definition exchange.hpp:26
std::string getArgs()
将映射成员转化为字符串
Definition exchange.hpp:68
google::protobuf::Map< std::string, std::string > args
其他参数
Definition exchange.hpp:29
bool durable
数据持久化标志
Definition exchange.hpp:27
Exchange()
无参构造
Definition exchange.hpp:32