当前位置:网站首页>[Muduo] EventLoop event cycle
[Muduo] EventLoop event cycle
2022-07-25 23:31:00 【CID( ͡ _ ͡ °)】
Get thread ID
CurrenThread.h
#pragma once
#include <unistd.h>
#include <sys/syscall.h>
namespace CurrenThread
{
extern __thread int t_cacheTid;
void cacheTid();
inline int tid()
{
// If it is 0 This thread id Not yet obtained
if(__builtin_expect( t_cacheTid== 0, 0) )
{
cacheTid(); // Get the current thread id
}
return t_cacheTid;
}
}CurrenThread.cc
#include "CurrenThread.h"
namespace CurrenThread
{
void cacheTid()
{
if(t_cacheTid == 0)
{
// adopt linux system call , Get the current thread id
t_cacheTid = static_cast<pid_t>(::syscall(SYS_gettid));
}
}
}__thread EventLoop* t_loopInThisHread = nullptr;
Prevent a thread from creating multiple EventLoop,__thread amount to thread_local. If not , This is a global variable , All threads share
There is one in a thread EventLoop, This will become the time when each thread accesses it , There is a copy, that is, there can only be one in a thread EventLoop
eventfd Wake up the thread
Used to inform sleeping EventLoop, There is a newly connected user Channel I'll deal with it for you . establish wakeupfd, use notify Wake up the subReactor Deal with new arrivals Channel(mainReactor Wake up by polling )void EventLoop::loop() There are notes inside
DefaultPoller.cc
#include "Poller.h"
#include "EPollPoller.h"
#include <stdlib.h>
Poller* Poller::newDefaultPoller(EventLoop *loop)
{
// Default event distribution , It's through MUDUO_USE_POLL This variable is used to control
if(::getenv("MUDUO_USE_POLL"))
{
return nullptr; // Generate poll Example
}
else
{
return new EPollPoller(loop); // Generate epoll Example
}
}Channel.cc
// Open the comments of the following two functions
void Channel::update()
{
// adopt Channel Of EventLoop, call poller The corresponding method of , register fd Of Events event
loop_->updateChannel(this);
}
// stay channel Of EventLoop In the container , Delete yourself
void Channel::remove()
{
loop_->removeChannel(this);
}
EventLoop.h
#pragma once
#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrenThread.h"
#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>
class Poller;
class Channel;
// Event loop class , Mainly consists of Poller Channel Two categories:
class EventLoop :noncopyable
{
public:
using Functor = std::function<void()>;
public: // Constructors and destructors
EventLoop();
~EventLoop();
// Turn on the event loop
void loop();
// Exit event loop
void quit();
Timestamp pollReturnTime() const { return pollReturnTime_; }
// Execute on current thread cb
void runInLoop(Functor cb);
// hold cb Put it in the queue , Wake up the corresponding Loop The thread where it is running cb
void queueInLoop(Functor cb);
// Used to wake up the thread
void wakeup();
//EventLoop call Poller Method
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
// Determine whether it is current Loop Whether it is in its own thread
bool isInLoopThread() const { return threadId_ == CurrenThread::tid(); }
private:
// Handle wakeup
void handleRead();
// perform pendingFunctors_ Container callback
void doPendingFunctors();
private:
//Channel list
using ChannelList = std::vector<Channel*>;
// Judge whether the event cycle operates normally : Atomic operation bool value ,CAS Realized
std::atomic_bool looping_;
// Identify exit Loop loop
std::atomic_bool quit_;
// Record the current Loop The thread Id
const pid_t threadId_;
//poller Return to the occurrence event channel The timing of the
Timestamp pollReturnTime_;
//
std::unique_ptr<Poller> poller_;
/**
* The main role , When mainLoop Get a new user's channel,
* Select a by polling algorithm subLoop( That is to say EventLoop), Wake up through this member subLoop Handle Channel
*/
int wakeupFd_;
/**
* wakeupChannel You must wakeupFd_ encapsulated , Because we are Poller There is no direct operation inside FD, What we operate is Channel
*/
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
Channel *currenActiveChannel_;
// Identify the current loop Whether there is a callback operation to be performed
std::atomic_bool callingPendingFunctors_;
// Storage loop All callback operations that need to be performed
std::vector<Functor> pendingFunctors_;
// The mutex , Used to protect the upper vector Thread safe operation of container
std::mutex mutex_;
};EventLoop.cc
#include "Poller.h"
#include "EventLoop.h"
#include "Logger.h"
#include "Channel.h"
#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
// Prevent a thread from creating multiple EventLoop
__thread EventLoop* t_loopInThisThread = nullptr;
// Definition IO Timeout of multiplex interface
const int kPollTimeMs = 10000;
// establish wakeupfd, use notify Wake up the subReactor Deal with new arrivals Channel(mainReactor Wake up by polling )
int createEventfd()
{
// Used to inform sleeping EventLoop, There is a newly connected user Channel I'll deal with it for you
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
// Not created successfully , There is no way to inform , So quit
LOG_FATAL("eventfd err = %d\n", errno);
}
return evtfd;
}
EventLoop::EventLoop()
:looping_(false)
,quit_(false)
,callingPendingFunctors_(false)
,threadId_(CurrenThread::tid())
,poller_(Poller::newDefaultPoller(this)) // initialization Poller inside e poll_create1
,wakeupFd_(createEventfd())
,wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if(t_loopInThisThread)
{
LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}
// Set up wakeupfd The type of event and the callback operation after the event
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// every last EventLoop Listen to me wakeupChannel Of EPOLLIN event
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{
// hold fd Set as not interested in all events
wakeupChannel_->disableAll();
// Put this Channel Delete the
wakeupChannel_->remove();
// close fd You need to delete the corresponding event and Channel
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
// The main task is to schedule the underlying Poller Start the event distributor to listen for events
void EventLoop::loop()
{
// Identify start event cycle
looping_ = true;
// The logo is not over yet
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_)
{
activeChannels_.clear();
// Turn on epoll_wait Monitoring events fd, One is client Of fd, One is wakeupfd That is to say mainLoop Used to wake up fd
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel* channel: activeChannels_)
{
//Poller Monitor what channel Something happened , Then report it to EventLoop, notice Channel Deal with the corresponding events
channel->handleEvent(pollReturnTime_);
}
// Execute the current EventLoop Callback operation to be handled by event loop
/**
* IO Threads mainLoop conscientious accept Connections for new users , Get fd after , issue EvenLoop,EventLoop Throw it to Channel Handle packing
* mainLoop A callback function is registered in advance cb, need EventLoop To execute
* annotation :
* That is, we create it in advance wakeupfd Created this fd, It also belongs to a kind of fd,epoll_wait, In the blocking state ,
* We cannot execute mainLoop Of Newly distributed fd Connect
* So pass eventfd Created fd, Come similar epoll_wait received client Events like , stay wakeup Write a data in the function
* then Channel It's readable , Writing is similar to writing a data to the client , And then I was epoll_wait It's listening to
* So it's equivalent to the thread being awakened
* Finally, use the following function to put channel Sign up to EventLoop Upper ChannelList
*/
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping.\n" , this);
looping_ = false;
}
// Exit event loop 1.loop Invoke in your own thread quit. 2. In Africa loop In the thread of , call loop Of quit
void EventLoop::quit()
{
quit_ = true;
//
/**
* Scenario as follows : If client There are fewer new connections , Too many threads , Some threads need to be shut down
* So it could be mainLoop You want to call when sending quit, So first wake up the thread you want to end
*/
if(!isInLoopThread())
{
wakeup();
}
}
// Execute on current thread cb
void EventLoop::runInLoop(Functor cb)
{
if(isInLoopThread()) // In the current loop In the thread , perform cb
{
cb();
}
else // In non current loop Execute in thread cb, You need to wake up loop The thread , perform cb
{
queueInLoop(cb);
}
}
// hold cb Put it in the queue , Wake up the corresponding Loop The thread where it is running cb
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(mutex_); // Lock it
pendingFunctors_.emplace_back(cb);
}
// Wake up corresponding , Need to perform the above callback operation loop The thread of
//||callingPendingFunctors_ It means : At present loop Executing callback , however loop There is a new callback
if(!isInLoopThread() || callingPendingFunctors_)
{
wakeup(); // Wake up the thread
}
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8", n);
}
}
// Wake up the subLoop, That is to say wakefd Write a data , wakeupChannel A read event occurs , At present loop The thread will wake up
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::wakeup() write %lu bytes instead of 8 \n", n);
}
}
//EventLoop call Poller Method
void EventLoop::updateChannel(Channel* channel)
{
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel* channel)
{
poller_->removeChannel(channel);
}
bool EventLoop::hasChannel(Channel* channel){
poller_->hasChannel(channel);
}
// perform pendingFunctors_ Container callback
void EventLoop::doPendingFunctors(){
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor &functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}MUDUO Threads in the library do not communicate through consumers -- Producer model to perform the wake-up , But through evenfd To wake up a thread , Then distribute it to the thread for processing
边栏推荐
- chown: changing ownership of ‘/var/lib/mysql/‘: Operation not permitted
- Classes and objects (3)
- 在应用中使用 Jetpack 库
- 新手哪个券商开户最好 开户最安全
- 网格参数化Least Squares Conformal Maps实现(3D网格映射到2D平面)
- 2022牛客多校第二场
- Npm+ module loading mechanism
- Strategy mode_
- What is a physical firewall? What's the effect?
- The new UI people help task help PHP source code with a value of 1500 / reward task Tiktok Kwai headline like source code / with three-level distribution can be packaged applet
猜你喜欢

Discuz atmosphere game style template / imitation lol hero League game DZ game template GBK

Npm+ module loading mechanism

Query commodity cases (operate data with array addition method) / key points
![[Database Foundation] summary of MySQL Foundation](/img/89/e22c232b0183eaae35a9f45a40ff36.jpg)
[Database Foundation] summary of MySQL Foundation
![[code case] blog page design (with complete source code)](/img/9e/0e7cab956515b9cc75a7567eb477d2.png)
[code case] blog page design (with complete source code)

Discuz magazine / news report template (jeavi_line) utf8 GBK / DZ template download

npm+模块加载机制

Dynamic memory management

数组中重复的数字

Apple CMS V10 template /mxone Pro adaptive film and television website template
随机推荐
@Autowired注解 required属性
行云管家V6.5.1/2/3系列版本发布:数据库OpenAPI能力持续强化
Apple CMS V10 template /mxone Pro adaptive film and television website template
动态内存管理
物理防火墙是什么?有什么作用?
Serialize data type
Idea sets get and set templates to solve the naming problem of boolean type fields
Network Security Learning notes-1 file upload
利用用户脚本优化 Yandere/Konachan 站点浏览体验
Practical skills of easyexcel
Serialize addition, deletion, modification and query
[testing technology automated testing pytest] basic summary of pytest
TS class
Node基础
Recommended system - an embedded learning framework for numerical features in CTR prediction
Implementation of date class
Summary of common PHP functions
The difference between MySQL clustered index and non clustered index
模拟实现string类常用接口
Dynamic memory management
