brpc source code analysis (II): how brpc processes received requests
2022-07-25 12:06:00 【wxj1992】
For an RPC server, the most important work after startup is handling the requests it receives. This touches the most basic part of network programming: how to process the data arriving on sockets efficiently. This article explains in detail how brpc handles a received request.
I. Basic design ideas
The most typical server design in the industry separates I/O threads from worker threads. One or more I/O threads read data from sockets into a queue, and a pool of worker threads takes data from the queue and processes it. Alternatively, an I/O thread reads the data and hands it directly to a worker. This strict separation of I/O threads and worker threads has several typical problems:
1. An I/O thread can read only one fd at a time, and there are usually only one or a few I/O threads. If one fd carries a lot of data, reading it takes a long time and can easily delay the data on other fds.
2. Operating on a shared queue causes contention, which seriously hurts performance under high concurrency.
3. On today's mainstream multi-core CPUs, cache synchronization between cores is comparatively slow and cache bouncing occurs easily. When an I/O thread hands a task to a worker, the handoff crosses cores and is not efficient. Likewise, data exchange between the CPU cache and memory is slow.
Unlike many RPC frameworks, brpc has its own mechanism to address these problems. In general, it does not distinguish I/O threads from worker threads. A global EventDispatcher monitors the events on fds. If a bthread is already processing the fd, the dispatcher returns immediately; otherwise it starts a bthread to handle it. The bthread started by EventDispatcher is responsible for reading the messages on the fd. For every message it reads it starts a bthread to process that message, except for the last one, which is processed in place, and it continues until there is nothing left to read. Because EventDispatcher neither reads nor processes messages, its own throughput can be very high. Different fds, and the messages within one fd, can be processed concurrently, and the contention between threads is wait-free. This lets brpc process messages from different sources in time under high load. The rest of this article walks through the relevant mechanisms with reference to the source code.
II. Implementation details
As described in the previous article, after the server starts, EventDispatcher monitors epoll events. For a server, an incoming new connection is an EPOLLIN event, and Socket::StartInputEvent is called to handle it.
StartInputEvent first calls the Address method to obtain the socket s from the SocketId. Because events keep occurring on an fd, the Socket class has a butil::atomic variable _nevent that ensures only one bthread processes a given fd at a time. When an event arrives, EventDispatcher adds 1 to _nevent. Only if the value before the increment was 0 does it start a bthread to process the data on that fd. A nonzero previous value means a bthread is already processing the data on this fd, so the call can return immediately, because that bthread will keep reading. In short, StartInputEvent only checks whether a bthread is already handling the fd on which the event occurred: if not, it starts one; if so, it returns directly. Note that the bthread is started with bthread_start_urgent, which makes the caller yield to the new bthread. The official documentation puts it this way: "EventDispatcher hands the pthread it runs on over to the newly created bthread, so that it has better cache locality and can read the data on the fd as soon as possible." Avoiding unnecessary pthread switches helps performance. If bthread_start_urgent fails for some reason, ProcessEvent is executed in place as a fallback.
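The counter-based gate described above can be illustrated with a minimal, self-contained C++ sketch. It is not brpc's code: `FakeSocket`, `start_input_event`, and `more_read_events` are hypothetical names, `std::atomic` stands in for `butil::atomic`, and a plain callback stands in for `bthread_start_urgent`. The reset step in `more_read_events` is an assumption about how the counter returns to zero, which the article does not cover.

```cpp
#include <atomic>
#include <functional>

// Hypothetical stand-in for a connection; not brpc's Socket class.
struct FakeSocket {
    std::atomic<int> nevent{0};  // pending-event counter, playing the role of _nevent
};

// Called once per readiness event. Only the caller that observes a previous
// value of 0 starts a processor; every other caller just bumps the counter.
// Returns true if this call started the processor.
bool start_input_event(FakeSocket& s,
                       const std::function<void(FakeSocket&)>& spawn_processor) {
    // fetch_add returns the value held *before* the increment.
    if (s.nevent.fetch_add(1, std::memory_order_acq_rel) == 0) {
        spawn_processor(s);  // stand-in for starting a bthread
        return true;
    }
    return false;  // a processor is already reading this socket
}

// Assumed companion step: once the processor has drained the fd it tries to
// reset the counter. `seen` is the counter value it last observed. Returns
// true (and refreshes `seen`) if more events arrived meanwhile, false if the
// counter was reset to 0 and the processor may exit.
bool more_read_events(FakeSocket& s, int& seen) {
    return !s.nevent.compare_exchange_strong(seen, 0);
}
```

The point of the design is that the dispatcher side does a single atomic add per event and never blocks, while exactly one processor owns the fd at any moment.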
ProcessEvent, in turn, executes the socket's _on_edge_triggered_events callback. In the case at hand, a new connection request, the event occurs on the listening port, and the corresponding _on_edge_triggered_events is OnNewConnections, which was assigned in StartAccept as mentioned in the previous article. It is the callback invoked for new connections.
OnNewConnections internally calls OnNewConnectionsUntilEAGAIN, whose core logic has two steps.
First, it calls accept on the listening fd to get the client fd.
Then it calls Create to create a new socket that handles the subsequent messages on this new connection, and registers a new EPOLLIN event whose handler is InputMessenger::OnNewMessages. Note the contrast with OnNewConnections above: that one handles new connections, while this one handles the new messages on a connection once it has been established. The core of InputMessenger::OnNewMessages is to read messages and pass them to the processing function of the matching protocol, which then calls the actual business function in the user's service.
When an EPOLLIN event arrives on the client fd, the entry point is still StartInputEvent, but the handler that ProcessEvent calls is now InputMessenger::OnNewMessages. Its core is a loop that performs one read per iteration and keeps reading until no data is left.
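With edge-triggered epoll, a reader has to drain the fd until the kernel reports EAGAIN, because no further notification arrives for data that is already pending. The sketch below shows that loop shape using plain POSIX calls. It is an illustration under assumptions, not brpc's read path: `read_until_eagain` is a hypothetical helper, and a `std::string` stands in for brpc's buffer type.

```cpp
#include <cerrno>
#include <cstddef>
#include <string>
#include <fcntl.h>
#include <unistd.h>

// Reads everything currently available on a non-blocking fd into `out`.
// Returns 0 when the fd is drained (EAGAIN), 1 on EOF, -1 on any other error.
int read_until_eagain(int fd, std::string& out) {
    char buf[4096];
    for (;;) {
        const ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            out.append(buf, static_cast<std::size_t>(n));
            continue;  // there may be more data, read again
        }
        if (n == 0) {
            return 1;  // the peer closed the connection
        }
        if (errno == EINTR) {
            continue;  // interrupted by a signal, retry
        }
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return 0;  // nothing left for now
        }
        return -1;
    }
}
```

The fd must be in non-blocking mode (for example via `fcntl(fd, F_SETFL, O_NONBLOCK)`), otherwise the final read would block instead of returning EAGAIN.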
messenger is the InputMessenger object, which is responsible for cutting messages from the fd and processing them. The previous article introduced brpc's custom protocols. The most important parts of every protocol are ParseXXXMessage and ProcessXXX, which correspond to the two basic steps of message handling, Parse and Process. Parse usually cuts a message off the binary stream, and its running time is roughly fixed. Process parses the message further (for example, deserializing it into protobuf) and invokes the user callback, so its running time is not bounded. Because brpc supports multiple protocols on a single port, InputMessenger tries the Parse callbacks of the protocols specified by the user one by one. When one Parse successfully cuts off a message, the corresponding ProcessXXX is called. Since a connection usually carries only one message format, InputMessenger records the last choice to avoid trying every protocol each time.
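The "try each protocol, then remember the one that worked" idea can be sketched as follows. Everything here is hypothetical and much simpler than brpc: `Protocol`, `Messenger`, `ParseResult`, and the toy line-based protocols (a prefix character followed by text and a newline) are made up for illustration, and only a successful parse updates the remembered index.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

enum class ParseResult { kOk, kNotEnoughData, kTryOthers };

// Hypothetical protocol entry: parse cuts one message off the front of `buf`.
struct Protocol {
    const char* name;
    ParseResult (*parse)(std::string& buf, std::string& msg);
};

// Toy protocol: a message is Prefix + text + '\n'.
template <char Prefix>
ParseResult ParseLine(std::string& buf, std::string& msg) {
    if (buf.empty()) return ParseResult::kNotEnoughData;
    if (buf[0] != Prefix) return ParseResult::kTryOthers;  // not our format
    const std::size_t end = buf.find('\n');
    if (end == std::string::npos) return ParseResult::kNotEnoughData;
    msg = buf.substr(1, end - 1);
    buf.erase(0, end + 1);
    return ParseResult::kOk;
}

class Messenger {
public:
    explicit Messenger(std::vector<Protocol> protocols)
        : protocols_(std::move(protocols)) {}

    // Tries the remembered protocol first, then the remaining ones in order.
    ParseResult CutMessage(std::string& buf, std::string& msg) {
        if (preferred_ >= 0) {
            const ParseResult r = protocols_[preferred_].parse(buf, msg);
            if (r != ParseResult::kTryOthers) return r;
        }
        for (std::size_t i = 0; i < protocols_.size(); ++i) {
            if (static_cast<int>(i) == preferred_) continue;  // already tried
            const ParseResult r = protocols_[i].parse(buf, msg);
            if (r == ParseResult::kTryOthers) continue;
            if (r == ParseResult::kOk) preferred_ = static_cast<int>(i);
            return r;
        }
        return ParseResult::kTryOthers;  // no registered protocol matched
    }

    int preferred_index() const { return preferred_; }

private:
    std::vector<Protocol> protocols_;
    int preferred_ = -1;  // index of the protocol that parsed last, or -1
};
```

After the first successful cut, later calls on the same connection go straight to the remembered parser, which is the saving the paragraph above describes.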

QueueMessage starts a bthread that runs ProcessInputMessage to process one msg. If n messages (n > 1) are read from an fd in a row, InputMessenger starts n-1 bthreads to process the first n-1 messages, and the last message is processed in place.
The parse function in the handler is applied to the data that was read and yields the parse result.
ProcessInputMessage calls msg->_process, that is, the process_request of the matching protocol, to handle the message that was cut off.
The key to this implementation is last_msg, a unique_ptr with a custom deleter.
Every loop iteration deals with last_msg. If last_msg is not null, a new bthread is started to process it. In the first iteration, when the first msg is handled, last_msg is null. Every later iteration calls QueueMessage to process last_msg,
and then sets last_msg to the msg of the current iteration so that it is handled in the next one. The iteration also assigns msg->_process (the message handler) and msg->_arg for the later ProcessInputMessage call.

So one last_msg is still unprocessed after the final iteration. What happens to it? The deleter of last_msg is RunLastMessage.
Like QueueMessage, RunLastMessage calls ProcessInputMessage. The deleter is invoked automatically when last_msg is destroyed, which is how the last msg gets processed in place.
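The last_msg trick can be reproduced in a few lines of standard C++. The sketch below is a simplified model under assumptions, not brpc's code: `Msg`, `Trace`, `RunLast`, `queue_message`, and `on_new_messages` are hypothetical names, and instead of starting bthreads the code only records which messages would have been queued and which one ran in place.

```cpp
#include <memory>
#include <string>
#include <vector>

struct Msg {
    std::string payload;
};

// Records what happened to each message so the behaviour can be inspected.
struct Trace {
    std::vector<std::string> queued;    // would be handed to new bthreads
    std::vector<std::string> in_place;  // processed by the reading thread
};

// Custom deleter: destroying the pointer means processing the message in place.
struct RunLast {
    Trace* trace;
    void operator()(Msg* m) const {
        trace->in_place.push_back(m->payload);
        delete m;
    }
};

// Stand-in for QueueMessage: a real implementation would start a bthread here.
void queue_message(Msg* m, Trace& trace) {
    trace.queued.push_back(m->payload);
    delete m;
}

// `cut` holds the messages already cut from one fd, in arrival order.
void on_new_messages(const std::vector<std::string>& cut, Trace& trace) {
    std::unique_ptr<Msg, RunLast> last_msg(nullptr, RunLast{&trace});
    for (const std::string& payload : cut) {
        // The message kept from the previous iteration is no longer the last
        // one, so it is queued rather than run in place.
        if (last_msg) {
            queue_message(last_msg.release(), trace);
        }
        last_msg.reset(new Msg{payload});
    }
    // last_msg goes out of scope here. If it is non-null, RunLast runs and the
    // final message is processed in place; a null pointer triggers nothing.
}
```

Using the deleter means every exit path of the function, including early returns, still processes the final message without any extra bookkeeping.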
msg->_process is the process_request function of the matching protocol, and it eventually calls the user's function. Taking the HTTP protocol as an example, msg->_process is the ProcessHttpRequest seen when the protocol was registered.
msg is passed in as a parameter, and msg->arg() yields the pointer to the server. msg->arg() returns msg->_arg, which, as noted above, is assigned from the handler's arg. Recall that the handler's arg is the pointer to the started server, assigned in BuildAcceptor() when the various handlers are added.
The core processing in ProcessHttpRequest proceeds in three steps.
First, it obtains the MethodProperty sp.
Next, it obtains the service and the method.
Finally, it calls the business code, for example the method in the echo example.

The response is actually sent by calling done->Run(). In the recommended usage this is guaranteed by the RAII mechanism, namely the done_guard in the echo example.
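To show why the RAII guard guarantees that done->Run() is called, here is a minimal sketch of such a guard. It is a hypothetical model, not brpc's ClosureGuard or protobuf's Closure: `Closure`, `ClosureGuardSketch`, `CountingDone`, and this `Echo` signature are all made up for illustration.

```cpp
#include <string>

// Hypothetical stand-in for the `done` callback interface.
class Closure {
public:
    virtual ~Closure() = default;
    virtual void Run() = 0;
};

// Runs the closure when the guard leaves scope, unless it was released.
class ClosureGuardSketch {
public:
    explicit ClosureGuardSketch(Closure* done) : done_(done) {}
    ~ClosureGuardSketch() {
        if (done_ != nullptr) done_->Run();
    }
    ClosureGuardSketch(const ClosureGuardSketch&) = delete;
    ClosureGuardSketch& operator=(const ClosureGuardSketch&) = delete;

    // Gives up responsibility, for example when the response will be
    // completed asynchronously by someone else.
    Closure* release() {
        Closure* d = done_;
        done_ = nullptr;
        return d;
    }

private:
    Closure* done_;
};

// Test double that counts how often Run() was called.
struct CountingDone : Closure {
    int runs = 0;
    void Run() override { ++runs; }
};

// Hypothetical service method with an early return path.
void Echo(const std::string& request, std::string* response, Closure* done) {
    ClosureGuardSketch done_guard(done);
    if (request.empty()) {
        return;  // done->Run() still happens via the guard
    }
    *response = request;
}  // done->Run() happens here on the normal path
```

Because the call sits in the destructor, every return path of the service method sends the response exactly once, which is the guarantee the recommended usage relies on.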
III. Summary
When handling received requests, brpc first of all does not distinguish I/O threads from worker threads. Second, it uses EventDispatcher with edge-triggered epoll to distribute events, which allows very high concurrency. Message cutting, protocol-specific request processing, and the subsequent user logic are all handed to independent bthreads, which makes the handling wait-free. There is no need to worry that the I/O of one request takes so long that it blocks a batch of other I/O, and there is no bottleneck caused by a producer-consumer queue. Handing over the current pthread when scheduling also preserves cache locality. These advantages in message handling are one of the cornerstones of brpc's high performance.