当前位置:网站首页>学习Muduo中ChatRoom实现的各种细节和思考
学习Muduo中ChatRoom实现的各种细节和思考
2022-07-26 18:43:00 【炸毛疯兔】
Muduo中ChatRoom
1.长连接中的分包解决
- 固定消息长度, 比如说固定消息长度是16字节
- 使用特殊的字符作为分割符号,比如说 HTTP协议中的\r\n作为头部的分割符
- 每个消息头部加上长度字段,这在很多场景都用到过,比如之前的
LEVELDB- 消息本身的格式进行分包
这个聊天室的代码使用了加上长度字段作为消息的分割
2.消息格式
消息本身就是字符串,前4字节的头部存放字符串的长度。
消息之间没有分割符,也没有\0
3.使用间接层帮助收发消息
消息从
onMessage函数中读取出来,但是我们并不希望用户还要处理这么麻烦的一些格式等因此抽象了一层收发数据(封装成为string类型)
构造函数传入回调函数,调用server, or. client中的对于消息的处理的函数
#ifndef MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
#define MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
#include "muduo/base/Logging.h"
#include "muduo/net/Buffer.h"
#include "muduo/net/Endian.h"
#include "muduo/net/TcpConnection.h"
class LengthHeaderCodec : muduo::noncopyable
{
public:
//回调函数,处理完之后
typedef std::function<void (const muduo::net::TcpConnectionPtr&,
const muduo::string& message,
muduo::Timestamp)> StringMessageCallback;
explicit LengthHeaderCodec(const StringMessageCallback& cb)
: messageCallback_(cb)
{
}
//server或者client收发数据中最先调用的函数,进行原始buffer的处理
//封装成为string后调用回调函数
void onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp receiveTime)
{
while (buf->readableBytes() >= kHeaderLen) // kHeaderLen == 4
{
// FIXME: use Buffer::peekInt32()
const void* data = buf->peek();
int32_t be32 = *static_cast<const int32_t*>(data); // SIGBUS
const int32_t len = muduo::net::sockets::networkToHost32(be32);
if (len > 65536 || len < 0)
{
LOG_ERROR << "Invalid length " << len;
conn->shutdown(); // FIXME: disable reading
break;
}
else if (buf->readableBytes() >= len + kHeaderLen)
{
buf->retrieve(kHeaderLen); //readIdx跳过消息头部的4个字节
muduo::string message(buf->peek(), len); //封装提取到的消息
messageCallback_(conn, message, receiveTime); //回调函数,将封装好的string发送过去
buf->retrieve(len); //跳过buffer中的消息,表示这个已经读取完了
}
else
{
break;
}
}
}
// FIXME: TcpConnectionPtr
void send(muduo::net::TcpConnection* conn,
const muduo::StringPiece& message)
{
muduo::net::Buffer buf;
buf.append(message.data(), message.size());
int32_t len = static_cast<int32_t>(message.size());
int32_t be32 = muduo::net::sockets::hostToNetwork32(len);
buf.prepend(&be32, sizeof be32); //将长度放到buffer 的前8个字节
conn->send(&buf);
}
private:
StringMessageCallback messageCallback_;
const static size_t kHeaderLen = sizeof(int32_t);
};
#endif // MUDUO_EXAMPLES_ASIO_CHAT_CODEC_H
4.server端的实现
使用
set将对于不同的客户端的连接进行封装起来,发送消息的时候直接遍历容器发送
typedef std::set<TcpConnectionPtr> ConnectionList
- 其中回调函数
onStringMessage就是服务端onMessage接受到可读后进行处理,封装消息成为string之后回调的函数,用于处理服务端的逻辑,只需要关注消息发送即可
#include "examples/asio/chat/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include <set>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatServer : noncopyable
{
public:
ChatServer(EventLoop* loop,
const InetAddress& listenAddr)
: server_(loop, listenAddr, "ChatServer"),
codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
{
server_.setConnectionCallback(
std::bind(&ChatServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
}
void start()
{
server_.start();
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->peerAddress().toIpPort() << " -> "
<< conn->localAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
if (conn->connected())
{
connections_.insert(conn);
}
else
{
connections_.erase(conn);
}
}
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
for (ConnectionList::iterator it = connections_.begin();
it != connections_.end();
++it)
{
codec_.send(get_pointer(*it), message);
}
}
typedef std::set<TcpConnectionPtr> ConnectionList;
TcpServer server_;
LengthHeaderCodec codec_;
ConnectionList connections_;
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 1)
{
EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
InetAddress serverAddr(port);
ChatServer server(&loop, serverAddr);
server.start();
loop.loop();
}
else
{
printf("Usage: %s port\n", argv[0]);
}
}
5.client端的实现
其中client端同server端的流程也是大概相似的,还是调用回调函数进行处理
不需要关注于格式问题,能够进行发送肯定是已经封装好了(由codec.h帮助实现)
#include "examples/asio/chat/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/net/TcpClient.h"
#include <iostream>
#include <stdio.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
class ChatClient : noncopyable
{
public:
ChatClient(EventLoop* loop, const InetAddress& serverAddr)
: client_(loop, serverAddr, "ChatClient"),
codec_(std::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
{
client_.setConnectionCallback(
std::bind(&ChatClient::onConnection, this, _1));
client_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
client_.enableRetry();
}
void connect()
{
client_.connect();
}
void disconnect()
{
client_.disconnect();
}
void write(const StringPiece& message)
{
MutexLockGuard lock(mutex_);
if (connection_)
{
codec_.send(get_pointer(connection_), message);
}
}
private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");
MutexLockGuard lock(mutex_);
if (conn->connected())
{
connection_ = conn;
}
else
{
connection_.reset();
}
}
void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
printf("<<< %s\n", message.c_str());
}
TcpClient client_;
LengthHeaderCodec codec_;
MutexLock mutex_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 2)
{
EventLoopThread loopThread;
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress serverAddr(argv[1], port);
ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();
std::string line;
while (std::getline(std::cin, line))
{
client.write(line);
}
client.disconnect();
CurrentThread::sleepUsec(1000*1000); // wait for disconnect, see ace/logging/client.cc
}
else
{
printf("Usage: %s host_ip port\n", argv[0]);
}
}
边栏推荐
- Wechat applet plug-in -- wxml to canvas (generate pictures)
- 软件测试——自动化测试框架有哪些?
- Difficult performance problems solved in those years -- ext4 defragmentation
- Deeply analyze the execution process of worker threads in the thread pool through the source code
- 测试面试题集-UI自动化测试
- 基于华为云 IOT 设计智能称重系统 (STM32)【二】结尾有资料
- Principle analysis and source code interpretation of service discovery
- 【实习经验】日期校验
- [PHP] save session data to redis
- LeetCode每日一练 —— 26. 删除有序数组中的重复项
猜你喜欢

Talk about how to use redis to realize distributed locks?

Leetcode-138-copy linked list with random pointer

中天钢铁在 GPS、 AIS 调度中使用 TDengine

带你熟悉云网络的“电话簿”:DNS

2022/07/26 learning notes (day16) linked list and stack

LeetCode每日一练 —— 26. 删除有序数组中的重复项

Don't casually pass the request to the asynchronous thread. You can't handle it. You have to use the startasync method

Redis6

LeetCode每日一练 —— 189. 轮转数组

Conda+pytorch environment tutorial
随机推荐
SEO, client rendering ', server rendering, search engine understanding
NLP learning path
Three paradigms of database design
Deeply analyze the execution process of worker threads in the thread pool through the source code
【OBS】Dropped Frames And General Connection Issues
上半年住户存款增加10.33万亿元,平均每天约571亿存款涌向银行
EN 1504-7 products for protection and repair of concrete structures corrosion prevention of reinforcement - CE certification
手机app测试用例怎么写?手机app测试点有哪些?
What is federated graph machine learning? A summary of the latest "federal map machine learning: concepts, techniques, and Applications" at the University of Virginia
The authentication type 10 is not supported
Linear algebra Chapter 3 vector
Redis6
Do you know the difference between safety test, functional test and penetration test?
【实习经验】异常处理与访问url结果响应数据处理
CIO guide to business change
博客维护记录之图片预览嵌入位置问题
[PHP] common header definitions
LeetCode每日一练 —— 26. 删除有序数组中的重复项
Talk about how to use redis to realize distributed locks?
PyQt5快速开发与实战 3.6 打包资源文件