当前位置:网站首页>mongodb内核源码实现、性能调优、最佳运维实践系列-command命令处理模块源码实现一
mongodb内核源码实现、性能调优、最佳运维实践系列-command命令处理模块源码实现一
2020-11-09 22:32:00 【杨亚洲-专注mongodb及高性能中间件】
关于作者
前滴滴出行技术专家,现任OPPO文档数据库mongodb负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库mongodb内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。后续持续分享《MongoDB内核源码设计、性能优化、最佳运维实践》,Github账号地址:https://github.com/y123456yz
- 背景
<<transport_layer网络传输层模块源码实现>>中分享了mongodb内核底层网络IO处理相关实现,包括套接字初始化、一个完整mongodb报文的读取、获取到DB数据发送给客户端等。Mongodb支持多种增、删、改、查、聚合处理、cluster处理等操作,每个操作在内核实现中对应一个command,每个command有不同的功能,mongodb内核如何进行command源码处理将是本文分析的重点
此外,mongodb提供了mongostat工具来监控当前集群的各种操作统计。Mongostat监控统计如下图所示:
其中,insert、delete、update、query这四项统计比较好理解,分别对应增、删、改、查。但是,comand、getmore不是很好理解,command代表什么统计?getMore代表什么统计?,这两项相对比较难理解。
此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。
Command命令处理模块分为:mongos操作命令、mongod操作命令、mongodb集群内部命令,具体定义如下:
- mongos操作命令,客户端可以通过mongos访问集群相关的命令。
- mongod操作命令:客户端可以通过mongod复制集和cfg server访问集群的相关命令。
- mongodb集群内部命令:mongos、mongod、mongo-cfg集群实例之间交互的命令。
Command命令处理模块核心代码实现如下:
《command命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。
2. <<transport_layer网络传输层模块源码实现>>衔接回顾
<<transport_layer网络传输层模块源码实现三>>一文中,我们对service_state_machine状态机调度子模块进行了分析,该模块中的dealTask任务进行mongodb内部业务逻辑处理,其核心实现如下:
1.//dealTask处理
2.void ServiceStateMachine::_processMessage(ThreadGuard guard) {
3. ......
4. //command处理、DB访问后的数据通过dbresponse返回
5. DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);
6. ......
7.}
上面的_sep对应mongod或者mongos实例的服务入口实现,该_seq成员分别在如下代码中初始化为ServiceEntryPointMongod和ServiceEntryPointMongod类实现。SSM状态机的_seq成员初始化赋值核心代码实现如下:
1.//mongos实例启动初始化
2.static ExitCode runMongosServer() {
3. ......
4. //mongos实例对应sep为ServiceEntryPointMongos
5. auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());
6. getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));
7. ......
8.}
9.
10.//mongod实例启动初始化
11.ExitCode _initAndListen(int listenPort) {
12. ......
13. //mongod实例对应sep为ServiceEntryPointMongod
14. serviceContext->setServiceEntryPoint(
15. stdx::make_unique<ServiceEntryPointMongod>(serviceContext));
16. ......
17.}
18.
19.//SSM状态机初始化
20.ServiceStateMachine::ServiceStateMachine(...)
21. : _state{State::Created},
22. //mongod和mongos实例的服务入口通过这里赋值给_seq成员变量
23. _sep{svcContext->getServiceEntryPoint()},
24. ......
}
通过上面的几个核心接口实现,把mongos和mongod两个实例的服务入口与状态机SSM(ServiceStateMachine)联系起来,最终和下面的command命令处理模块关联。
dealTask进行一次mongodb请求的内部逻辑处理,该处理由_sep->handleRequest()接口实现。由于mongos和mongod服务入口分别由ServiceEntryPointMongos和ServiceEntryPointMongod两个类实现,因此dealTask也就演变为如下接口处理:
- mongos实例:ServiceEntryPointMongos::handleRequest(...)
- Mongod实例::ServiceEntryPointMongod::handleRequest(...)
这两个接口入参都是OperationContext和Message,分别对应操作上下文、请求原始数据内容。下文会分析Message解析实现、OperationContext服务上下文实现将在后续章节分析。
Mongod和mongos实例服务入口类都继承自网络传输模块中的ServiceEntryPointImpl类,如下图所示:
Tips: mongos和mongod服务入口类为何要继承网络传输模块服务入口类?
原因是一个请求对应一个链接session,该session对应的请求又和SSM状态机唯一对应。所有客户端请求对应的SSM状态机信息全部保存再ServiceEntryPointImpl._sessions成员中,而command命令处理模块为SSM状态机任务中的dealTask任务,通过该继承关系,ServiceEntryPointMongod和ServiceEntryPointMongos子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的session链接信息。
3. Mongodb协议解析
在《transport_layer网络传输层模块源码实现二》中的数据收发子模块完成了一个完整mongodb报文的接收,一个mongodb报文由Header头部+opCode包体组成,如下图所示:
上图中各个字段说明如下表:
opCode取值比较多,早期版本中OP_INSERT、OP_DELETE、OP_UPDATE、OP_QUERY分别针对增删改查请求,Mongodb从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以OP_MSG操作码对应协议为例进行分析,其他操作码协议分析过程类似,OP_MSG请求协议格式如下:
1.OP_MSG {
2. //mongodb报文头部
3. MsgHeader header;
4. //位图,用于标识报文是否需要校验 是否需要应答等
5. uint32 flagBits; // message flags
6. //报文内容,例如find write等命令内容通过bson格式存在于该结构中
7. Sections[] sections; // data sections
8. //报文CRC校验
9. optional<uint32> checksum; // optional CRC-32C checksum
}
OP_MSG各个字段说明如下表:
一个完整OP_MSG请求格式如下:
除了通用头部header外,客户端命令请求实际上都保存于sections字段中,该字段存放的是请求的原始bson格式数据。BSON是由10gen开发的一个数据格式,目前主要用于MongoDB中,是MongoDB的数据存储格式。BSON基于JSON格式,选择JSON进行改造的原因主要是JSON的通用性及JSON的schemaless的特性。BSON相比JSON具有以下特性:
- Lightweight(更轻量级)
- Traversable(易操作)
- Efficient(高效性能)
本文重点不是分析bson协议格式,bson协议实现细节将在后续章节分享。bson协议更多设计细节详见:http://bsonspec.org/
总结:一个完整mongodb报文由header+body组成,其中header长度固定为16字节,body长度等于messageLength-16。Header部分协议解析由message.cpp和message.h两源码文件实现,body部分对应的OP_MSG类请求解析由op_msg.cpp和op_msg.h两源码文件实现。
3. mongodb报文通用头部解析及封装源码实现
Header头部解析由src/mongo/util/net目录下message.cpp和message.h两文件完成,该类主要完成通用header头部和body部分的解析、封装。因此报文头部核心代码分为以下两类:
- 报文头部内容解析及封装(MSGHEADER命名空间实现)
- 头部和body内容解析及封装(MsgData命名空间实现)
3.1 mongodb报文头部解析及封装核心代码实现
mongodb报文头部解析由namespace MSGHEADER {...}实现,该类主要成员及接口实现如下:
1.namespace MSGHEADER {
2.//header头部各个字段信息
3.struct Layout {
4. //整个message长度,包括header长度和body长度
5. int32_t messageLength;
6. //requestID 该请求id信息
7. int32_t requestID;
8. //getResponseToMsgId解析
9. int32_t responseTo;
10. //操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等
11. int32_t opCode;
12.};
13.
14.//ConstView实现header头部数据解析
15.class ConstView {
16.public:
17. ......
18. //初始化构造
19. ConstView(const char* data) : _data(data) {}
20. //获取_data地址
21. const char* view2ptr() const {
22. return data().view();
23. }
24. //TransportLayerASIO::ASIOSourceTicket::_headerCallback调用
25. //解析header头部的messageLength字段
26. int32_t getMessageLength() const {
27. return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));
28. }
29. //解析header头部的requestID字段
30. int32_t getRequestMsgId() const {
31. return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));
32. }
33. //解析header头部的getResponseToMsgId字段
34. int32_t getResponseToMsgId() const {
35. return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));
36. }
37. //解析header头部的opCode字段
38. int32_t getOpCode() const {
39. return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));
40. }
41.
42.protected:
43. //mongodb报文数据起始地址
44. const view_type& data() const {
45. return _data;
46. }
47.private:
48. //数据部分
49. view_type _data;
50.};
51.
52.//View填充header头部数据
53.class View : public ConstView {
54.public:
55. ......
56. //构造初始化
57. View(char* data) : ConstView(data) {}
58. //header起始地址
59. char* view2ptr() {
60. return data().view();
61. }
62. //以下四个接口进行header填充
63. //填充header头部messageLength字段
64. void setMessageLength(int32_t value) {
65. data().write(tagLittleEndian(value), offsetof(Layout, messageLength));
66. }
67. //填充header头部requestID字段
68. void setRequestMsgId(int32_t value) {
69. data().write(tagLittleEndian(value), offsetof(Layout, requestID));
70. }
71. //填充header头部responseTo字段
72. void setResponseToMsgId(int32_t value) {
73. data().write(tagLittleEndian(value), offsetof(Layout, responseTo));
74. }
75. //填充header头部opCode字段
76. void setOpCode(int32_t value) {
77. data().write(tagLittleEndian(value), offsetof(Layout, opCode));
78. }
79.private:
80. //指向header起始地址
81. view_type data() const {
82. return const_cast<char*>(ConstView::view2ptr());
83. }
84.};
85.}
从上面的header头部解析、填充的实现类可以看出,header头部解析由MSGHEADER::ConstView实现;header头部填充由MSGHEADER::View完成。实际上代码实现上,通过offsetof来进行移位,从而快速定位到头部对应字段。
3.2 mongodb报文头部+body解析封装核心代码实现
Namespace MSGHEADER{...}命名空间只负责header头部的处理,namespace MsgData{...}命名空间相对MSGHEADER命名空间更加完善,除了处理头部解析封装外,还负责body数据起始地址维护、body数据封装、数据长度检查等。MsgData命名空间核心代码实现如下:
1.namespace MsgData {
2.struct Layout {
3. //数据填充组成:header部分
4. MSGHEADER::Layout header;
5. //数据填充组成: body部分,body先用data占位置
6. char data[4];
7.};
8.
9.//解析header字段信息及body其实地址信息
10.class ConstView {
11.public:
12. //初始化构造
13. ConstView(const char* storage) : _storage(storage) {}
14. //获取数据起始地址
15. const char* view2ptr() const {
16. return storage().view();
17. }
18.
19. //以下四个接口间接执行前面的MSGHEADER中的头部字段解析
20. //填充header头部messageLength字段
21. int32_t getLen() const {
22. return header().getMessageLength();
23. }
24. //填充header头部requestID字段
25. int32_t getId() const {
26. return header().getRequestMsgId();
27. }
28. //填充header头部responseTo字段
29. int32_t getResponseToMsgId() const {
30. return header().getResponseToMsgId();
31. }
32. //获取网络数据报文中的opCode字段
33. NetworkOp getNetworkOp() const {
34. return NetworkOp(header().getOpCode());
35. }
36. //指向body起始地址
37. const char* data() const {
38. return storage().view(offsetof(Layout, data));
39. }
40. //messageLength长度检查,opcode检查
41. bool valid() const {
42. if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))
43. return false;
44. if (getNetworkOp() < 0 || getNetworkOp() > 30000)
45. return false;
46. return true;
47. }
48. ......
49.protected:
50. //获取_storage
51. const ConstDataView& storage() const {
52. return _storage;
53. }
54. //指向header起始地址
55. MSGHEADER::ConstView header() const {
56. return storage().view(offsetof(Layout, header));
57. }
58.private:
59. //mongodb报文存储在这里
60. ConstDataView _storage;
61.};
62.
63.//填充数据,包括Header和body
64.class View : public ConstView {
65.public:
66. //构造初始化
67. View(char* storage) : ConstView(storage) {}
68. ......
69. //获取报文起始地址
70. char* view2ptr() {
71. return storage().view();
72. }
73.
74. //以下四个接口间接执行前面的MSGHEADER中的头部字段构造
75. //以下四个接口完成msg header赋值
76. //填充header头部messageLength字段
77. void setLen(int value) {
78. return header().setMessageLength(value);
79. }
80. //填充header头部messageLength字段
81. void setId(int32_t value) {
82. return header().setRequestMsgId(value);
83. }
84. //填充header头部messageLength字段
85. void setResponseToMsgId(int32_t value) {
86. return header().setResponseToMsgId(value);
87. }
88. //填充header头部messageLength字段
89. void setOperation(int value) {
90. return header().setOpCode(value);
91. }
92.
93. using ConstView::data;
94. //指向data
95. char* data() {
96. return storage().view(offsetof(Layout, data));
97. }
98.private:
99. //也就是报文起始地址
100. DataView storage() const {
101. return const_cast<char*>(ConstView::view2ptr());
102. }
103. //指向header头部
104. MSGHEADER::View header() const {
105. return storage().view(offsetof(Layout, header));
106. }
107.};
108.
109.......
110.//Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度
111.const int MsgDataHeaderSize = sizeof(Value) - 4;
112.
113.//除去头部后的数据部分长度
114.inline int ConstView::dataLen() const {
115. return getLen() - MsgDataHeaderSize;
116.}
117.} // namespace MsgData
和MSGHEADER命名空间相比,MsgData这个namespace命名空间接口实现和前面的MSGHEADER命名空间实现大同小异。MsgData不仅仅处理header头部的解析组装,还负责body部分数据头部指针指向、头部长度检查、opCode检查、数据填充等。其中,MsgData命名空间中header头部的解析构造底层依赖MSGHEADER实现。
3.3 Message/DbMessage核心代码实现
在《transport_layer网络传输层模块源码实现二》中,从底层ASIO库接收到的mongodb报文是存放在Message结构中存储,最终存放在ServiceStateMachine._inMessage成员中。
在前面第2章我们知道mongod和mongso实例的服务入口接口handleRequest(...)中都带有Message入参,也就是接收到的Message数据通过该接口处理。Message类主要接口实现如下:
1.//DbMessage._msg成员为该类型
2.class Message {
3.public:
4. //message初始化
5. explicit Message(SharedBuffer data) : _buf(std::move(data)) {}
6. //头部header数据
7. MsgData::View header() const {
8. verify(!empty());
9. return _buf.get();
10. }
11. //获取网络数据报文中的op字段
12. NetworkOp operation() const {
13. return header().getNetworkOp();
14. }
15. //_buf释放为空
16. bool empty() const {
17. return !_buf;
18. }
19. //获取报文总长度messageLength
20. int size() const {
21. if (_buf) {
22. return MsgData::ConstView(_buf.get()).getLen();
23. }
24. return 0;
25. }
26. //body长度
27. int dataSize() const {
28. return size() - sizeof(MSGHEADER::Value);
29. }
30. //buf重置
31. void reset() {
32. _buf = {};
33. }
34. // use to set first buffer if empty
35. //_buf直接使用buf空间
36. void setData(SharedBuffer buf) {
37. verify(empty());
38. _buf = std::move(buf);
39. }
40. //把msgtxt拷贝到_buf中
41. void setData(int operation, const char* msgtxt) {
42. setData(operation, msgtxt, strlen(msgtxt) + 1);
43. }
44. //根据operation和msgdata构造一个完整mongodb报文
45. void setData(int operation, const char* msgdata, size_t len) {
46. verify(empty());
47. size_t dataLen = len + sizeof(MsgData::Value) - 4;
48. _buf = SharedBuffer::allocate(dataLen);
49. MsgData::View d = _buf.get();
50. if (len)
51. memcpy(d.data(), msgdata, len);
52. d.setLen(dataLen);
53. d.setOperation(operation);
54. }
55. ......
56. //获取_buf对应指针
57. const char* buf() const {
58. return _buf.get();
59. }
60.
61.private:
62. //存放接收数据的buf
63. SharedBuffer _buf;
64.};
Message是操作mongodb收发报文最直接的实现类,该类主要完成一个完整mongodb报文封装。有关mongodb报文头后面的body更多的解析实现在DbMessage类中完成,DbMessage类包含Message类成员_msg。实际上,Message报文信息在handleRequest(...)实例服务入口中赋值给DbMessage._msg,报文后续的body处理继续由DbMessage类相关接口完成处理。DbMessage和Message类关系如下:
1.class DbMessage {
2. ......
3. //包含Message成员变量
4. const Message& _msg;
5. //mongodb报文起始地址
6. const char* _nsStart;
7. //报文结束地址
8. const char* _theEnd;
9.}
10.
11.DbMessage::DbMessage(const Message& msg) : _msg(msg),
12. _nsStart(NULL), _mark(NULL), _nsLen(0) {
13. //一个mongodb报文(header+body)数据的结束地址
14. _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();
15. //报文起始地址 [_nextjsobj, _theEnd ]之间的数据就是一个完整mongodb报文
16. _nextjsobj = _msg.singleData().data();
17. ......
18.}
DbMessage._msg成员为DbMessage 类型,DbMessage的_nsStart和_theEnd成员分别记录完整mongodb报文的起始地址和结束地址,通过这两个指针就可以获取一个完整mongodb报文的全部内容,包括header和body。
注意:DbMessage是早期mongodb版本(version<3.6)中用于报文body解析封装的类,这些类针对opCode=[dbUpdate, dbDelete]这个区间的操作。在mongodb新版本(version>=3.6)中,body解析及封装由op_msg.h和op_msg.cpp代码文件中的clase OpMsgRequest{}完成处理。
3.4 OpMsg报文解析封装核心代码实现
Mongodb从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。OP_MSG对应mongodb报文body解析封装处理由OpMsg类相关接口完成,OpMsg::parse(Message)从Message中解析出报文body内容,其核心代码实现如下:
1.struct OpMsg {
2. ......
3. //msg解析赋值见OpMsg::parse
4. //各种命令(insert update find等)都存放在该body中
5. BSONObj body;
6. //sequences用法暂时没看懂,感觉没什么用?先跳过
7. std::vector<DocumentSequence> sequences; //赋值见OpMsg::parse
8.}
1.//从message中解析出OpMsg信息
2.OpMsg OpMsg::parse(const Message& message) try {
3. //message不能为空,并且opCode必须为dbMsg
4. invariant(!message.empty());
5. invariant(message.operation() == dbMsg);
6. //获取flagBits
7. const uint32_t flags = OpMsg::flags(message);
8. //flagBits有效性检查,bit 0-15中只能对第0和第1位操作
9. uassert(ErrorCodes::IllegalOpMsgFlag,
10. str::stream() << "Message contains illegal flags value: Ob"
11. << std::bitset<32>(flags).to_string(),
12. !containsUnknownRequiredFlags(flags));
13.
14. //校验码默认4字节
15. constexpr int kCrc32Size = 4;
16. //判断该mongo报文body内容是否启用了校验功能
17. const bool haveChecksum = flags & kChecksumPresent;
18. //如果有启用校验功能,则报文末尾4字节为校验码
19. const int checksumSize = haveChecksum ? kCrc32Size : 0;
20. //sections字段内容
21. BufReader sectionsBuf(message.singleData().data() + sizeof(flags),
22. message.dataSize() - sizeof(flags) - checksumSize);
23.
24. //默认先设置位false
25. bool haveBody = false;
26. OpMsg msg;
27. //解析sections对应命令请求数据
28. while (!sectionsBuf.atEof()) {
29. //BufReader::read读取kind内容,一个字节
30. const auto sectionKind = sectionsBuf.read<Section>();
31. //kind为0对应命令请求body内容,内容通过bson报错
32. switch (sectionKind) {
33. //sections第一个字节是0说明是body
34. case Section::kBody: {
35. //默认只能有一个body
36. uassert(40430, "Multiple body sections in message", !haveBody);
37. haveBody = true;
38. //命令请求的bson信息保存在这里
39. msg.body = sectionsBuf.read<Validated<BSONObj>>();
40. break;
41. }
42.
43. //DocSequence暂时没看明白,用到的地方很少,跳过,后续等
44. //该系列文章主流功能分析完成后,从头再回首分析
45. case Section::kDocSequence: {
46. ......
47. }
48. }
49. }
50. //OP_MSG必须有body内容
51. uassert(40587, "OP_MSG messages must have a body", haveBody);
52. //body和sequence去重判断
53. for (const auto& docSeq : msg.sequences) {
54. ......
55. }
56. return msg;
57.}
OpMsg类被OpMsgRequest类继承,OpMsgRequest类中核心接口就是解析出OpMsg.body中的库信息和表信息,OpMsgRequest类代码实现如下:
1.//协议解析得时候会用到,见runCommands
2.struct OpMsgRequest : public OpMsg {
3. ......
4. //构造初始化
5. explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}
6. //opMsgRequestFromAnyProtocol->OpMsgRequest::parse
7. //从message中解析出OpMsg所需成员信息
8. static OpMsgRequest parse(const Message& message) {
9. //OpMsg::parse
10. return OpMsgRequest(OpMsg::parse(message));
11. }
12. //根据db body extraFields填充OpMsgRequest
13. static OpMsgRequest fromDBAndBody(... {
14. OpMsgRequest request;
15. request.body = ([&] {
16. //填充request.body
17. ......
18. }());
19. return request;
20. }
21. //从body中获取db name
22. StringData getDatabase() const {
23. if (auto elem = body["$db"])
24. return elem.checkAndGetStringData();
25. uasserted(40571, "OP_MSG requests require a $db argument");
26. }
27. //find insert 等命令信息 body中的第一个elem就是command 名
28. StringData getCommandName() const {
29. return body.firstElementFieldName();
30. }
31.};
OpMsgRequest通过OpMsg::parse(message)解析出OpMsg信息,从而获取到body内容,GetCommandName()接口和getDatabase()则分别从body中获取库DB信息、命令名信息。通过该类相关接口,命令名(find、write、update等)和DB库都获取到了。
OpMsg模块除了OP_MSG相关报文解析外,还负责OP_MSG报文组装填充,该模块接口功能大全如下表:
4. Mongod实例服务入口核心代码实现
Mongod实例服务入口类ServiceEntryPointMongod继承ServiceEntryPointImpl类,mongod实例的报文解析处理、命令解析、命令执行都由该类负责处理。ServiceEntryPointMongod核心接口可以细分为:opCode解析及回调处理、命令解析及查找、命令执行三个子模块。
4.1 opCode解析及回调处理
OpCode操作码解析及其回调处理由ServiceEntryPointMongod::handleRequest(...)接口实现,核心代码实现如下:
1.//mongod服务对于客户端请求的处理
2.//通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage
3.DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {
4. //获取opCode,3.6版本对应客户端默认使用OP_MSG
5. NetworkOp op = m.operation();
6. ......
7. //根据message构造DbMessage
8. DbMessage dbmsg(m);
9. //根据操作上下文获取对应的client
10. Client& c = *opCtx->getClient();
11. ......
12. //获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息
13. const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;
14. const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();
15. ....
16. //CurOp::debug 初始化opDebug,慢日志相关记录
17. OpDebug& debug = currentOp.debug();
18. //慢日志阀值
19. long long logThresholdMs = serverGlobalParams.slowMS;
20. //时mongodb将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作
21. bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));
22. DbResponse dbresponse;
23. if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {
24. //新版本op=dbMsg,因此走这里
25. //从DB获取数据,获取到的数据通过dbresponse返回
26. dbresponse = runCommands(opCtx, m);
27. } else if (op == dbQuery) {
28. ......
29. //早期mongodb版本查询走这里
30. dbresponse = receivedQuery(opCtx, nsString, c, m);
31. } else if (op == dbGetMore) {
32. //早期mongodb版本查询走这里
33. dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);
34. } else {
35. ......
36. //早期版本增 删 改走这里处理
37. if (op == dbInsert) {
38. receivedInsert(opCtx, nsString, m); //插入操作入口 新版本CmdInsert::runImpl
39. } else if (op == dbUpdate) {
40. receivedUpdate(opCtx, nsString, m); //更新操作入口
41. } else if (op == dbDelete) {
42. receivedDelete(opCtx, nsString, m); //删除操作入口
43. }
44. }
45. //获取runCommands执行时间,也就是内部处理时间
46. debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());
47. ......
48. //慢日志记录
49. if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {
50. Locker::LockerInfo lockerInfo;
51. //OperationContext::lockState LockerImpl<>::getLockerInfo
52. opCtx->lockState()->getLockerInfo(&lockerInfo);
53.
54. //OpDebug::report 记录慢日志到日志文件
55. log() << debug.report(&c, currentOp, lockerInfo.stats);
56. }
57. //各种统计信息
58. recordCurOpMetrics(opCtx);
59.}
Mongod的handleRequest()接口主要完成以下工作:
- 从Message中获取OpCode,早期版本每个命令又对应取值,例如增删改查早期版本分别对应:dbInsert、dbDelete、dbUpdate、dbQuery;Mongodb 3.6开始,默认请求对应OpCode都是OP_MSG,本文默认只分析OpCode=OP_MSG相关的处理。
- 获取本操作对应的Client客户端信息。
- 如果是早期版本,通过Message构造DbMessage,同时解析出库.表信息。
- 根据不同OpCode执行对应回调操作,OP_MSG对应操作为runCommands(...),获取的数据通过dbresponse返回。
- 获取到db层返回的数据后,进行慢日志判断,如果db层数据访问超过阀值,记录慢日志。
- 设置debug的各种统计信息。
4.2 命令解析及查找
从上面的分析可以看出,接口最后调用runCommands(...),该接口核心代码实现如下所示:
1.//message解析出对应command执行
2.DbResponse runCommands(OperationContext* opCtx, const Message& message) {
3. //获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder
4. //应答数据通过该类构造
5. auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));
6. [&] {
7. OpMsgRequest request;
8. try { // Parse.
9. //协议解析 根据message获取对应OpMsgRequest
10. request = rpc::opMsgRequestFromAnyProtocol(message);
11. }
12. }
13. try { // Execute.
14. //opCtx初始化
15. curOpCommandSetup(opCtx, request);
16. //command初始化为Null
17. Command* c = nullptr;
18. //OpMsgRequest::getCommandName查找
19. if (!(c = Command::findCommand(request.getCommandName()))) {
20. //没有找到相应的command的后续异常处理
21. ......
22. }
23. //执行command命令,获取到的数据通过replyBuilder.get()返回
24. execCommandDatabase(opCtx, c, request, replyBuilder.get());
25. }
26. //OpMsgReplyBuilder::done对数据进行序列化操作
27. auto response = replyBuilder->done();
28. //responseLength赋值
29. CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();
30. // 返回
31. return DbResponse{std::move(response)};
32.}
RunCommands(...)接口从message中解析出OpMsg信息,然后获取该OpMsg对应的command命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:
- 获取该OpCode对应replyBuilder,OP_MSG操作对应builder为OpMsgReplyBuilder。
- 根据message解析出OpMsgRequest数据,OpMsgRequest来中包含了真正的命令请求bson信息。
- opCtx初始化操作。
- 通过request.getCommandName()返回命令信息(如“find”、“update”等字符串)。
- 通过Command::findCommand(command name)从CommandMap这个map表中查找是否支持该command命令。如果没找到说明不支持,如果找到说明支持。
- 调用execCommandDatabase(...)执行该命令,并获取命令的执行结果。
- 根据command执行结果构造response并返回
4.3 命令执行
1.void execCommandDatabase(...) {
2. ......
3. //获取dbname
4. const auto dbname = request.getDatabase().toString();
5. ......
6. //mab表存放从bson中解析出的elem信息
7. StringMap<int> topLevelFields;
8. //body elem解析
9. for (auto&& element : request.body) {
10. //获取bson中的elem信息
11. StringData fieldName = element.fieldNameStringData();
12. //如果elem信息重复,则异常处理
13. ......
14. }
15. //如果是help命令,则给出help提示
16. if (Command::isHelpRequest(helpField)) {
17. //给出help提示
18. Command::generateHelpResponse(opCtx, replyBuilder, *command);
19. return;
20. }
21. //权限认证检查,检查该命令执行权限
22. uassertStatusOK(Command::checkAuthorization(command, opCtx, request));
23. ......
24.
25. //该命令执行次数统计 db.serverStatus().metrics.commands可以获取统计信息
26. command->incrementCommandsExecuted();
27. //真正的命令执行在这里面
28. retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);
29. //该命令执行失败次数统计
30. if (!retval) {
31. command->incrementCommandsFailed();
32. }
33. ......
34.}
execCommandDatabase(...)最终调用RunCommandImpl(...)进行对应命令的真正处理,该接口核心代码实现如下:
1.bool runCommandImpl(...) {
2. //获取命令请求内容body
3. BSONObj cmd = request.body;
4. //获取请求中的DB库信息
5. const std::string db = request.getDatabase().toString();
6. //ReadConcern检查
7. Status rcStatus = waitForReadConcern(
8. opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));
9. //ReadConcern检查不通过,直接异常提示处理
10. if (!rcStatus.isOK()) {
11. //异常处理
12. return;
13. }
14. if (!command->supportsWriteConcern(cmd)) {
15. //命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持
16. if (commandSpecifiesWriteConcern(cmd)) {
17. //异常处理"Command does not support writeConcern"
18. ......
19. return result;
20. }
21. //调用Command::publicRun执行不同命令操作
22. result = command->publicRun(opCtx, request, inPlaceReplyBob);
23. }
24. //提取WriteConcernOptions信息
25. auto wcResult = extractWriteConcern(opCtx, cmd, db);
26. //提取异常,直接异常处理
27. if (!wcResult.isOK()) {
28. //异常处理
29. ......
30. return result;
31. }
32. ......
33. //执行对应的命令Command::publicRun,执行不同命令操作
34. result = command->publicRun(opCtx, request, inPlaceReplyBob);
35. ......
36.}
RunCommandImpl(...)接口最终调用该接口入参的command,执行 command->publicRun(...)接口,也就是命令模块的公共publicRun。
4.4 总结
Mongod服务入口首先从message中解析出opCode操作码,3.6版本对应客户端默认操作码为OP_MSQ,解析出该操作对应OpMsgRequest信息。然后从message原始数据中解析出command命令字符串后,继续通过全局Map表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。
5. Mongos实例服务入口核心代码实现
mongos服务入口核心代码实现过程和mongod服务入口代码实现流程几乎相同,mongos实例message解析、OP_MSG操作码处理、command命令查找等流程和上一章节mongod实例处理过程类似,本章节不在详细分析。Mongos实例服务入口处理调用流程如下:
ServiceEntryPointMongos::handleRequest(...)->Strategy::clientCommand(...)-->runCommand(...)->execCommandClient(...)
最后的接口核心代码实现如下:
1.void runCommand(...) {
2. ......
3. //获取请求命令name
4. auto const commandName = request.getCommandName();
5. //从全局map表中查找
6. auto const command = Command::findCommand(commandName);
7. //没有对应的command存在,抛异常说明不支持该命令
8. if (!command) {
9. ......
10. return;
11. }
12. ......
13. //执行命令
14. execCommandClient(opCtx, command, request, builder);
15. ......
16.}
17.
18.void execCommandClient(...)
19.{
20. ......
21. //认证检查,是否有操作该command命令的权限,没有则异常提示
22. Status status = Command::checkAuthorization(c, opCtx, request);
23. if (!status.isOK()) {
24. Command::appendCommandStatus(result, status);
25. return;
26. }
27. //该命令的执行次数自增,代理上面也是要计数的
28. c->incrementCommandsExecuted();
29. //如果需要command统计,则加1
30. if (c->shouldAffectCommandCounter()) {
31. globalOpCounters.gotCommand();
32. }
33. ......
34. //有部分命令不支持writeconcern配置,报错
35. bool supportsWriteConcern = c->supportsWriteConcern(request.body);
36. //不支持writeconcern又带有该参数的请求,直接异常处理"Command does not support writeConcern"
37. if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {
38. ......
39. return;
40. }
41. //执行本命令对应的公共publicRun接口,Command::publicRun
42. ok = c->publicRun(opCtx, request, result);
43. ......
44.}
- Tips: mongos和mongod实例服务入口核心代码实现的一点小区别
- Mongod实例opCode操作码解析、OpMsg解析、command查找及对应命令调用处理都由class ServiceEntryPointMongod{...}类一起完成。
- mongos实例则把opCode操作码解析交由class ServiceEntryPointMongos{...}类实现,OpMsg解析、command查找及对应命令调用处理放到了clase Strategy{...}类来处理。
6. 总结
Mongodb报文解析及组装流程总结
- 一个完整mongodb报文由通用报文header头部+body部分组成。
- Body部分内容,根据报文头部的opCode来决定不同的body内容。
- 3.6版本对应客户端请求opCode默认为OP_MSG,该操作码对应body部分由flagBits + sections + checksum组成,其中sections 中存放的是真正的命令请求信息,已bson数据格式保存。
- Header头部和body报文体封装及解析过程由class Message {...}类实现
- Body中对应command命令名、库名、表名的解析在mongodb(version<3.6)低版本协议中由class DbMessage {...}类实现
- Body中对应command命令名、库名、表名的解析在mongodb(version<3.6)低版本协议中由struct OpMsgRequest{...}结构和struct OpMsg {...}类实现
Mongos和mongod实例的服务入口处理流程大同小异,整体处理流程如下:
- 从message解析出opCode操作码,根据不同操作码执行对应操作码回调。
- 根据message解析出OpMsg request信息,mongodb报文的命令信息就存储在该body中,该body已bson格式存储。
- 从body中解析出command命令字符串信息(如“insert”、“update”等)。
- 从全局_commands map表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。
- 最终找到对应command命令后,执行command的功能run接口。
图形化总结如下:
说明:第3章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析command命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。
Tips: 下期继续分享不同command命令执行细节。
7. 遗留问题
第1章节中的统计信息,将在command模块核心代码分析完毕后揭晓答案,《mongodb command命令处理模块源码实现二》中继续分析,敬请关注。
版权声明
本文为[杨亚洲-专注mongodb及高性能中间件]所创,转载请带上原文链接,感谢
https://my.oschina.net/u/4087916/blog/4709503
边栏推荐
- Mongodb source code implementation series network transport layer module implementation 3
- 如何用代码上传头像,并添加自己的版权信息?
- 开源项目,私活利器,快速开发
- 迅为IMX6ULL开发板C程序调用shell
- Quick for imx6ull development board c program call shell
- Chrome扩展程序热更新方案:2.基于双缓存更新功能模块
- 做个别人家的网页
- 东哥吃葡萄时竟然吃出一道算法题!
- Kubernetes-18:Dashboard安装及使用
- On the practical application of C 9's new features
猜你喜欢
Hand in hand to teach you to use container service tke cluster audit troubleshooting
60 余位技术高管齐聚松山湖,华为云第一期核心伙伴开发者训练营圆满落幕
C/C++编程日记:逻辑井字棋(圈叉)游戏开发
YoMo Codec - Y3的性能评测报告
11.9
Android instance - simple login layout
手把手教你使用容器服务 TKE 集群审计排查问题
SQL server attached database access denial resolution summary
都要2021年了,现代C++有什么值得我们学习的?
如何用代码上传头像,并添加自己的版权信息?
随机推荐
技术点5:XML语言
老旧系统重构技巧,轻松搞定遗留代码
C + + game development
[stm32h7] Chapter 6: stm32h7 dma2d acceleration of ThreadX guix
当我们开发一个接口时需要注意些什么
Hot update scheme of Chrome extension program: 2. Based on double cache update function module
nodejs篇-手写koa中间件
如何运用二分查找算法
[最佳实践]了解 Eolinker 如何助力远程办公
Modify the files in the jar package
sql 筛选查询重复列
The movie theater booking system based on micro Service Framework
How to realize the authority route and authority menu of background management system
Application of V7 version of lvgl Library
团灭 LeetCode 打家劫舍 问题
C/C++编程日记:逻辑井字棋(圈叉)游戏开发
商品后台系统实现
LeetCode 49 字母异位词分组
正式班D25
容器技术(三)镜像小结【16】