当前位置:网站首页>学习Muduo中ChatRoom实现的各种细节和思考
学习Muduo中ChatRoom实现的各种细节和思考
2022-07-26 18:43:00 【炸毛疯兔】
Muduo中ChatRoom
1.长连接中的分包解决
- 固定消息长度, 比如说固定消息长度是16字节
- 使用特殊的字符作为分割符号,比如说 HTTP协议中的\r\n作为头部的分割符
- 每个消息头部加上长度字段,这在很多场景都用到过,比如之前的
LEVELDB- 消息本身的格式进行分包
这个聊天室的代码使用了加上长度字段作为消息的分割
2.消息格式
消息本身就是字符串,前4字节的头部存放字符串的长度。
消息之间没有分割符,也没有\0
3.使用间接层帮助收发消息
消息从
onMessage函数中读取出来,但是我们并不希望用户还要处理这么麻烦的一些格式等因此抽象了一层收发数据(封装成为string类型)
构造函数传入回调函数,调用server, or. client中的对于消息的处理的函数
#ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
#define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
#include "muduo/base/Logging.h"
#include "muduo/net/Buffer.h"
#include "muduo/net/Endian.h"
#include "muduo/net/TcpConnection.h"
class LengthHeaderCodec : muduo::noncopyable
{
public:
//回调函数,处理完之后
typedef std::function<void (const muduo::net::TcpConnectionPtr&,
const muduo::string& message,
muduo::Timestamp)> StringMessageCallback;
explicit LengthHeaderCodec(const StringMessageCallback& cb)
: messageCallback_(cb)
{
}
//server或者client收发数据中最先调用的函数,进行原始buffer的处理
//封装成为string后调用回调函数
void onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp receiveTime)
{
while (buf->readableBytes() >= kHeaderLen) // kHeaderLen == 4
{
// FIXME: use Buffer::peekInt32()
const void* data = buf->peek();
int32_t be32 = *static_cast<const int32_t*>(data); // SIGBUS
const int32_t len = muduo::net::sockets::networkToHost32(be32);
if (len > 65536 || len < 0)
{
LOG_ERROR << "Invalid length " << len;
conn->shutdown(); // FIXME: disable reading
break;
}
else if (buf->readableBytes() >= len + kHeaderLen)
{
buf->retrieve(kHeaderLen); //readIdx跳过消息头部的4个字节
muduo::string message(buf->peek(), len); //封装提取到的消息
messageCallback_(conn, message, receiveTime); //回调函数,将封装好的string发送过去
buf->retrieve(len); //跳过buffer中的消息,表示这个已经读取完了
}
else
{
break;
}
}
}
// FIXME: TcpConnectionPtr
void send(muduo::net::TcpConnection* conn,
const muduo::StringPiece& message)
{
muduo::net::Buffer buf;
buf.append(message.data(), message.size());
int32_t len = static_cast<int32_t>(message.size());
int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
buf.prepend(&be32, sizeof be32); //将长度放到buffer 的前8个字节
conn->send(&buf);
}
private:
StringMessageCallback messageCallback_;
const static size_t kHeaderLen = sizeof(int32_t);
};
#endif // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
4.server端的实现
使用
set将对于不同的客户端的连接进行封装起来,发送消息的时候直接遍历容器发送
typedef std::set<TcpConnectionPtr> ConnectionList
- 其中回调函数
onStringMessage就是服务端onMessage接受到可读后进行处理,封装消息成为string之后回调的函数,用于处理服务端的逻辑,只需要关注消息发送即可
#include "examples/asio/chat/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include <set>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatServer : noncopyable
{
public:
ChatServer(EventLoop* loop,
const InetAddress& listenAddr)
: server_(loop, listenAddr, "ChatServer"),
codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
{
server_.setConnectionCallback(
std::bind(&ChatServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected())
{
connections_.insert(conn);
}
else
{
connections_.erase(conn);
}
}
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
for (ConnectionList::iterator it = connections_.begin();
it != connections_.end();
++it)
{
codec_.send(get_pointer(*it), message);
}
}
typedef std::set<TcpConnectionPtr> ConnectionList;
TcpServer server_;
LengthHeaderCodec codec_;
ConnectionList connections_;
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 1)
{
EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&loop, serverAddr);
server.start();
loop.loop();
}
else
{
printf("Usage: %s port\n", argv[0]);
}
}
5.client端的实现
其中client端同server端的流程也是大概相似的,还是调用回调函数进行处理
不需要关注于格式问题,能够进行发送肯定是已经封装好了(由codec.h帮助实现)
#include "examples/asio/chat/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/net/TcpClient.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatClient : noncopyable
{
public:
ChatClient(EventLoop* loop, const InetAddress& serverAddr)
: client_(loop, serverAddr, "ChatClient"),
codec_(std::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
{
client_.setConnectionCallback(
std::bind(&ChatClient::onConnection, this, _1));
client_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
client_.enableRetry();
}
void connect()
{
client_.connect();
}
void disconnect()
{
client_.disconnect();
}
void write(const StringPiece& message)
{
MutexLockGuard lock(mutex_);
if (connection_)
{
codec_.send(get_pointer(connection_), message);
}
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
MutexLockGuard lock(mutex_);
if (conn->connected())
{
connection_ = conn;
}
else
{
connection_.reset();
}
}
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
printf("<<< %s\n", message.c_str());
}
TcpClient client_;
LengthHeaderCodec codec_;
MutexLock mutex_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 2)
{
EventLoopThread loopThread;
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress serverAddr(argv[1], port);
ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();
std::string line;
while (std::getline(std::cin, line))
{
client.write(line);
}
client.disconnect();
CurrentThread::sleepUsec(1000*1000); // wait for disconnect, see ace/logging/client.cc
}
else
{
printf("Usage: %s host_ip port\n", argv[0]);
}
}
边栏推荐
- SEO、客户端渲染‘、服务端渲染、搜索引擎的理解
- EN 1504-7 products for protection and repair of concrete structures corrosion prevention of reinforcement - CE certification
- Aof & RDB of J2 redis
- 中信建投启牛学堂开户是安全的吗,启牛是干嘛的
- JS中的 作用域
- 论文精读:YOLOV2——YOLO9000:Better, Faster, Stronger
- Volatile keyword of JVM memory model
- LeetCode每日一练 —— 88. 合并两个有序数组
- 这22个绘图(可视化)方法很重要,值得收藏!
- torch. Usage and comparison of unsqueeze() squeeze() expand() repeat()
猜你喜欢

Software process that testers must know

Pychart loads CONDA to create a pytorch virtual environment and reports an error. It is normal on the CONDA command line

LeetCode每日一练 —— 88. 合并两个有序数组

Still using xshell? You are out. I recommend a more modern terminal connection tool

YOLO V1详解

【PHP】将 SESSION 数据保存到 Redis

Machine learning notes - building a recommendation system (6) six automatic encoders for collaborative filtering

LeetCode每日一练 —— 189. 轮转数组

这22个绘图(可视化)方法很重要,值得收藏!

还在用Xshell?你out了,推荐一个更现代的终端连接工具
随机推荐
Server memory failure prediction can actually do this
YOLO V2详解
Intensive reading of the paper: yolov2 - yolo9000: better, faster, stronger
Fair lock process of reentrantlock learning
Bug 反馈:同步失败
DDL, DQL, DML statements
cuda11.2对应pytorch安装
What is federated graph machine learning? A summary of the latest "federal map machine learning: concepts, techniques, and Applications" at the University of Virginia
IM即时通讯开发如何压缩移动网络下APP的流量消耗
Familiarize you with the "phone book" of cloud network: DNS
基于华为云 IOT 设计智能称重系统 (STM32)【一】
Linux regularly backs up the database and deletes the data n days ago
Machine learning notes - building a recommendation system (6) six automatic encoders for collaborative filtering
EN 1504-7 products for protection and repair of concrete structures corrosion prevention of reinforcement - CE certification
Win11 U盘驱动异常怎么调整为正常?
什么是联邦图机器学习?弗吉尼亚大学最新《联邦图机器学习:概念、技术和应用》综述
J3: redis master-slave replication
Software process that testers must know
线性代数第3章向量
[PHP] MySQL native PHP operation - Tianlong eight steps