Message-Queues beta 1.1
A Message-Queues based Cpp
 
载入中...
搜索中...
未找到
binding.hpp
浏览该文件的文档.
1
12#pragma once
13#include "../common/logger.hpp"
14#include "../common/helper.hpp"
15#include "../common/msg.pb.h"
16#include <iostream>
17#include <unordered_map>
18#include <mutex>
19#include <memory>
20
21namespace XuMQ
22{
25 struct Binding
26 {
27 using ptr = std::shared_ptr<Binding>;
28 std::string exchange_name;
29 std::string msgqueue_name;
30 std::string binding_key;
37 Binding(const std::string &ename, const std::string &qname, const std::string &key)
38 : exchange_name(ename), msgqueue_name(qname), binding_key(key) {}
39 };
43 using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;
55 using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;
59 {
60 public:
64 BindingMapper(const std::string &dbfile)
65 : _sql_helper(dbfile)
66 {
67 std::string path = FileHelper::parentDirectory(dbfile);
69 assert(_sql_helper.open());
71 }
74 {
75 const char *CREATE_TABLE = "create table if not exists binding_table(exchange_name varchar(32), msgqueue_name varchar(32), binding_key varchar(128));";
76 bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);
77 if (ret == false)
78 {
79 fatal(logger, "创建绑定信息数据库表失败!");
80 abort();
81 }
82 }
85 {
86 const char *DROP_TABLE = "drop table if exists binding_table;";
87 bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);
88 if (ret == false)
89 {
90 fatal(logger, "删除绑定信息数据库表失败!");
91 abort();
92 }
93 }
97 bool insert(Binding::ptr &binding)
98 {
99 const char *INSERT_SQL = "insert into binding_table values('%s', '%s', '%s');";
100 char sql_str[4096] = {0};
101 sprintf(sql_str, INSERT_SQL, binding->exchange_name.c_str(),
102 binding->msgqueue_name.c_str(), binding->binding_key.c_str());
103 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
104 if (ret == false)
105 {
106 error(logger, "数据库: 插入绑定信息失败!");
107 return false;
108 }
109 return true;
110 }
115 bool remove(const std::string &ename, const std::string &qname)
116 {
117 const char *DELETE_SQL = "delete from binding_table where exchange_name = '%s' \
118 and msgqueue_name = '%s';";
119 char sql_str[4096] = {0};
120 sprintf(sql_str, DELETE_SQL, ename.c_str(), qname.c_str());
121 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
122 if (ret == false)
123 {
124 error(logger, "数据库: 删除绑定信息失败!");
125 return false;
126 }
127 return true;
128 }
132 bool removeExchangeBindings(const std::string &ename)
133 {
134 const char *DELETE_SQL = "delete from binding_table where exchange_name = '%s';";
135 char sql_str[4096] = {0};
136 sprintf(sql_str, DELETE_SQL, ename.c_str());
137 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
138 if (ret == false)
139 {
140 error(logger, "数据库: 删除交换机绑定信息失败!");
141 return false;
142 }
143 return true;
144 }
148 bool removeQueueBindings(const std::string &qname)
149 {
150 const char *DELETE_SQL = "delete from binding_table where msgqueue_name = '%s';";
151 char sql_str[4096] = {0};
152 sprintf(sql_str, DELETE_SQL, qname.c_str());
153 bool ret = _sql_helper.exec(sql_str, nullptr, nullptr);
154 if (ret == false)
155 {
156 error(logger, "数据库: 删除消息队列绑定信息失败!");
157 return false;
158 }
159 return true;
160 }
165 {
166 BindingMap result;
167 const char *SELECT_SQL = "select * from binding_table;";
168 _sql_helper.exec(SELECT_SQL, selectCallback, &result);
169 return result;
170 }
171
172 private:
179 static int selectCallback(void *arg, int numcol, char **row, char **fields)
180 {
181 BindingMap *result = (BindingMap *)arg;
182 Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);
183 MsgQueueBindingMap &qmap = (*result)[bp->exchange_name];
188 qmap.insert(std::make_pair(bp->msgqueue_name, bp));
189 return 0;
190 }
191
192 private:
194 };
198 {
199 public:
200 using ptr = std::shared_ptr<BindingManager>;
203 BindingManager(const std::string &dbfile) : _mapper(dbfile) {
205 }
213 bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable)
214 {
215 // 加锁 构造一个队列信息绑定对象 添加映射关系
216 std::unique_lock<std::mutex> lock(_mutex);
217 auto it = _bindings.find(ename);
218 if (it != _bindings.end() && it->second.find(qname) != it->second.end()) // 绑定信息已经存在
219 return true;
220 Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);
221 if (durable)
222 {
223 bool ret = _mapper.insert(bp);
224 if (ret == false)
225 return false;
226 }
227 auto &qbmap = _bindings[ename];
228 qbmap.insert(std::make_pair(qname, bp));
229 return true;
230 }
234 void unbind(const std::string &ename, const std::string &qname)
235 {
236 std::unique_lock<std::mutex> lock(_mutex);
237 auto eit = _bindings.find(ename);
238 if (eit == _bindings.end()) // 没有交换机的绑定信息
239 return;
240 auto qit = eit->second.find(qname); // 没有交换机对应队列的绑定信息
241 if (qit == eit->second.end())
242 return;
243 _mapper.remove(ename, qname);
244 _bindings[ename].erase(qname);
245 }
248 void removeExchangeBindings(const std::string &ename)
249 {
250 std::unique_lock<std::mutex> lock(_mutex);
252 _bindings.erase(ename);
253 }
256 void removeMsgQueueBindings(const std::string &qname)
257 {
258 std::unique_lock<std::mutex> lock(_mutex);
260 for (auto &binding : _bindings) // 遍历所有交换机
261 binding.second.erase(qname);
262 }
266 MsgQueueBindingMap getExchangeBindings(const std::string &ename)
267 {
268 std::unique_lock<std::mutex> lock(_mutex);
269 auto eit = _bindings.find(ename);
270 if (eit == _bindings.end())
271 return MsgQueueBindingMap();
272 return eit->second;
273 }
274
279 Binding::ptr getBinding(const std::string &ename, const std::string &qname)
280 {
281 std::unique_lock<std::mutex> lock(_mutex);
282 auto eit = _bindings.find(ename);
283 if (eit == _bindings.end())
284 return Binding::ptr();
285 auto qit = eit->second.find(qname);
286 if (qit == eit->second.end())
287 return Binding::ptr();
288 return qit->second;
289 }
294 bool exists(const std::string &ename, const std::string &qname)
295 {
296 std::unique_lock<std::mutex> lock(_mutex);
297 auto eit = _bindings.find(ename);
298 if (eit == _bindings.end())
299 return false;
300 auto qit = eit->second.find(qname);
301 if (qit == eit->second.end())
302 return false;
303 return true;
304 }
307 size_t size()
308 {
309 size_t total_size = 0;
310 std::unique_lock<std::mutex> lock(_mutex);
311 for (auto &it : _bindings)
312 total_size += it.second.size();
313 return total_size;
314 }
316 void clear()
317 {
318 std::unique_lock<std::mutex> lock(_mutex);
320 _bindings.clear();
321 }
322
323 private:
324 std::mutex _mutex;
327 };
328}
绑定信息内存管理类
Definition binding.hpp:198
bool exists(const std::string &ename, const std::string &qname)
判断绑定信息是否存在
Definition binding.hpp:294
size_t size()
获取绑定信息数量
Definition binding.hpp:307
BindingMap _bindings
绑定映射表
Definition binding.hpp:326
MsgQueueBindingMap getExchangeBindings(const std::string &ename)
获取指定交换机的绑定信息
Definition binding.hpp:266
void removeExchangeBindings(const std::string &ename)
移除指定交换机的所有绑定信息
Definition binding.hpp:248
void removeMsgQueueBindings(const std::string &qname)
移除指定消息队列的所有绑定信息
Definition binding.hpp:256
void clear()
清除绑定信息
Definition binding.hpp:316
BindingMapper _mapper
绑定信息持久化管理类
Definition binding.hpp:325
std::mutex _mutex
互斥锁
Definition binding.hpp:324
void unbind(const std::string &ename, const std::string &qname)
解除绑定信息
Definition binding.hpp:234
BindingManager(const std::string &dbfile)
绑定信息数据内存管理类 构造函数 从数据库中恢复数据
Definition binding.hpp:203
bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable)
添加绑定信息
Definition binding.hpp:213
Binding::ptr getBinding(const std::string &ename, const std::string &qname)
获取绑定信息
Definition binding.hpp:279
std::shared_ptr< BindingManager > ptr
绑定信息内存管理类指针
Definition binding.hpp:200
绑定信息持久化管理类
Definition binding.hpp:59
static int selectCallback(void *arg, int numcol, char **row, char **fields)
select语句的回调函数 将获取到的数据存入参数中
Definition binding.hpp:179
bool insert(Binding::ptr &binding)
新增一个绑定信息
Definition binding.hpp:97
bool remove(const std::string &ename, const std::string &qname)
移除一个绑定信息
Definition binding.hpp:115
void removeTable()
移除一张表
Definition binding.hpp:84
bool removeExchangeBindings(const std::string &ename)
移除交换机绑定信息
Definition binding.hpp:132
bool removeQueueBindings(const std::string &qname)
移除消息队列绑定信息
Definition binding.hpp:148
SqliteHelper _sql_helper
数据库操作对象
Definition binding.hpp:193
void createTable()
创建一张表
Definition binding.hpp:73
BindingMap recovery()
获取所有绑定信息 从数据库加载到内存
Definition binding.hpp:164
BindingMapper(const std::string &dbfile)
绑定信息持久化管理类 构造函数
Definition binding.hpp:64
static bool createDirectory(const std::string &pathname)
创建目录
Definition helper.hpp:414
static std::string parentDirectory(const std::string &filename)
获取文件的父目录
Definition helper.hpp:357
SQLite 数据库操作助手类
Definition helper.hpp:58
bool exec(const std::string &sql, SqliteCallback cb, void *arg)
执行 SQL 语句
Definition helper.hpp:109
bool open(int safe_level=SQLITE_OPEN_FULLMUTEX)
打开数据库
Definition helper.hpp:90
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::unordered_map< std::string, MsgQueueBindingMap > BindingMap
绑定映射表-—交换机->消息队列绑定映射表的映射表
Definition binding.hpp:55
std::unordered_map< std::string, Binding::ptr > MsgQueueBindingMap
消息队列绑定映射表-—消息队列->绑定信息的映射表
Definition binding.hpp:43
绑定信息结构体
Definition binding.hpp:26
Binding()
无参构造
Definition binding.hpp:32
Binding(const std::string &ename, const std::string &qname, const std::string &key)
绑定信息结构体 构造函数
Definition binding.hpp:37
std::string exchange_name
交换机名称
Definition binding.hpp:28
std::string binding_key
绑定关键字
Definition binding.hpp:30
std::shared_ptr< Binding > ptr
绑定信息指针
Definition binding.hpp:27
std::string msgqueue_name
消息队列名称
Definition binding.hpp:29