MongoDB Source Code Implementation Series: Network Transport Layer Module Implementation (Part 3)

2020-11-09 18:09:00 InfoQ

{"type":"doc","content":[{"type":"heading","attrs":{"align":"center","level":1},"content":[{"type":"text","text":"transport_layer network transport layer module: source code implementation"}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"About the author"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Former technical expert at Didi Chuxing, now head of the MongoDB document database at OPPO, responsible for the R&D and operations of OPPO's MongoDB document database (peak TPS in the tens of millions, hundreds of billions of documents). Long focused on the research and development of distributed caches, high-performance servers, databases, and middleware. GitHub account: "},{"type":"link","attrs":{"href":"https://github.com/y123456yz","title":null},"content":[{"type":"text","marks":[{"type":"underline"}],"text":"https://github.com/y123456yz"}]}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"1. 
Overview"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The earlier articles "},{"type":"link","attrs":{"href":"https://xie.infoq.cn/article/d4460b807c9835c6d80707410","title":null},"content":[{"type":"text","marks":[{"type":"underline"}],"text":"MongoDB network module source code implementation and performance tuning (Part 1)"}]},{"type":"text","text":" and "},{"type":"link","attrs":{"href":"https://xie.infoq.cn/article/1cac5adcd1b4f3fe512a1457a","title":null},"content":[{"type":"text","marks":[{"type":"underline"}],"text":"https://xie.infoq.cn/article/1cac5adcd1b4f3fe512a1457a"}]},{"type":"text","text":" covered how to read the source code of very large projects, the Asio network library implementation, the threading model, the transport_layer socket handling and transport layer management sub-module, the session sub-module, the Ticket data send/receive sub-module, and the service_entry_point service entry sub-module."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"This article continues with the kernel source code implementation of the service_state_machine state machine scheduling sub-module of the network transport layer."}]},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"2. service_state_machine state machine scheduling sub-module"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The service_state_machine module handles the state transitions of a complete request, making sure that each request moves through the prescribed stages so that the client's MongoDB access completes normally. 
The core of the module is implemented in the following three source files (the test files are for testing only and can be ignored):"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/ca/cafbb6a9c23c1f755c26eccae7469504.png","alt":null,"title":null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"2.1 Core code implementation"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As described in the analysis of the service_entry_point service entry sub-module, when a new connection is accepted, the ServiceEntryPointImpl::startSession(...) callback constructs a ServiceStateMachine (ssm) object, which establishes a one-to-one mapping between the new connection, its session, and its ssm. The ServiceStateMachine implementation relies heavily on ThreadGuard (the thread guard), so this article analyzes the internal design of the whole state machine scheduling module through the core code of these two classes."}]},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.1.1 ThreadGuard thread guard class: core code implementation"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"ThreadGuard is the “thread guard” class. It is mainly responsible for maintaining worker thread names, managing ownership of the ssm, and recycling the session connection that corresponds to the ssm. Its core members and interfaces are implemented as follows:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"class ServiceStateMachine::ThreadGuard {\n    ......\npublic:\n    // create a ThreadGuard which will take ownership of the SSM in this thread.\n    // ThreadGuard initialization: marks the ssm as owned by the current thread\n    explicit ThreadGuard(ServiceStateMachine* ssm) : _ssm{ssm} {\n        // Take ownership of the ssm state machine\n        auto owned = _ssm->_owned.compareAndSwap(Ownership::kUnowned, Ownership::kOwned);\n        // This condition holds only in the one-thread-per-connection model\n        if (owned == Ownership::kStatic) {\n            // In the one-thread-per-connection model a connection is always handled by the same thread,\n            // so its ssm always belongs to that thread\n            dassert(haveClient());\n            dassert(Client::getCurrent() == _ssm->_dbClientPtr);\n            // Mark ownership as taken\n            _haveTakenOwnership = true;\n            return;\n        }\n        ......\n        // The adaptive (dynamic thread) model takes the path below\n\n        // Temporarily save the current thread's name\n        auto oldThreadName = getThreadName();\n        // The thread name saved in the ssm differs from the current thread name\n        if (oldThreadName != _ssm->_threadName) {\n            // The thread name is about to change, so save the pre-change name in _oldThreadName\n            _ssm->_oldThreadName = getThreadName().toString();\n            // Rename the thread running this ssm to _threadName (conn-x)\n            setThreadName(_ssm->_threadName);\n        }\n\n        // Bind the client to this thread: one connection has one client, and this marks the client\n        // as currently handled by this thread\n        Client::setCurrent(std::move(_ssm->_dbClient));\n        // Ownership taken: the ssm now belongs to the thread running it\n        _haveTakenOwnership = true;\n    }\n    ......\n    // Move assignment\n    ThreadGuard& operator=(ThreadGuard&& other) {\n        if (this != &other) {\n            _ssm = other._ssm;\n            _haveTakenOwnership = other._haveTakenOwnership;\n            // other gives up ownership\n            other._haveTakenOwnership = false;\n        }\n        return *this;\n    };\n\n    // Destructor\n    ~ThreadGuard() {\n        // If ownership was taken, call release() on destruction to restore the thread's original name\n        if (_haveTakenOwnership)\n            release();\n    }\n\n    // One-thread-per-connection model: mark _owned as kStatic, i.e. the thread name never changes\n    void markStaticOwnership() {\n        dassert(static_cast<bool>(*this));\n        _ssm->_owned.store(Ownership::kStatic);\n    }\n\n    // Restore the original thread name and hand the client back from the scheduling thread to the state machine\n    void release() {\n        auto owned = _ssm->_owned.load();\n        // The adaptive (asynchronous thread pool) model satisfies this condition;\n        // kStatic means the SSM permanently belongs to one thread\n        if (owned != Ownership::kStatic) {\n            // This thread holds the current client, so return it to the SSM\n            if (haveClient()) {\n                _ssm->_dbClient = Client::releaseCurrent();\n            }\n            // Restore the previous thread name\n            if (!_ssm->_oldThreadName.empty()) {\n                setThreadName(_ssm->_oldThreadName);\n            }\n        }\n        // If the state machine has reached Ended, call the cleanup hook\n        if (_ssm->state() == State::Ended) {\n            // Connection-close cleanup, set via ServiceStateMachine::setCleanupHook\n            auto cleanupHook = std::move(_ssm->_cleanupHook);\n            if (cleanupHook)\n                cleanupHook();\n            return;\n        }\n\n        // Give up ownership\n        _haveTakenOwnership = false;\n        // The ownership state goes back to kUnowned\n        if (owned == Ownership::kOwned) {\n            _ssm->_owned.store(Ownership::kUnowned);\n        }\n    }\n\nprivate:\n    // The ssm guarded by this ThreadGuard\n    ServiceStateMachine* _ssm;\n    // Defaults to false, meaning this guard has not taken ownership of the ssm\n    bool _haveTakenOwnership = false;\n};"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As the code above shows, the role of the thread guard class is clear: it guards the ownership state of the current thread and records the thread names before and after the ssm moves between states. 
In addition, when the session connection of an ssm enters the Ended state, this class also reclaims and releases the connection's resources."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"If you inspect a mongod or mongos instance that was started with “serviceExecutor: adaptive”, you will find many threads named “conn-x” and “worker-x” in the process, and the name of a given thread may change over time. This renaming is implemented by the ThreadGuard class. The synchronous (one-thread-per-connection) model has only “conn-x” threads, while the adaptive model has both “conn-x” and “worker-x” threads. The underlying mechanism is analyzed later; see the figure below:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/25/25646a361dc83b3f64d23275231e9cb3.png","alt":null,"title":null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong"}],"text":"Note: "},{"type":"text","text":"in the synchronous threading model the initial worker thread name is “conn-x”; in the adaptive threading model the initial worker thread name is “worker-x”."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"ThreadGuard is tightly coupled to the ssm state machine (service_state_machine), and the ssm state transitions that occur while a client request is processed depend on it as well, as the following analysis shows."}]},{"type":"heading","attrs":{"align":null,"level":3},"content":[{"type":"text","text":"2.1.2 ServiceStateMachine class: core code implementation"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The core of the service_state_machine module is implemented by the ServiceStateMachine class. Its core members and function interfaces are as follows:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"// Each new connection gets one ServiceStateMachine, which is saved in ServiceEntryPointImpl._sessions\nclass ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> {\n    ......\npublic:\n    ......\n    static std::shared_ptr<ServiceStateMachine> create(...);\n    ServiceStateMachine(...);\n    // States of the state machine\n    enum class State {\n        // Initial state, set in the ServiceStateMachine constructor\n        Created,\n        // Set in ServiceStateMachine::_runNextInGuard when it starts to receive network data.\n        // Also marks that the previous client request is complete (the data fetched from the DB has\n        // been sent to the client) and the connection is waiting for its next request to be scheduled.\n        // This state is handled by _sourceMessage\n        Source,\n        // Waiting for data from the client\n        SourceWait,\n        // Entered after a complete mongodb message has been received\n        Process,\n        // Waiting for the data to be sent successfully\n        SinkWait,\n        // Entered when receiving or sending fails or the connection is closed; handled by _cleanupSession\n        EndSession,\n        // Entered once the session has been reclaimed\n        Ended\n    };\n    // Ownership state, mainly used to decide whether the thread name must be updated during a state\n    // transition; only relevant to the dynamic thread model\n    enum class Ownership {\n        // The SSM is inactive\n        kUnowned,\n        // The SSM belongs to a worker thread and is being actively scheduled\n        kOwned,\n        // The SSM permanently belongs to one thread\n        kStatic\n    };\n\n    ......\nprivate:\n    // ThreadGuard can be thought of as a thread guard; it is covered separately in the ThreadGuard section\n    class ThreadGuard;\n    friend class ThreadGuard;\n\n    ......\n    // Get the session\n    const transport::SessionHandle& _session();\n    // The following two interfaces handle task scheduling\n    void _scheduleNextWithGuard(...);\n    void _runNextInGuard(ThreadGuard guard);\n    // Processing after a complete mongodb message has been received\n    inline void _processMessage(ThreadGuard guard);\n    // The following four interfaces perform the low-level reads and writes and handle their callbacks\n    void _sourceCallback(Status status);\n    void _sinkCallback(Status status);\n    void _sourceMessage(ThreadGuard guard);\n    void _sinkMessage(ThreadGuard guard, Message toSink);\n\n    // Current server-side state of a client request\n    AtomicWord<State> _state{State::Created};\n    // Service entry point: ServiceEntryPointMongod for mongod, ServiceEntryPointMongos for mongos\n    ServiceEntryPoint* _sep;\n    // synchronous or adaptive mode, i.e. whether the threading model is one thread per connection\n    // or a dynamic thread pool\n    transport::Mode _transportMode;\n    // Service context: ServiceContextMongoD (mongod) or ServiceContextNoop (mongos)\n    ServiceContext* const _serviceContext;\n    // The session corresponding to this ssm; ASIOSession by default\n    transport::SessionHandle _sessionHandle;\n    // Client built from the session, assigned in the ServiceStateMachine constructor.\n    // This is the client information for the current request\n    ServiceContext::UniqueClient _dbClient;\n    // Points to _dbClient above\n    const Client* _dbClientPtr;\n    // Thread name of the thread currently processing this SSM; in the adaptive model the thread name\n    // changes across the states of one request\n    const std::string _threadName;\n    // The thread name before it was changed\n    std::string _oldThreadName;\n    // Assigned in ServiceEntryPointImpl::startSession->ServiceStateMachine::setCleanupHook;\n    // handles session connection cleanup\n    stdx::function<void()> _cleanupHook;\n    // Message being received and processed; a complete message is stored here\n    Message _inMessage;\n    // Initialized to kUnowned, meaning this SSM is inactive.\n    // Mainly used to decide whether the thread name must be updated during a state transition;\n    // only relevant to the dynamic thread model\n    AtomicWord<Ownership> _owned{Ownership::kUnowned};\n};"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The core members of this class are described in the following table:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/aa/aa7515fad797cf7a5b492591037682a5.png","alt":null,"title":null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As noted above, a connection, its session, and its SSM correspond one to one. An SSM also has an owner: ownership refers to which thread the SSM currently belongs to, that is, which thread is currently running the SSM's scheduling. Ownership is tracked by the Ownership enum, which has the following states, each described below:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/c7/c7faf19544f54a3006dfc684ad9ff2bd.png","alt":null,"title":null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"After the MongoDB server receives a client request, receiving the data, parsing the protocol, fetching data from the DB layer, and sending data to the client are all carried out through orderly state transitions of the SSM. The SSM goes through several states during scheduling, each identified by a state code. The state codes and their functions are shown in the table below
:"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/19/191bf0a1e1a516a07b13108d5103a6a3.png","alt":null,"title":null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"Those are the state codes the SSM uses while processing a request; the core code analysis below shows how the state transitions are implemented. After the listener thread accepts a new client connection, the service_entry_point service entry sub-module calls the ssm->start() interface to enter the SSM state machine scheduling module. The source code of this interface is as follows:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"// Called in ServiceEntryPointImpl::startSession to start the SSM state machine\nvoid ServiceStateMachine::start(Ownership ownershipModel) {\n    // Calls _scheduleNextWithGuard directly\n    _scheduleNextWithGuard(\n        // The listener thread temporarily takes the conn thread name; once the task has been queued in\n        // _scheduleNextWithGuard, the guard.release() call there restores the listener thread name\n        ThreadGuard(this), transport::ServiceExecutor::kEmptyFlags, ownershipModel);\n}\n\nvoid ServiceStateMachine::_scheduleNextWithGuard(...) {\n    // The func task is actually run by a worker thread, which takes tasks from the asio library's\n    // global queue and executes them\n    auto func = [ ssm = shared_from_this(), ownershipModel ] {\n        // Construct a ThreadGuard\n        ThreadGuard guard(ssm.get());\n        // sync mode, i.e. one thread per connection: ownership is fixed and belongs to the given thread\n        if (ownershipModel == Ownership::kStatic)\n            guard.markStaticOwnership();\n        // See ServiceStateMachine::_runNextInGuard, which performs the state scheduling and transitions\n        ssm->_runNextInGuard(std::move(guard));\n    };\n    // Restore the previous thread name; if the SSM has entered the Ended state, start resource cleanup\n    guard.release();\n    // ServiceExecutorAdaptive::schedule (adaptive) or ServiceExecutorSynchronous::schedule (synchronous)\n    // The first call to this function creates a new thread there; later calls only schedule the task\n    Status status = _serviceContext->getServiceExecutor()->schedule(std::move(func), flags);\n    if (status.isOK()) {\n        return;\n    }\n    // Error handling\n    ......\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"ServiceStateMachine::start() calls ServiceStateMachine::_scheduleNextWithGuard() to start the state machine. The core job of _scheduleNextWithGuard() is to call the schedule() interface of the service_executor sub-module (the threading model sub-module), which queues the state machine scheduling task onto the global queue of the ASIO network library (adaptive dynamic thread model) or, in the one-thread-per-connection model, onto a thread-level private queue."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"How tasks are queued and scheduled for execution in the adaptive threading model is analyzed later in the threading model sub-module; you can also refer to
:"},{"type":"link","attrs":{"href":"https://my.oschina.net/u/4087916/blog/4295038","title":null},"content":[{"type":"text","text":"https://my.oschina.net/u/4087916/blog/4295038"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"In addition, the tasks that _scheduleNextWithGuard() puts on the global queue are the SSM state machine tasks analyzed in the rest of this module. Each task is wrapped in the func lambda inside this function and then queued onto the global queue through the threading model sub-module. When executed, func calls the _runNextInGuard() interface to perform the state transition; this is the task that is actually queued on the ASIO global queue. Its core code is as follows:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {\n    // Get the current SSM state\n    auto curState = state();\n    // If this is the first run of the SSM, then update its state to Source\n    // On the first run the state is Created; mark the SSM as ready to receive data\n    if (curState == State::Created) {\n        // Enter Source and wait to receive data\n        curState = State::Source;\n        _state.store(curState);\n    }\n    // Handle each state\n    try {\n        switch (curState) {\n            // Receive data\n            case State::Source:\n                _sourceMessage(std::move(guard));\n                break;\n            // A complete mongodb message has been received and can now be processed\n            // (parsing + command handling + replying to the client)\n            case State::Process:\n                _processMessage(std::move(guard));\n                break;\n            // The connection failed or was closed, so start cleanup\n            case State::EndSession:\n                _cleanupSession(std::move(guard));\n                break;\n            default:\n                MONGO_UNREACHABLE;\n        }\n        return;\n    } catch (...) {\n        // Log the exception\n    }\n    // Error handling\n    ......\n    // Enter the EndSession state\n    _state.store(State::EndSession);\n    _cleanupSession(std::move(guard));\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"As the code above shows, only three kinds of tasks are actually queued on the global queue:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"1) The task that receives mongodb data, referred to as readTask."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2) The processing that follows receipt of a complete mongodb message (protocol parsing, command handling, fetching data from the DB, sending data to the client, and so on), referred to as dealTask."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"3) The resource release that follows a failure while receiving or sending data, or a closed connection, referred to as cleanTask."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The core code of these three kinds of tasks is analyzed below:"}]},{"type":"bulletedlist","content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong"}],"text":"readTask core code implementation"}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The core of readTask is implemented by the _sourceMessage() interface. The code is as follows:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"// Receive client data\nvoid ServiceStateMachine::_sourceMessage(ThreadGuard guard) {\n    ......\n    // Get the ticket for receiving data on this session, i.e. an ASIOSourceTicket\n    auto ticket = _session()->sourceMessage(&_inMessage);\n    // Enter the state of waiting to receive data\n    _state.store(State::SourceWait);\n    // release() restores the worker thread's original name: \"conn-xx\" in the synchronous model,\n    // \"worker-xx\" for adaptive worker threads\n    guard.release();\n    // The threading model defaults to synchronous mode, i.e. one thread per connection\n    if (_transportMode == transport::Mode::kSynchronous) {\n        // Synchronous mode: run the _sourceCallback callback after a complete mongodb message has been read\n        _sourceCallback([this](auto ticket) {\n            MONGO_IDLE_THREAD_BLOCK;\n            return _session()->getTransportLayer()->wait(std::move(ticket));\n        }(std::move(ticket)));\n    } else if (_transportMode == transport::Mode::kAsynchronous) {\n        // adaptive model: read a mongodb message asynchronously, then run the _sourceCallback callback\n        _session()->getTransportLayer()->asyncWait(\n            // TransportLayerASIO::ASIOSourceTicket::_bodyCallback runs this callback after a complete\n            // message has been read\n            std::move(ticket), [this](Status status) { _sourceCallback(status); });\n    }\n}\n\n// Callback run after a complete mongodb message has been received\nvoid ServiceStateMachine::_sourceCallback(Status status) {\n    // Construct a ThreadGuard, which renames the thread running this SSM to conn-xx\n    ThreadGuard guard(this);\n\n    // State check\n    dassert(state() == State::SourceWait);\n    // Get the remote (peer) information of the session\n    auto remote = _session()->remote();\n    if (status.isOK()) {\n        // Move on to the message-processing stage (_processMessage) and wait to be scheduled\n        _state.store(State::Process);\n        // Note: kMayRecurse means the State::Process stage may still be executed by this thread;\n        // it is a recursion flag\n        return _scheduleNextWithGuard(std::move(guard), ServiceExecutor::kMayRecurse);\n    }\n    ......\n    // Error path\n    _runNextInGuard(std::move(guard));\n}"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"The first task the SSM schedules is readTask. As the source code above shows, this task uses the ticket data distribution sub-module to read one complete mongodb message from the ASIO network library and then runs the _sourceCallback callback. The callback sets the SSM state to State::Process and then calls _scheduleNextWithGuard(...) 
hold dealTask Task line up to ASIO Global queue of (adaptive Threading model ), Or queue up to a thread level private queue (synchronous Threading model ) wait for worker Thread schedule execution ."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" Here's a detail , In the dealTask When you join the team , With kMayRecurse Mark , This tag identifies that the task can be called recursively , That is, the task can be continued by the current thread , In this way, the same request can be guaranteed taskRead The tasks and dealTask Tasks are processed by the same thread . Task recursive scheduling , You can refer to the following thread model sub module source code implementation ."}]},{"type":"bulletedlist","content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong"}],"text":" dealTask Task core code implementation "}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" When reading a full length of mongodb After the message , It will dealTask Task queued to global queue , Then from worker The thread schedules the execution of the task Mission .dealTask The core code implementation of the task is as follows :"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"1.//dealTask Handle   \n2.void ServiceStateMachine::_processMessage(ThreadGuard guard) {  \n3.    ......  \n4.    // Inlet flow count   \n5.    networkCounter.hitLogicalIn(_inMessage.size());  \n6.    // Get a unique UniqueOperationContext, One client corresponds to one UniqueOperationContext  \n7.    auto opCtx = Client::getCurrent()->makeOperationContext();  \n8.    //ServiceEntryPointMongod::handleRequest  ServiceEntryPointMongos::handleRequest Request processing   \n9.    
// Command processing; after the DB is accessed, the data is returned through dbresponse   \n10.    DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  \n11.    // Release opCtx so that currentOp no longer sees it   \n12.    opCtx.reset();  \n13.    // The response data that needs to be sent to the client   \n14.    Message& toSink = dbresponse.response;  \n15.    // Response data exists   \n16.    if (!toSink.empty()) {    \n17.        ......  \n18.        // Send the data: ServiceStateMachine::_sinkMessage()  \n19.        _sinkMessage(std::move(guard), std::move(toSink));  \n20.  \n21.    } else {  \n22.       // Handling for requests that need no response to the client   \n23.       ......  \n24.    }  \n25.}  \n26.  \n27.// Send data through the sink ticket   \n28.void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {  \n29.    // Get the ASIOSinkTicket used to send the data  \n30.    auto ticket = _session()->sinkMessage(toSink);  \n31.    // Enter the sink send-wait state   \n32.    _state.store(State::SinkWait);  \n33.    // Restore the original worker thread name   \n34.    guard.release();  \n35.    // synchronous thread model: send synchronously   \n36.    if (_transportMode == transport::Mode::kSynchronous) {  \n37.     // _sinkCallback is eventually executed once ASIOSinkTicket has sent the data synchronously  \n38.        _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket)));  \n39.    } else if (_transportMode == transport::Mode::kAsynchronous) {  \n40.        // _sinkCallback is eventually executed once ASIOSinkTicket has sent the data asynchronously  \n41.        _session()->getTransportLayer()->asyncWait(  \n42.            std::move(ticket), [this](Status status) { _sinkCallback(status); });  \n43.    }  \n44.}  \n45.  \n46.// Sink (data sending) callback   \n47.void ServiceStateMachine::_sinkCallback(Status status) {  \n48.    
// The SSM is owned by the new guard, and the current thread is renamed to conn-xx  \n49.    ThreadGuard guard(this);  \n50.    // State check   \n51.    dassert(state() == State::SinkWait);  \n52.    if (!status.isOK()) {  \n53.        // Enter the EndSession state   \n54.        _state.store(State::EndSession);  \n55.     // Error path   \n56.        return _runNextInGuard(std::move(guard));  \n57.    } else if (_inExhaust) { // exhaust mode (_inExhaust)    \n58.        // Note that the state here is Process: _processMessage has to run again   \n59.        _state.store(State::Process);   \n60.    } else {   \n61.        // The normal flow always takes this branch: _sourceMessage continues to receive data recursively   \n62.        // Note that the state here is Source: continue to receive client requests   \n63.        _state.store(State::Source);  \n64.    }  \n65.    // One mongo request on this link has been answered; schedule again to continue   \n66.    return _scheduleNextWithGuard(std::move(guard),  \n67.                                  ServiceExecutor::kDeferredTask |  \n68.                                      ServiceExecutor::kMayYieldBeforeSchedule);  \n69.}  "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" After readTask has read a complete mongodb message through the ticket data distribution submodule, the dealTask logic starts; this task is _processMessage(...). The core of this interface is to call the handleRequest(...) interface of the service entry class for the mongod or mongos instance, which completes the subsequent command processing, DB layer data access, and so on. The resulting data is stored in DbResponse, and finally _sinkMessage(...) 
sends the data out."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" The real internal mongodb processing is done by the dealTask task, which is also the most resource-intensive part of the whole request. In this task, once the data has been sent to the client successfully, the SSM state machine for the session link enters State::Source and waits for a worker thread to schedule it so that subsequent requests on the link can be handled."}]},{"type":"bulletedlist","content":[{"type":"listitem","content":[{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong"}],"text":" cleanTask task "}]}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" If anything goes wrong, whether while reading or writing data, because the client closed the link, or while accessing the DB data layer, the state machine enters State::EndSession. The task for this state is relatively simple; its code is as follows:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"1.// Recycling of the link that belongs to the session   \n2.void ServiceStateMachine::_cleanupSession(ThreadGuard guard) {  \n3.    // After entering this state, ~ThreadGuard::release handles the SSM's _cleanupHook; the hook is set in ServiceEntryPointImpl::startSession  \n4.    _state.store(State::Ended);  \n5.    // Clear the message buffer  \n6.    _inMessage.reset();  \n7.    // Release the client resources that belong to this link   \n8.    
Client::releaseCurrent();  \n9.}  "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" After entering this state, the current thread directly recycles the session and releases the client resources; the state machine does not need to schedule a worker thread for the cleanup."}]},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"2.2 Worker thread names and the guardthread thread guard class "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" From the earlier analysis we know that with the adaptive dynamic thread model, mongod and mongos have many child threads named “conn-xx” and “worker-xx”, and the same thread may be named “conn-xx” at one moment and “worker-xx” the next. Both the initial thread name and its later changes are tied to the ServiceStateMachine state machine scheduling class, the guardthread thread guard class, and the worker thread model."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" Worker threads are created by the ServiceExecutor thread model submodule; see the later chapter on that submodule. The default initial thread name is “conn-x”, and it is initialized as follows:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"1.// Called from ServiceStateMachine::create: ServiceStateMachine constructor initialization   \n2.ServiceStateMachine::ServiceStateMachine(...)  \n3.    ......  \n4.    // Thread name initialization: conn-xx, where xx is the session id  \n5.    _threadName{str::stream() << \"conn-\" << _session()->id()} {}   \n6.}  \n7.  \n8.class Session {  \n9.    ......  \n10.    // Session ID, auto-incremented   \n11.    const Id _id;  \n12.}  \n13.  
\n14.// Global sessionIdCounter counter, initialized to 0  \n15.AtomicUInt64 sessionIdCounter(0);  \n16.  \n17.// The session id is auto-incremented   \n18.Session::Session() : _id(sessionIdCounter.addAndFetch(1)) {} "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" During SSM state processing, one complete request = readTask + dealTask. Both tasks are scheduled by the SSM state machine together with the worker threads of the ServiceExecutor thread model submodule. While a task is being processed, the thread handling it may change its name several times, which is done with the help of the guardthread thread guard class. The following pseudocode illustrates one such thread name switch:"}]},{"type":"codeblock","attrs":{"lang":"cpp"},"content":[{"type":"text","text":"1.worker_thread_run_task(...)  \n2.{  \n3.    // With the adaptive thread model, the current worker thread is named \"worker-xx\"  \n4.    print(threadName)  \n5.    // Business logic processing 1  \n6.    ......  \n7.      \n8.    // Construct a ThreadGuard, which renames the thread to _ssm->_threadName, that is, \"conn-xx\",  \n9.    // and saves the original thread name \"worker-xx\" in _ssm->_oldThreadName   \n10.    ThreadGuard guard(this);  \n11.    // With the adaptive thread model, the printed thread name is \"conn-xx\"  \n12.    print(threadName)  \n13.    // Business logic processing 2  \n14.    ......  \n15.    // Restore the thread name \"worker-xx\" saved in _ssm->_oldThreadName  \n16.    guard.release();  \n17.      \n18.    // With the adaptive thread model, the thread name is back to \"worker-xx\"  \n19.    
print(threadName)  \n20.}  "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" As the pseudocode above shows, a worker thread in the adaptive thread model is named “worker-xx”. Once ThreadGuard guard(this) is constructed, the thread is renamed to “conn-xx”; when guard.release() is called, the name is restored to “worker-xx”."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" Combined with the SSM state processing flow above, the adaptive thread model can be summarized as follows: during low-level network IO reads and writes, the worker thread is named “worker-xx”; during other, non-network-IO internal mongodb logic, the thread is named “conn-xx”. So, when looking at all thread names of a mongod or mongos process, a thread named “worker-xx” is currently handling network IO, while a thread named “conn-xx” is currently handling internal logic, which for a mongod instance can be understood as mainly disk IO."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" In the synchronous thread model, all client requests on a link are handled by the same thread from beginning to end, so the thread name never needs to change and stays “conn-xx” throughout."}]},{"type":"heading","attrs":{"align":null,"level":2},"content":[{"type":"text","text":"2.3 Summary of the module's function interfaces "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"       The main core interfaces have been analyzed above. Many 
other interfaces are not listed in detail. All interface functions of the module are summarized below; for more interface code implementations, see "},{"type":"link","attrs":{"href":"https://github.com/y123456yz/reading-and-annotate-mongodb-3.6","title":null},"content":[{"type":"text","marks":[{"type":"underline"}],"text":"Mongodb Kernel source code detailed annotation analysis "}]},{"type":"text","text":":"}]},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/d7/d7d7d677becaaa54eaba8397110a1276.png","alt":null,"title":null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"image","attrs":{"src":"https://static001.geekbang.org/infoq/ba/ba6ed1479ab80046e2e5c1b717a7bf24.png","alt":null,"title":null,"style":null,"href":null,"fromPaste":true,"pastePass":true}},{"type":"heading","attrs":{"align":null,"level":1},"content":[{"type":"text","text":"3.  Summary "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"   "},{"type":"link","attrs":{"href":"https://xie.infoq.cn/article/d4460b807c9835c6d80707410","title":null},"content":[{"type":"text","text":"Mongodb Network module source code implementation and performance tuning ( One )"}],"marks":[{"type":"underline"}]},{"type":"text","marks":[{"type":"underline"}],"text":" "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"link","attrs":{"href":"https://xie.infoq.cn/article/1cac5adcd1b4f3fe512a1457a","title":null},"content":[{"type":"text","marks":[{"type":"underline"}],"text":"<>"}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" This article mainly analyzes the service_state_machine state machine submodule. The module converts the client requests of a session into readTask, dealTask, and 
cleanTask tasks. The first two are scheduled through worker threads, while cleanTask is executed directly by the current thread when internal processing fails or the link is closed, without being scheduled onto a worker thread."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" Processing these three tasks involves one or more of the seven states Created, Source, SourceWait, Process, SinkWait, EndSession, and Ended; see the state code analysis earlier for details. The state transitions of a normal client request are as follows:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"1)  State transitions for the first request on a newly established link:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" Created -> Source -> SourceWait -> Process -> SinkWait -> Source"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"2)  State transitions for subsequent requests on the link:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":"  Source -> SourceWait -> Process -> SinkWait -> Source"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" In addition, the SSM state machine scheduling module, through the ServiceStateMachine::_scheduleNextWithGuard(...) 
interface, is associated with the thread model submodule. Through this interface the SSM handles the initial creation of worker threads and the queuing of tasks. The next article will analyze the detailed source code implementation of the "},{"type":"text","marks":[{"type":"strong"}],"text":"<< Network thread model sub module >>"},{"type":"text","text":"."}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" "}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","marks":[{"type":"strong"}],"text":" Note:"}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null},"content":[{"type":"text","text":" More interface implementation details of this module can be found in the Mongodb kernel source code comments: "},{"type":"link","attrs":{"href":"https://github.com/y123456yz/reading-and-annotate-mongodb-3.6","title":null},"content":[{"type":"text","marks":[{"type":"underline"}],"text":"Mongodb Kernel source code detailed annotation analysis "}]}]},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}},{"type":"paragraph","attrs":{"indent":0,"number":0,"align":null,"origin":null}}]}
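
The state transitions listed in the summary can be illustrated with a small, self-contained sketch. This is not the MongoDB implementation: the `State` enum only mirrors the seven state names from the article, while `nextState` and `traceRequest` are hypothetical helpers introduced here for illustration. The sketch also assumes that a failure in any step leads to EndSession, and it leaves out exhaust mode (SinkWait back to Process).

```cpp
#include <cassert>
#include <vector>

// Simplified model of the seven SSM states named in the article.
enum class State { Created, Source, SourceWait, Process, SinkWait, EndSession, Ended };

// Hypothetical helper: returns the state that follows `s`.
// `ok` models whether the current step (read, processing, or write) succeeded.
constexpr State nextState(State s, bool ok) {
    if (s == State::Ended || s == State::EndSession) return State::Ended;
    if (!ok) return State::EndSession;  // assumption: any failure ends the session
    switch (s) {
        case State::Created:    return State::Source;
        case State::Source:     return State::SourceWait;
        case State::SourceWait: return State::Process;
        case State::Process:    return State::SinkWait;
        case State::SinkWait:   return State::Source;  // ready for the next request
        default:                return State::Ended;
    }
}

// Hypothetical helper: walks one request starting at `start` and records every
// state visited, stopping when the response has been sent (SinkWait -> Source)
// or the session has ended. The step taken in state `failAt` is treated as failed.
std::vector<State> traceRequest(State start, State failAt = State::Ended) {
    std::vector<State> path{start};
    State s = start;
    for (;;) {
        const State prev = s;
        s = nextState(s, s != failAt);
        path.push_back(s);
        if (s == State::Ended || (prev == State::SinkWait && s == State::Source)) break;
    }
    assert(path.size() >= 2);  // at least the start state and one transition
    return path;
}
```

Under these assumptions, `traceRequest(State::Created)` walks the first-request path Created -> Source -> SourceWait -> Process -> SinkWait -> Source, `traceRequest(State::Source)` walks the path for subsequent requests, and `traceRequest(State::Source, State::SourceWait)` shows a failed read ending in EndSession and then Ended.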

Copyright notice
This article was created by [InfoQ]. Please include a link to the original when reposting. Thank you.