Analysis of muduo source code: the rigor, efficiency and flexibility of muduo's code design, seen through three slices

2022-06-10 18:57:00 High self-improvement blog

0 Preface

I have been reading the source code of Chen Shuo's muduo network library for a long time. My ability is limited, though, so each time I noticed how subtle its code design is I could only admire it without being able to put it into words, which is rather embarrassing. Recently, while reading the network design part, I gained some insight and, drawing on what I had already learned about network programming, summarized a few of those subtleties.
As far as muduo's multithreaded concurrent server design is concerned, besides its efficient concurrent service architecture, the rigor, efficiency and flexibility of its code design show up in the following three slices.
Before that, note that muduo's concurrent server architecture is a multi-reactor + thread pool design, which works as follows.
Briefly, the mainReactor is responsible for accepting and managing new connections, while the subReactors + thread pool handle read and write events on established connections. To reduce the load on a single subReactor and improve IO throughput, multiple subReactors can be added.
[Figure: muduo multi-reactor + thread pool architecture]
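
To make the division of labour concrete, here is a minimal sketch of how a main reactor might hand accepted connections to sub-reactors. The round-robin policy and the `RoundRobinDispatcher` name are assumptions made for illustration; the text above only says that multiple subReactors can be added, not how connections are distributed among them.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical dispatcher (not a muduo class): the main reactor asks it which
// sub-reactor should own each newly accepted connection.
class RoundRobinDispatcher {
 public:
  explicit RoundRobinDispatcher(std::size_t subReactorCount)
      : count_(subReactorCount), next_(0) {
    assert(count_ > 0);  // this sketch requires at least one sub-reactor
  }

  // Returns the index of the sub-reactor chosen for the next connection.
  std::size_t pick() {
    std::size_t chosen = next_;
    next_ = (next_ + 1) % count_;
    return chosen;
  }

 private:
  std::size_t count_;  // number of sub-reactors
  std::size_t next_;   // index handed out on the next call
};
```

With three sub-reactors, successive calls to `pick()` cycle through the indices 0, 1, 2 and then start again at 0, so each sub-reactor receives roughly the same number of connections.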

1 Slice one: EventLoop thread binding and cross-thread invocation

In the design of muduo_net, every EventLoop object corresponds to one reactor, and every EventLoop object is bound to one thread. Most of its member functions, including loop(), may only be called by the thread that owns the object; in other words, an EventLoop object has a strict thread-ownership property. A few functions may be called by external threads, and they serve to wake up the EventLoop's thread when it is sleeping or blocked.
Let's first look at (some of) the member variables of the EventLoop class:

  typedef std::vector<Channel*> ChannelList;

  bool looping_; /* atomic */
  bool quit_; /* atomic */
  bool eventHandling_; /* atomic */
  bool callingPendingFunctors_; /* atomic */
  const pid_t threadId_;		// ID of the thread that owns this object
  Timestamp pollReturnTime_;
  boost::scoped_ptr<Poller> poller_;
  boost::scoped_ptr<TimerQueue> timerQueue_;
  int wakeupFd_;				// file descriptor created by eventfd, used for cross-thread wake-up
  // unlike in TimerQueue, which is an internal class,
  // we don't expose Channel to client.
  // Channel for wakeupFd_ (the eventfd); it is registered with poller_
  boost::scoped_ptr<Channel> wakeupChannel_;// EventLoop creates wakeupChannel_ and manages its lifetime
  // active channels returned by Poller
  ChannelList activeChannels_;// the lifetime of these channels is not managed by EventLoop but by their owners, such as TcpConnection and TimerQueue
  Channel* currentActiveChannel_;	// active channel currently being processed
  MutexLock mutex_;
  std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_

The thread-ownership property shows up in the following ways.

1.1 Checking whether the current thread already owns an EventLoop object

This is done in two steps.
1 A thread-local variable records whether the current thread already owns an EventLoop object

// Pointer to the EventLoop object of the current thread
// Thread-local storage
__thread EventLoop* t_loopInThisThread = 0;// points to the EventLoop object

2 The check is made when an EventLoop object is constructed

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),// create an eventfd
    wakeupChannel_(new Channel(this, wakeupFd_)),// create a channel and pass wakeupFd_ to it
    currentActiveChannel_(NULL)
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;

  /* If the current thread has already created an EventLoop object, terminate (LOG_FATAL) */
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;// point to this EventLoop object
  }

  // set handleRead() as the read callback of the eventfd channel
  wakeupChannel_->setReadCallback(
      boost::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  // watch the eventfd for readable events (EPOLLIN)
  wakeupChannel_->enableReading();
}
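
The "one loop per thread" check above can be tried out in isolation. The following is a minimal sketch, not muduo code: `MiniLoop` is a hypothetical stand-in for EventLoop, standard `thread_local` replaces `__thread`, and the constructor throws instead of calling LOG_FATAL so that the rejection can be observed without aborting the process. Clearing the pointer in the destructor is a choice made for this sketch.

```cpp
#include <cassert>
#include <stdexcept>
#include <thread>

// Hypothetical stand-in for EventLoop; only the per-thread bookkeeping is modeled.
class MiniLoop {
 public:
  MiniLoop() {
    // muduo logs a fatal error here; this sketch throws so the check is testable.
    if (t_loopInThisThread != nullptr) {
      throw std::logic_error("another MiniLoop exists in this thread");
    }
    t_loopInThisThread = this;
  }
  ~MiniLoop() { t_loopInThisThread = nullptr; }
  MiniLoop(const MiniLoop&) = delete;
  MiniLoop& operator=(const MiniLoop&) = delete;

  // Returns the loop owned by the calling thread, or nullptr if there is none.
  static MiniLoop* current() { return t_loopInThisThread; }

 private:
  static thread_local MiniLoop* t_loopInThisThread;  // one slot per thread
};

thread_local MiniLoop* MiniLoop::t_loopInThisThread = nullptr;

// Returns true if a second MiniLoop on the same thread is rejected.
bool secondLoopRejected() {
  MiniLoop first;
  try {
    MiniLoop second;  // same thread as 'first', so the constructor throws
  } catch (const std::logic_error&) {
    return true;
  }
  return false;
}

// Returns true if another thread can own its own MiniLoop at the same time.
bool otherThreadGetsOwnLoop() {
  MiniLoop mine;
  bool ok = false;
  std::thread t([&ok] {
    MiniLoop theirs;  // separate thread-local slot, so no conflict with 'mine'
    ok = (MiniLoop::current() == &theirs);
  });
  t.join();
  return ok && MiniLoop::current() == &mine;
}
```

Because the pointer lives in thread-local storage, each thread sees its own copy, which is why two loops in two threads coexist while two loops in one thread do not.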

1.2 Before executing a function, check whether the current thread is the one that owns the EventLoop object

This is done with the assertInLoopThread() and isInLoopThread() functions.

  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }
// ...
  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
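
The ownership test amounts to comparing a stored thread ID with the caller's thread ID. Here is a minimal portable sketch of that idea; the class name `OwnedByThread` is hypothetical, and `std::this_thread::get_id()` is used in place of muduo's `CurrentThread::tid()`.

```cpp
#include <cassert>
#include <thread>

// Hypothetical class: remembers the thread that constructed it,
// in the same way EventLoop remembers threadId_.
class OwnedByThread {
 public:
  OwnedByThread() : ownerId_(std::this_thread::get_id()) {}

  // True only when called from the thread that constructed the object.
  bool isInOwnerThread() const {
    return ownerId_ == std::this_thread::get_id();
  }

 private:
  const std::thread::id ownerId_;
};

// Runs the ownership check from a freshly started thread and returns its result.
bool checkFromOtherThread(const OwnedByThread& obj) {
  bool result = true;
  std::thread t([&result, &obj] { result = obj.isInOwnerThread(); });
  t.join();
  return result;
}
```

Called from the constructing thread, `isInOwnerThread()` returns true; called from any other thread it returns false, which is the situation in which assertInLoopThread() would abort.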

1.3 Some functions can be called across threads to wake up a blocked EventLoop thread

Take quit() as an example. When you want to stop loop(), the thread that owns the EventLoop object may be blocked in poll. quit() therefore has to be callable from an external thread: besides setting quit_, it wakes up the blocked EventLoop thread so that it returns from poll and leaves the loop.

// This function can be called across threads
void EventLoop::quit()
{
  quit_ = true;
  if (!isInLoopThread())
  {
    wakeup();
  }
}
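
The wake-up mechanism behind quit() can be sketched with the raw system calls. This is a Linux-only illustration under stated assumptions: the function name `wakeBlockedPoller` and the 10-second timeout are made up for the example, and plain `poll()` stands in for muduo's Poller and Channel machinery.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/eventfd.h>
#include <thread>
#include <unistd.h>

// One thread blocks in poll() on an eventfd; the calling thread wakes it by
// writing to the descriptor. Returns true if the poller was woken by the
// write rather than by the timeout.
bool wakeBlockedPoller() {
  int wakeupFd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (wakeupFd < 0) return false;

  bool woken = false;
  std::thread poller([&woken, wakeupFd] {
    struct pollfd pfd;
    pfd.fd = wakeupFd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    int n = ::poll(&pfd, 1, 10000);  // wait up to 10 s for the wake-up
    if (n == 1 && (pfd.revents & POLLIN)) {
      uint64_t counter = 0;
      // Reading resets the eventfd counter, much as a read callback would.
      ssize_t r = ::read(wakeupFd, &counter, sizeof counter);
      woken = (r == static_cast<ssize_t>(sizeof counter));
    }
  });

  // The "external thread" side: writing 8 bytes makes the eventfd readable.
  uint64_t one = 1;
  ssize_t w = ::write(wakeupFd, &one, sizeof one);

  poller.join();
  ::close(wakeupFd);
  return w == static_cast<ssize_t>(sizeof one) && woken;
}
```

The eventfd stays readable until it is read, so the wake-up is not lost even if the write happens before the other thread has entered `poll()`.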

2 Slice two: flexible scheduling of IO threads and computing threads

To make full use of the CPU, the loop() thread can be given some computing tasks while the IO thread is idle. muduo_net can therefore handle both IO and computing tasks.
The details are as follows.

2.1 poll+handleEvent

This part is responsible for monitoring, responding to and handling IO events.

// The event loop; this function cannot be called across threads
// It can only be called in the thread that created the object
void EventLoop::loop()
{
  assert(!looping_);
  // assert that we are in the thread that created the object
  assertInLoopThread();
  looping_ = true;
  quit_ = false;
  LOG_TRACE << "EventLoop " << this << " start looping";

  //::poll(NULL, 0, 5*1000);
  while (!quit_)
  {
    activeChannels_.clear();
    /* IO event monitoring, response and handling */
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    //++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;

    /* handle computing tasks */
    doPendingFunctors();// lets the IO thread run some computing tasks when it is not busy, instead of staying blocked all the time
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

2.2 queueInLoop+doPendingFunctors

This part is responsible for adding and executing computing tasks. Tasks can be added through queueInLoop() by external threads as well as by the thread that owns the EventLoop; they are executed by doPendingFunctors(), which runs only in the EventLoop thread.

// Add cb to the queue
void EventLoop::queueInLoop(const Functor& cb)
{
  {
    MutexLockGuard lock(mutex_);
    pendingFunctors_.push_back(cb);// append the task to the pendingFunctors_ array
  }

  // If the thread calling queueInLoop is not the IO thread, a wake-up is needed (wake the EventLoop's thread so that it can run cb promptly)
  // If the caller is the EventLoop's own IO thread but it is currently running pending functors, a wake-up is also needed (otherwise the new cb would wait until the next time poll returns)
  // Only when queueInLoop is called from an IO event callback in the IO thread is no wake-up needed
  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}


void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  // Take the mutex
  // Exclusive access to the vector: pendingFunctors_ is in the critical section here and cannot be accessed by other threads
  {
    MutexLockGuard lock(mutex_);
    functors.swap(pendingFunctors_);// swap: exchange the contents of the two containers; pendingFunctors_ becomes empty
  }

  // Why are the functors not executed inside the critical section?
  // 1 A shorter critical section reduces the time other threads are blocked in queueInLoop()
  // 2 Only the swapped-out batch is run, so the loop() thread returns to poll promptly and IO events are not starved by newly queued tasks
  // A functor may also call queueInLoop() itself; with the lock already released this cannot deadlock
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();// execute the task
  }
  callingPendingFunctors_ = false;
}
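
The hand-off between an external thread and the loop thread can be sketched without any sockets. In the minimal example below, a condition variable stands in for muduo's eventfd + poll wake-up (a deliberate substitution to keep the sketch portable), and `TaskLoop`, `waitAndRunPending` and `runThreeTasksFromAnotherThread` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical miniature of the pending-task path of an event loop.
class TaskLoop {
 public:
  using Functor = std::function<void()>;

  // Callable from any thread: enqueue under the lock, then wake the owner.
  void queueInLoop(Functor cb) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(cb));
    }
    wakeup_.notify_one();  // plays the role of wakeup()
  }

  // Called by the owning thread: waits for work, then runs one batch.
  // Returns the number of tasks that were run.
  std::size_t waitAndRunPending() {
    std::vector<Functor> batch;
    {
      std::unique_lock<std::mutex> lock(mutex_);
      wakeup_.wait(lock, [this] { return !pending_.empty(); });
      batch.swap(pending_);  // take the whole batch, leave the queue empty
    }
    for (const Functor& f : batch) f();  // executed outside the lock
    return batch.size();
  }

 private:
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::vector<Functor> pending_;  // guarded by mutex_
};

// A producer thread queues three tasks; the calling thread acts as the loop
// thread and executes them. Returns the sum computed by the tasks.
int runThreeTasksFromAnotherThread() {
  TaskLoop loop;
  int sum = 0;  // only touched by the thread that runs the tasks
  std::thread producer([&loop, &sum] {
    for (int i = 1; i <= 3; ++i) {
      loop.queueInLoop([&sum, i] { sum += i; });
    }
  });
  std::size_t done = 0;
  while (done < 3) done += loop.waitAndRunPending();
  producer.join();
  return sum;
}
```

The tasks are created in the producer thread but always run in the thread that calls `waitAndRunPending()`, so `sum` needs no lock of its own. This is the same property that lets muduo move work into the IO thread instead of locking the data that the work touches.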

3 Slice three: thread safety and execution efficiency

Below, doPendingFunctors() is used to look at how muduo balances thread safety against execution efficiency.
The function consists of two parts: fetching the sequence of pending computing tasks, and executing those tasks in order.
pendingFunctors_, the array that holds the computing tasks, is a shared variable that both external threads and the EventLoop thread can modify (by inserting tasks), so the EventLoop thread has to lock it when fetching the task sequence.
This raises the question of the lock's scope, that is, the extent of the critical section. muduo puts only the operation on pendingFunctors_ inside the critical section; executing the tasks happens outside it. There are two main reasons:
1 A shorter critical section reduces the time other threads are blocked in queueInLoop();
2 Only the swapped-out batch is run in each pass, so tasks queued in the meantime wait for the next iteration, and the loop() thread gets back to poll promptly to handle IO events instead of being tied up in doPendingFunctors().
In addition, a task may itself call queueInLoop(); because the lock has already been released by then, this cannot deadlock.

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  /* 1 Fetch the sequence of pending computing tasks */
  // Take the mutex
  // Exclusive access to the vector: pendingFunctors_ is in the critical section here and cannot be accessed by other threads (other threads cannot operate on the pendingFunctors_ array)
  {
    MutexLockGuard lock(mutex_);
    functors.swap(pendingFunctors_);// swap: exchange the contents of the two containers; pendingFunctors_ becomes empty
  }

  /* 2 Execute the computing tasks in order */
  // Why are the functors not executed inside the critical section?
  // 1 A shorter critical section reduces the time other threads are blocked in queueInLoop()
  // 2 Only the swapped-out batch is run, so the loop() thread returns to poll promptly and IO events are not starved by newly queued tasks
  // A functor may also call queueInLoop() itself; with the lock already released this cannot deadlock
  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();// execute the task
  }
  callingPendingFunctors_ = false;
}
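
The effect of "swap under the lock, run outside it" can be observed with a task that queues another task. The sketch below is an illustration under stated assumptions, not muduo code: `PendingQueue` and `batchSizesWithRequeue` are hypothetical names, and a non-recursive `std::mutex` is used as the lock.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical task queue that mirrors the structure of doPendingFunctors().
class PendingQueue {
 public:
  using Functor = std::function<void()>;

  void queue(Functor cb) {
    std::lock_guard<std::mutex> lock(mutex_);
    pending_.push_back(std::move(cb));
  }

  // Swaps the pending tasks out under the lock, then runs them unlocked.
  // Returns how many tasks were in this batch.
  std::size_t runBatch() {
    std::vector<Functor> batch;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      batch.swap(pending_);
    }
    // The lock is released here, so a task may safely call queue() again.
    for (const Functor& f : batch) f();
    return batch.size();
  }

 private:
  std::mutex mutex_;              // non-recursive, so re-locking would deadlock
  std::vector<Functor> pending_;  // guarded by mutex_
};

// Queues one task that itself queues a follow-up task, then reports the size
// of three consecutive batches.
std::vector<std::size_t> batchSizesWithRequeue() {
  PendingQueue q;
  q.queue([&q] { q.queue([] {}); });
  std::vector<std::size_t> sizes;
  sizes.push_back(q.runBatch());  // the original task
  sizes.push_back(q.runBatch());  // the follow-up it queued
  sizes.push_back(q.runBatch());  // nothing left
  return sizes;
}
```

The follow-up task lands in the next batch rather than extending the current one. Had the tasks been run while `mutex_` was still held, the nested `queue()` call would have tried to lock the same non-recursive mutex again.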

4 References

https://www.bilibili.com/video/BV11b411q7zr?p=29
