Message-Queues beta 1.1
A Message-Queues based Cpp
 
载入中...
搜索中...
未找到
connection.hpp
浏览该文件的文档.
1
9#pragma once
10
11#include "muduo/protobuf/dispatcher.h"
12#include "muduo/protobuf/codec.h"
13
14#include "muduo/base/Mutex.h"
15#include "muduo/base/Logging.h"
16#include "muduo/net/EventLoop.h"
17#include "muduo/net/TcpClient.h"
18#include "muduo/net/EventLoopThread.h"
19#include "muduo/base/CountDownLatch.h"
20#include "channel.hpp"
21#include "worker.hpp"
22#include "../common/logger.hpp"
23
24namespace XuMQ
25{
29 {
30 public:
31 using ptr = std::shared_ptr<Connection>;
36 Connection(const std::string &sip, int sport, const AsyncWorker::ptr &worker)
37 : _latch(1), _client(worker->_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"),
38 _dispatcher(std::bind(&Connection::onUnknowMessage, this,
39 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),
40 _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
41 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),
42 _worker(worker), _channels(std::make_shared<ChannelManager>())
43 {
44 _dispatcher.registerMessageCallback<basicResponse>(std::bind(&Connection::commonResponse, this, std::placeholders::_1,
45 std::placeholders::_2, std::placeholders::_3));
46 _dispatcher.registerMessageCallback<basicConsumeResponse>(std::bind(&Connection::consumeResponse, this, std::placeholders::_1,
47 std::placeholders::_2, std::placeholders::_3));
48 _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec, std::placeholders::_1,
49 std::placeholders::_2, std::placeholders::_3));
50 _client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
51 _client.connect();
52 _latch.wait();
53 }
57 {
58 Channel::ptr channel = _channels->create(_conn, _codec);
59 bool ret = channel->openChannel();
60 if (ret == false)
61 {
62 error(logger, "打开信道失败!");
63 return Channel::ptr();
64 }
65 return channel;
66 }
69 void closeChannel(const Channel::ptr &channel)
70 {
71 channel->closeChannel();
72 _channels->remove(channel->cid());
73 }
74
75 private:
80 void commonResponse(const muduo::net::TcpConnectionPtr &conn, const basicResponsePtr message, muduo::Timestamp)
81 {
82 // 找到信道
83 Channel::ptr channel = _channels->get(message->cid());
84 if (channel == nullptr)
85 {
86 error(logger, "未找到信道!");
87 return;
88 }
89 // 将得到的响应对象 添加到信道的基础响应中
90 channel->putBasicResponse(message);
91 }
96 void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr message, muduo::Timestamp)
97 {
98 // 找到信道
99 Channel::ptr channel = _channels->get(message->cid());
100 if (channel == nullptr)
101 {
102 error(logger, "未找到信道!");
103 return;
104 }
105 // 封装异步任务交给线程池
106 _worker->_threadpool.push([channel, message]()
107 { channel->consume(message); });
108 }
111 void onConnection(const muduo::net::TcpConnectionPtr &conn)
112 {
113 if (conn->connected())
114 {
115 _latch.countDown();
116 _conn = conn;
117 }
118 else
119 {
120 _conn.reset();
121 }
122 }
127 void onUnknowMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
128 {
129 info(logger, "onUnknowMessage: %s", message->GetTypeName());
130 conn->shutdown();
131 }
132
133 private:
134 muduo::CountDownLatch _latch;
135 muduo::net::TcpConnectionPtr _conn;
136 muduo::net::TcpClient _client;
137 ProtobufDispatcher _dispatcher;
141 };
142
143}
std::shared_ptr< AsyncWorker > ptr
异步工作线程管理句柄
Definition worker.hpp:17
std::shared_ptr< Channel > ptr
信道句柄
Definition channel.hpp:32
信道管理类
Definition channel.hpp:317
std::shared_ptr< ChannelManager > ptr
信道管理句柄
Definition channel.hpp:319
连接管理模块
Definition connection.hpp:29
ProtobufCodecPtr _codec
协议处理器
Definition connection.hpp:138
Channel::ptr openChannel()
创建信道
Definition connection.hpp:56
ProtobufDispatcher _dispatcher
请求分发器
Definition connection.hpp:137
void closeChannel(const Channel::ptr &channel)
关闭信道
Definition connection.hpp:69
Connection(const std::string &sip, int sport, const AsyncWorker::ptr &worker)
连接构造函数
Definition connection.hpp:36
std::shared_ptr< Connection > ptr
连接管理句柄
Definition connection.hpp:31
muduo::CountDownLatch _latch
实现同步
Definition connection.hpp:134
void onConnection(const muduo::net::TcpConnectionPtr &conn)
连接回调函数
Definition connection.hpp:111
muduo::net::TcpClient _client
客户端句柄
Definition connection.hpp:136
ChannelManager::ptr _channels
信道管理句柄
Definition connection.hpp:140
AsyncWorker::ptr _worker
异步线程工作器
Definition connection.hpp:139
void commonResponse(const muduo::net::TcpConnectionPtr &conn, const basicResponsePtr message, muduo::Timestamp)
处理一般响应的回调函数
Definition connection.hpp:80
muduo::net::TcpConnectionPtr _conn
客户端连接
Definition connection.hpp:135
void onUnknowMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp)
未知响应回调函数
Definition connection.hpp:127
void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr message, muduo::Timestamp)
处理消费响应的回调函数
Definition connection.hpp:96
Definition protocol.pb.h:2629
Definition protocol.pb.h:2834
Definition channel.hpp:22
Xulog::Logger::ptr logger
日志器的智能指针类型
Definition logger.hpp:24
std::shared_ptr< google::protobuf::Message > MessagePtr
消息句柄
Definition channel.hpp:23
std::shared_ptr< basicResponse > basicResponsePtr
其他响应句柄
Definition channel.hpp:26
std::shared_ptr< basicConsumeResponse > basicConsumeResponsePtr
消费响应句柄
Definition channel.hpp:25
std::shared_ptr< ProtobufCodec > ProtobufCodecPtr
协议处理句柄
Definition channel.hpp:24
信道与信道管理模块的头文件,包含了信道的声明和其管理类的定义。
异步工作线程模块