Muduo
Muduo是陈硕大佬开发的,一个基于非阻塞IO和事件驱动的C++高并发网络编程库
这是一个基于主从Reactor模型的网络编程库,线程模型是one loop per thread
意思是一个线程只能有一个事件循环(event loop), 用于响应计时器和事件
一个文件描述符只能由一个线程进行读写,也就是一个Tcp连接,必须归属于一个EventLoop管理
基本逻辑是这样的

所谓的主从Reactor就是,在主线程中有一个主Reactor进行事件触发,而在其他其他线程中就是普通的Reactor进行事件触发
在主线程中主要任务就是监控新连接的到来,保证尽可能高效的获取新建连接,再按照负载均衡的方式分发到普通Reactor的线程中,对对应的IO事件进行监控
主从Reactor必然是一个多执行流的并发模式,也就是one thread one loop
Server常见接口
TcpServer类
这个类用来生成服务器对象
==构造函数是这样的==
1 2 3 4
| TcpServer(EventLoop *loop, const InetAddress &listenAddr, const string &nameArg, Option option = kNoReusePort);
|
第一个参数loop对象在下面来介绍
第二个参数是一个地址信息,结构是这样的
1 2 3 4 5
| class InetAddress : public muduo::copyable { public: InetAddress(StringArg ip, uint16_t port, bool ipv6 = false); };
|
包含了ip和端口两个信息,是服务器需要监听和绑定的地址
第三个参数是字符串的名称
最后一个参数是一个选项,是否启用端口重用的功能
1 2 3 4 5
| enum Option { kNoReusePort, kReusePort, };
|
我们直到当请求释放连接时,会有一段time wait状态,这段时间内同一个端口号是无法再次被绑定使用的
打开端口重用就可以解除这个限制,继续使用这个端口号
==setThreadNum()==
1
| void setThreadNum(int numThreads);
|
这个成员函数就是用于设置从属Reactor线程的数量的
==Start()==
启动事件监听
==setConnectionCallback()==
这是一个回调函数,需要我们自己来编写
1 2 3 4 5
| typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback; void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
|
当连接建立成功时,会调用这个回调接口,完成我们的功能
==setMessageCallback()==
这也是一个回调函数,是用于业务处理的回调函数
1 2 3 4 5 6 7 8
| typedef std::function<void(const TcpConnectionPtr &, Buffer *, Timestamp)> MessageCallback; void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
|
当收到连接的新的消息的时候,调用的函数
EventLoop类
这个类主要是用于事件监控和业务处理,在构造TcpServer之前,就需要构造这个EventLoop对象,最重要的就是loop成员函数
TcpConnection类
connected()和disconnect()是用于查看连接状态的
send()是用于发送数据的
服务器搭建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| #include "muduo/include/muduo/net/TcpServer.h" #include "muduo/include/muduo/net/EventLoop.h" #include "muduo/include/muduo/net/TcpConnection.h" #include "../logs/Xulog.h" #include "TransLate.hpp" #include <iostream> #include <functional> #include <unordered_map>
class TranslateServer { public: TranslateServer(int port) : _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "TranslateServer", muduo::net::TcpServer::kReusePort) { auto func_1 = std::bind(&TranslateServer::onConnection, this, std::placeholders::_1); auto func_2 = std::bind(&TranslateServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); _server.setConnectionCallback(func_1); _server.setMessageCallback(func_2); }
void start() { _server.start(); _baseloop.loop(); }
private: void onConnection(const muduo::net::TcpConnectionPtr &conn) { if (conn->connected()) { INFO("新连接建立成功!"); } else { INFO("连接关闭!"); } } std::string translate(const std::string &str) { return Translate(str, "en", "zh"); } void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp) { std::string str = buf->retrieveAllAsString(); std::string resp = translate(str); conn->send(resp); }
private: muduo::net::EventLoop _baseloop; muduo::net::TcpServer _server; };
int main() { TranslateServer server(8085); server.start(); return 0; }
|
这里我们使用了百度翻译的api,可以将英文翻译成中文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| #include <iostream> #include <string> #include <curl/curl.h> #include <cstdlib> #include <cstring> #include <openssl/evp.h> #include <openssl/err.h> #include <jsoncpp/json/json.h> #include <sstream> #include <iomanip>
static size_t WriteCallback(void *contents, size_t size, size_t nmemb, std::string *userp) { size_t total_size = size * nmemb; userp->append(static_cast<char *>(contents), total_size); return total_size; }
std::string generateSign(const std::string &appid, const std::string &q, const std::string &salt, const std::string &secret_key) { std::string sign = appid + q + salt + secret_key; unsigned char md[EVP_MAX_MD_SIZE]; unsigned int md_len;
EVP_MD_CTX *ctx = EVP_MD_CTX_new(); if (!ctx) { std::cerr << "Failed to create context for MD5." << std::endl; return ""; }
if (EVP_DigestInit_ex(ctx, EVP_md5(), nullptr) != 1 || EVP_DigestUpdate(ctx, sign.c_str(), sign.length()) != 1 || EVP_DigestFinal_ex(ctx, md, &md_len) != 1) { std::cerr << "Error generating MD5 digest." << std::endl; EVP_MD_CTX_free(ctx); return ""; }
EVP_MD_CTX_free(ctx);
char buf[33] = {0}; for (unsigned int i = 0; i < md_len; ++i) { sprintf(buf + i * 2, "%02x", md[i]); } return std::string(buf); }
std::string parseJsonResponse(const std::string &response) { Json::CharReaderBuilder reader; Json::Value jsonData; std::string errs;
std::istringstream s(response); if (Json::parseFromStream(reader, s, &jsonData, &errs)) { std::string from = jsonData["from"].asString(); std::string to = jsonData["to"].asString(); std::string translatedText = jsonData["trans_result"][0]["dst"].asString();
return translatedText; } else { std::cerr << "Failed to parse JSON: " << errs << std::endl; exit(0); } }
std::string urlEncode(const std::string &value) { std::ostringstream escaped; escaped << std::hex << std::setfill('0'); for (const char c : value) { if (isalnum(c) || c == '-' || c == '_' || c == '.' || c == '~') { escaped << c; } else { escaped << '%' << std::setw(2) << static_cast<int>(static_cast<unsigned char>(c)); } } return escaped.str(); }
std::string _translate(const std::string &appid, const std::string &secret_key, const std::string &q, const std::string &from, const std::string &to) { char salt[60]; sprintf(salt, "%d", rand());
std::string sign = generateSign(appid, q, salt, secret_key); std::string q_encoded = urlEncode(q); std::string myurl = "http://api.fanyi.baidu.com/api/trans/vip/translate?appid=" + appid + "&q=" + q_encoded + "&from=" + from + "&to=" + to + "&salt=" + salt + "&sign=" + sign; CURL *curl = curl_easy_init(); std::string response_string;
if (curl) { curl_easy_setopt(curl, CURLOPT_URL, myurl.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_string); CURLcode res = curl_easy_perform(curl); if (res != CURLE_OK) { std::cerr << "curl_easy_perform() failed: " << curl_easy_strerror(res) << std::endl; } curl_easy_cleanup(curl); } return response_string; }
enum { EnToZh, ZhToEn, Exit };
std::string Translate(std::string text_to_translate, std::string from, std::string to) { std::string appid = ""; std::string secret_key = ""; std::string response = _translate(appid, secret_key, text_to_translate, from, to); return parseJsonResponse(response); }
|
Client常见接口
TcpClient类
==构造函数==
1 2 3
| TcpClient(EventLoop *loop, const InetAddress &serverAddr, const string &nameArg);
|
这里需要传入一个Eventloop,我们的epoll监控就在这里面
第二个参数是要连接的服务器的地址信息
第三个参数是名称
==connect()==
这个成员函数是用来连接服务器的
==disconnect()==
停止连接
==stop()==
停止客户端运行
==connection()==
1 2 3 4 5
| TcpConnectionPtr connection() const { MutexLockGuard lock(mutex_); return connection_; }
|
这个成员函数是用来获取客户端通信连接的Connection对象的接口
这是一个非阻塞接口,调用之后直接返回,连接不一定建立完成,不能直接发送数据
如果需要在连接完成之后再做操作,需要调用内置的CountDownLatch
类中的wait()成员函数进行同步控制
==setConnectionCallback()==
1 2 3 4
| void setConnectionCallback(ConnectionCallback cb) { connectionCallback_ = std::move(cb); }
|
这是连接服务器成功时的回调函数
==setMessageCallback()==
1 2 3 4
| void setMessageCallback(MessageCallback cb) { messageCallback_ = std::move(cb); }
|
这是收到服务器发送的消息时的回调函数
客户端搭建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| #include "muduo/include/muduo/net/TcpClient.h" #include "muduo/include/muduo/net/EventLoopThread.h" #include "muduo/include/muduo/net/TcpConnection.h" #include "muduo/include/muduo/base/CountDownLatch.h" #include "../logs/Xulog.h" #include <iostream> #include <functional>
class TranslateClient { public: TranslateClient(const std::string &sip, int sport) : _lanch(1), _client(_loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "TranslateClient")
{ auto func_1 = std::bind(&TranslateClient::onConnection, this, std::placeholders::_1); auto func_2 = std::bind(&TranslateClient::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); _client.setConnectionCallback(func_1); _client.setMessageCallback(func_2); } void connect() { _client.connect(); _lanch.wait(); } bool send(const std::string &msg) { if (_conn->connected()) { _conn->send(msg); return true; } return false; }
private: void onConnection(const muduo::net::TcpConnectionPtr &conn) { if (conn->connected()) { _lanch.countDown(); _conn = conn; std::cout << "连接建立成功!" << std::endl; } else { _conn.reset(); std::cout << "连接已经断开!" << std::endl; } }
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp) { std::cout << "翻译完成啦,结果是: " << buf->retrieveAllAsString() << std::endl; }
private: muduo::CountDownLatch _lanch; muduo::net::EventLoopThread _loopthread; muduo::net::TcpClient _client; muduo::net::TcpConnectionPtr _conn; };
int main() { TranslateClient client("127.0.0.1", 8085); client.connect(); while (true) { std::string buffer; std::cin >> buffer; client.send(buffer); } return 0; }
|