当前位置:网站首页>muduo源码剖析——以三个切片浅析muduo库代码设计的严谨性、高效性与灵活性
muduo源码剖析——以三个切片浅析muduo库代码设计的严谨性、高效性与灵活性
2022-06-10 17:47:00 【高自强的博客】
0 前言
陈硕大佬的muduo网络库的源码我已经看了好久了,奈何本人实力有限,每每看到其代码设计的精巧之处只能内心称赞,无法用言语表达出来。实在令人汗颜。最近在看到网络设计部分时有了一些体会,结合自己之前在网络编程方面的积累,特对代码设计中的一些精巧之处做一些总结。
就muduo在多线程并发服务器设计而言,除了其高效的并发服务架构之外,其在代码设计方面的高效性和灵活性可以从下面三个切片得以体现。
在这之前,首先说明muduo的并发服务器架构为multi-reactors+thread pool架构,其架构图如下:
简单而言,mainReactor负责处理新连接IO的创建和管理,subReactor+thread pool负责已连接IO的读写事件。其中,为了降低subReactor的压力、提高IO读写效率,可以添加多个subReactor。
1 切片一:EventLoop线程绑定与跨线程调用
muduo_net设计中,每一个EventLoop对象对应一个reactor,并且每个EventLoop对象绑定一个线程,EventLoop对象中的loop()在内的多数函数只能由该对象所属的线程调用,即EventLoop对象有严格的线程所属特性。某些函数可以由外部线程调用,可以起到激活睡眠或阻塞的EventLoop当前线程的作用。
首先看一下EventLoop类的成员变量(部分):
typedef std::vector<Channel*> ChannelList;
bool looping_; /* atomic */
bool quit_; /* atomic */
bool eventHandling_; /* atomic */
bool callingPendingFunctors_; /* atomic */
const pid_t threadId_; // 当前对象所属线程ID
Timestamp pollReturnTime_;
boost::scoped_ptr<Poller> poller_;
boost::scoped_ptr<TimerQueue> timerQueue_;
int wakeupFd_; // 用于保存eventfd创建的file descriptor--进程间通信
// unlike in TimerQueue, which is an internal class,
// we don't expose Channel to client.
// eventfd的wakeupFd_对应的通道 该通道将会纳入poller_来管理
boost::scoped_ptr<Channel> wakeupChannel_;// EventLoop对象负责wakeupChannel_对象的创建/生命周期
// Poller返回的活动通道
ChannelList activeChannels_;// 这些channel的生存期不由EventLoop管理和负责-->由TcpConnection和TimerQueue管理
Channel* currentActiveChannel_; // 当前正在处理的活动通道
MutexLock mutex_;
std::vector<Functor> pendingFunctors_; // @BuardedBy mutex_
具体体现如下。
1.1 判断当前线程是否已经拥有所属EventLoop对象
主要通过下面两条实现。
1 使用线程局部变量记录当前线程是否已经拥有所属EventLoop对象
// 当前线程EventLoop对象指针
// 线程局部存储
__thread EventLoop* t_loopInThisThread = 0;// 指向EventLoop对象
2 在创建EventLoop对象前进行判断
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),// 创建一个eventfd
wakeupChannel_(new Channel(this, wakeupFd_)),// 创建一个通道 并将wawkeupFd_传进Channel
currentActiveChannel_(NULL)
{
LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
/* 如果当前线程已经创建了EventLoop对象,终止(LOG_FATAL) */
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;// 指向EventLoop对象
}
// 绑定eventfd的回调处理函数handleRead()
wakeupChannel_->setReadCallback(
boost::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
// 绑定eventfd对应事件--EPOLLIN可读事件
wakeupChannel_->enableReading();
}
1.2 执行函数前判断当前线程是否为EventLoop对象所属线程
通过assertInLoopThread()和isInLoopThread()函数进行判断。
void assertInLoopThread()
{
if (!isInLoopThread())
{
abortNotInLoopThread();
}
}
// ...
bool isInLoopThread() const {
return threadId_ == CurrentThread::tid(); }
1.3 部分函数可以跨线程调用以唤醒阻塞的EventLoop所属线程
以quit()函数为例,当要停止loop()循环时,此时EventLoop对象所属线程可能正处于poll的阻塞中,因此此时需要借助外部进程调用该quit()同时唤醒处于阻塞中的EventLoop对象所属线程,并退出poll循环。
// 该函数可以跨线程调用
void EventLoop::quit()
{
quit_ = true;
if (!isInLoopThread())
{
wakeup();
}
}
2 切片二:IO线程与计算线程的灵活调度
为了充分了利用CPU,在IO线程空间时,可以为loop()线程分配一些计算任务。因此,muduo_net既可以完成IO处理又能进行计算任务。
具体如下。
2.1 poll+handleEvent
该部分主要负责IO事件的监听、响应和处理。
// 事件循环,该函数不能跨线程调用
// 只能在创建该对象的线程中调用
void EventLoop::loop()
{
assert(!looping_);
// 断言当前处于创建该对象的线程中
assertInLoopThread();
looping_ = true;
quit_ = false;
LOG_TRACE << "EventLoop " << this << " start looping";
//::poll(NULL, 0, 5*1000);
while (!quit_)
{
activeChannels_.clear();
/* IO事件监听、响应和处理 */
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
//++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
/* 处理计算任务 */
doPendingFunctors();// 让IO线程不繁忙的时候也能执行一些计算任务-->避免IO线程一直处于阻塞状态
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
2.2 queueInLoop+doPendingFunctors
该部分负责计算任务的添加和处理。其中添加计算任务queueInLoop可以由外部线程和EventLoop所属线程添加,执行计算任务doPendingFunctors只能由EventLoop线程处理。
// 将cb添加到队列中
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);// 添加外部任务到pendingFunctors_数组
}
// 调用queueInLoop的线程不是IO线程需要唤醒(唤醒EventLoop对应的线程 以便该线程及时执行cb函数)
// 或者调用queueInLoop的线程是EventLoop对应的IO线程,并且此时该线程正在调用pending functor(正在执行计算任务),需要唤醒
// 只有IO线程的事件回调中调用queueInLoop才不需要唤醒
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
// 添加互斥锁
// 互斥访问vector--此时pendingFunctors_位于临界区 不能被其他线程访问
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);// swap:交换两容器的内容 pendingFunctors_变为空
}
// 为什么functors的执行没有放在临界区?
// 1 减小临界区长度 减少其他进程queueInloop()的阻塞时间
// 2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();// 执行函数
}
callingPendingFunctors_ = false;
}
3 切片三:线程安全与执行效率
下面通过doPendingFunctors()函数观察muduo在实现线程安全方与保证执行效率之间权衡的设计。
该函数主要包含两个部分:获取待处理计算任务序列、依次执行计算任务。
其中pendingFunctors_存放计算任务的数组作为外部线程和EventLoop线程都能够处理(插入数据)的共享变量,EventLoop线程在获取计算任务序列时需要对pendingFunctors_进行加锁处理。
这其中涉及到锁的作用范围即临界区的作用范围的问题。muduo此处只将对pendingFunctors_的操作置于临界区,而将执行计算任务的操作置于临界区之外。原因主要有两个:
1 减小临界区长度 减少其他进程queueInloop()的阻塞时间;
2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件。
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
/* 1 获取待处理计算任务序列 */
// 添加互斥锁
// 互斥访问vector--此时pendingFunctors_位于临界区 不能被其他线程访问(其他线程不能操作pendingFunctors_数组)
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);// swap:交换两容器的内容 pendingFunctors_变为空
}
/* 依次执行计算任务 */
// 为什么functors的执行没有放在临界区?
// 1 减小临界区长度 减少其他进程queueInloop()的阻塞时间
// 2 loop()线程发生IO事件时 可以及时中断该doPendingFunctors()函数中的计算任务,优先处理IO事件
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();// 执行函数
}
callingPendingFunctors_ = false;
}
4 参考材料
边栏推荐
- Abbexa 8-OHdG CLIA 试剂盒解决方案
- LeetCode 321. 拼接最大數***
- Unity踩坑记录:如果继承MonoBehaviour,类的构造函数可能会被Unity调用多次,不要在构造函数做初始化工作
- yml文件配置参数定义字典和列表
- 红色垂直左侧边菜单导航代码
- Container containing the most water
- (CVPR 2020) RandLA-Net: Efficient Semantic Segmentation of Large-Scale Point Clouds
- Adding rendererdata of URP with script
- 改变世界的开发者丨玩转“俄罗斯方块”的瑶光少年
- CodeCraft-22 and Codeforces Round #795 (Div. 2)
猜你喜欢

Canvas fire burning H5 animation JS special effects

【ceph】ceph配置源码分析|common/config.*

红色垂直左侧边菜单导航代码

【FAQ】运动健康服务REST API接口使用过程中常见问题和解决方法总结

Step on the pit. The BigDecimal was improperly used, resulting in P0 accident!

Set up an online help center to easily help customers solve problems

(CVPR 2020) RandLA-Net: Efficient Semantic Segmentation of Large-Scale Point Clouds

Red vertical left side menu navigation code

After the qtmqtt source code compilation is set to keepalive, the Ping package timeout error does not return a problem repair (qmqtt:: mqttnopingresponse, qmqtt:: clientprivate:: onpingtimeo)

c语言---8 初识常见关键字
随机推荐
AOV网拓扑排序
Abbexa 8-OHdG CLIA 试剂盒解决方案
作为程序员,对于底层原理真的有那么重要吗?
【FAQ】运动健康服务REST API接口使用过程中常见问题和解决方法总结
AFL fuzzy multithreading
nfs网络挂载制作服务器镜像
C语言在底层如何对double和float压栈
换根呀呀啊呀
CodeCraft-22 and Codeforces Round #795 (Div. 2)
[FAQ] summary of common problems and solutions during the use of rest API interface of sports health service
LeetCode 255. 验证前序遍历序列二叉搜索树*
c语言---5 初识字符串、转义字符、注释
Generate XML based on annotations and reflection
图像搜索是什么
yml文件配置参数定义字典和列表
afl-fuzz多线程
c语言---10 初识结构体
AI 加持实时互动|ZegoAvatar 面部表情随动技术解析
Adding rendererdata of URP with script
JS special effect of canvas divergent particle H5 animation