Etcd database source code analysis: Raft handlers of the inter-cluster network layer server
2022-06-28 03:22:00 [Tertium ferrugosum]
As described in the article "ETCD database source code analysis: inter-cluster network layer server interface", when http.Server starts, the rafthttp.Transporter.Handler() method registers a Handler instance for each URL path, as shown in the figure below. streamHandler handles requests on the Stream message channel, pipelineHandler handles requests on the Pipeline message channel, and snapHandler handles snapshot transfer requests.
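The registration step can be pictured with a plain http.ServeMux. The sketch below is an illustration, not etcd's code: the path constants are assumed to mirror rafthttp's RaftPrefix, RaftStreamPrefix and RaftSnapshotPrefix, and `named` is a hypothetical placeholder that only reports which handler ran.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Assumed path prefixes modeled on rafthttp's constants; verify against your etcd version.
const (
	raftPrefix         = "/raft"
	raftStreamPrefix   = "/raft/stream"
	raftSnapshotPrefix = "/raft/snapshot"
)

// named is a hypothetical placeholder handler that writes its own name.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newRaftMux wires placeholders the way Transport.Handler wires
// pipelineHandler, streamHandler and snapHandler.
func newRaftMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle(raftPrefix, named("pipeline"))         // exact match: /raft
	mux.Handle(raftStreamPrefix+"/", named("stream")) // subtree: /raft/stream/<type>/<from>
	mux.Handle(raftSnapshotPrefix, named("snapshot")) // exact match: /raft/snapshot
	return mux
}

// route reports which placeholder handler served the request.
func route(mux http.Handler, method, p string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(method, p, nil))
	return rec.Body.String()
}

func main() {
	mux := newRaftMux()
	fmt.Println(route(mux, http.MethodPost, "/raft"))
	fmt.Println(route(mux, http.MethodGet, "/raft/stream/message/8e9e05c52164694d"))
	fmt.Println(route(mux, http.MethodPost, "/raft/snapshot"))
}
```

A trailing slash on the stream pattern makes it match every path below it, which is what lets one handler serve streams from any sender ID.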
pipelineHandler
pipelineHandler is used by the inter-cluster network layer server to handle requests on the Pipeline channel. tr is the rafthttp.Transport instance associated with the handler, r is the underlying Raft instance, localID is the ID of the local member, and cid is the current cluster ID.
type pipelineHandler struct {
lg *zap.Logger
localID types.ID
tr Transporter
r Raft
cid types.ID
}
pipelineHandler implements the ServeHTTP method of the http.Handler interface, which is its core request-processing method. It reads the request sent by the peer node, decodes it into a message instance, and hands that message to the underlying etcd-raft module for processing.
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Start with a series of checks: the request method must be POST, and the cluster ID must be compatible
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) // set X-Etcd-Cluster-ID in the response header to the current cluster ID
if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
addRemoteFromRequest(h.tr, r) // add the remote peer described by the HTTP request headers
// Cap each individual Read on the underlying connection at connReadLimitByte (64KB by default); io.ReadAll keeps calling Read until the whole body has been consumed
// Limit the data size that could be read from the request body, which ensures that read from
// connection will not time out accidentally due to possible blocking in underlying implementation.
limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
b, err := io.ReadAll(limitedr) // read the entire HTTP request body
if err != nil {
h.lg.Warn("failed to read Raft message",zap.String("local-member-id", h.localID.String()),zap.Error(err),)
http.Error(w, "error reading raft message", http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
return
}
var m raftpb.Message
// Deserialize the body into a raftpb.Message instance
if err := m.Unmarshal(b); err != nil {
h.lg.Warn("failed to unmarshal Raft message",zap.String("local-member-id", h.localID.String()),zap.Error(err),
)
http.Error(w, "error unmarshalling raft message", http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
return
}
receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
// Hand the decoded message to the underlying Raft state machine for processing
if err := h.r.Process(context.TODO(), m); err != nil {
switch v := err.(type) {
case writerToResponse: v.WriteTo(w)
default:
h.lg.Warn("failed to process Raft message",zap.String("local-member-id", h.localID.String()),zap.Error(err),
)
http.Error(w, "error processing raft message", http.StatusInternalServerError)
w.(http.Flusher).Flush()
// disconnect the http stream
panic(err)
}
return
}
// Reply to the peer with a status code indicating that the request has been processed
// Write StatusNoContent header after the message has been processed by raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
}
// addRemoteFromRequest adds a remote peer according to an http request header
func addRemoteFromRequest(tr Transporter, r *http.Request) {
if from, err := types.IDFromString(r.Header.Get("X-Server-From")); err == nil {
if urls := r.Header.Get("X-PeerURLs"); urls != "" {
// If the request carries the peer's URLs in X-PeerURLs, call AddRemote to register the peer with those URLs
tr.AddRemote(from, strings.Split(urls, ","))
}
}
}
streamHandler
The streamHandler struct accepts network connections initiated by peers and matches each one to the corresponding streamWriter instance, after which that streamWriter can start sending messages to the peer. The Stream message channel maintains a long-lived HTTP connection and mainly carries small, frequent messages, so the main goal of streamHandler.ServeHTTP is to bind an outgoingConn instance (used by the streamWriter) to the corresponding peer instance.
type streamHandler struct {
lg *zap.Logger
tr *Transport // associated rafthttp.Transport instance
peerGetter peerGetter // its Get method returns the peer instance for a given node ID
r Raft // underlying Raft instance
id types.ID // Current node ID
cid types.ID // Current cluster ID
}
streamHandler implements the ServeHTTP method of the http.Handler interface, which is its core request-processing method.
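Before it can bind anything, the handler works out the stream type from path.Dir of the URL and the sender ID from path.Base. A minimal sketch of that parsing follows; the two endpoint strings are assumptions (rafthttp derives them from RaftStreamPrefix via streamType.endpoint, and names may differ across versions), and parseStreamPath is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"path"
	"strconv"
)

// Assumed stream endpoints; check streamType.endpoint in your etcd version.
const (
	messageEndpoint = "/raft/stream/message"
	msgAppEndpoint  = "/raft/stream/msgapp"
)

// parseStreamPath splits "/raft/stream/<type>/<fromID>" into the stream kind
// and the sender's node ID, which is encoded in hexadecimal.
func parseStreamPath(p string) (kind string, from uint64, err error) {
	switch path.Dir(p) {
	case messageEndpoint:
		kind = "message"
	case msgAppEndpoint:
		kind = "msgappv2"
	default:
		return "", 0, errors.New("invalid path")
	}
	from, err = strconv.ParseUint(path.Base(p), 16, 64)
	if err != nil {
		return "", 0, fmt.Errorf("invalid from: %w", err)
	}
	return kind, from, nil
}

func main() {
	kind, from, err := parseStreamPath("/raft/stream/message/8e9e05c52164694d")
	fmt.Println(kind, strconv.FormatUint(from, 16), err)
}
```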
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Check that the request method is GET
if r.Method != "GET" {
w.Header().Set("Allow", "GET")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
w.Header().Set("X-Server-Version", version.Version)
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
return
}
// Determine the stream type (MsgAppV2 or Message) from the URL path
var t streamType
switch path.Dir(r.URL.Path) {
case streamTypeMsgAppV2.endpoint(h.lg):
t = streamTypeMsgAppV2
case streamTypeMessage.endpoint(h.lg):
t = streamTypeMessage
default:
h.lg.Debug("ignored unexpected streaming request path",zap.String("local-member-id", h.tr.ID.String()),zap.String("remote-peer-id-stream-handler", h.id.String()),zap.String("path", r.URL.Path),)
http.Error(w, "invalid path", http.StatusNotFound)
return
}
// Extract the peer's node ID from the last segment of the URL path
fromStr := path.Base(r.URL.Path)
from, err := types.IDFromString(fromStr)
if err != nil {
h.lg.Warn("failed to parse path into ID",zap.String("local-member-id", h.tr.ID.String()),zap.String("remote-peer-id-stream-handler", h.id.String()),zap.String("path", fromStr),zap.Error(err),
)
http.Error(w, "invalid from", http.StatusNotFound)
return
}
// Reject the stream if the peer has been removed from the cluster
if h.r.IsIDRemoved(uint64(from)) {
h.lg.Warn("rejected stream from remote peer because it was removed",zap.String("local-member-id", h.tr.ID.String()),zap.String("remote-peer-id-stream-handler", h.id.String()),zap.String("remote-peer-id-from", from.String()),
)
http.Error(w, "removed member", http.StatusGone)
return
}
p := h.peerGetter.Get(from) // look up the Peer instance for the remote node ID
if p == nil {
// This may happen in following cases:
// 1. user starts a remote peer that belongs to a different cluster with the same cluster ID.
// 2. local etcd falls behind of the cluster, and cannot recognize the members that joined after its current progress.
if urls := r.Header.Get("X-PeerURLs"); urls != "" {
h.tr.AddRemote(from, strings.Split(urls, ","))
}
h.lg.Warn("failed to find remote peer in cluster",zap.String("local-member-id", h.tr.ID.String()),zap.String("remote-peer-id-stream-handler", h.id.String()),zap.String("remote-peer-id-from", from.String()),zap.String("cluster-id", h.cid.String()),
)
http.Error(w, "error sender not found", http.StatusNotFound)
return
}
wto := h.id.String() // ID of the local node
// Check that the request targets this node (X-Raft-To header)
if gto := r.Header.Get("X-Raft-To"); gto != wto {
h.lg.Warn("ignored streaming request; ID mismatch",zap.String("local-member-id", h.tr.ID.String()),zap.String("remote-peer-id-stream-handler", h.id.String()),zap.String("remote-peer-id-header", gto),zap.String("remote-peer-id-from", from.String()),zap.String("cluster-id", h.cid.String()),
)
http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
return
}
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush() // flush so the status line and headers reach the peer immediately
c := newCloseNotifier()
// Create the outgoingConn instance that wraps this HTTP response
conn := &outgoingConn{
t:       t,
Writer:  w,
Flusher: w.(http.Flusher),
Closer:  c,
localID: h.tr.ID,
peerID:  from,
}
p.attachOutgoingConn(conn) // attach the outgoingConn to the peer's corresponding streamWriter
<-c.closeNotify() // block until the outgoingConn is closed, keeping the long-lived HTTP response open
}
snapHandler
The snapshotHandler struct receives snapshot data from peer nodes. tr is the associated rafthttp.Transport instance, r is the underlying Raft instance, and cid is the current cluster ID. snapshotter saves the snapshot data to a local file; see "ETCD source code analysis: introduction to the snap.New function".
type snapshotHandler struct {
lg *zap.Logger
tr Transporter
r Raft
snapshotter *snap.Snapshotter
localID types.ID
cid types.ID
}
ServeHTTP serves HTTP requests that carry snapshot messages. If the sender dies without closing the underlying TCP connection, the handler keeps waiting for the request body until TCP keepalive detects the broken connection several minutes later. This is acceptable because (1) snapshot messages sent over other TCP connections can still be received and processed, and (2) the situation should be rare, so no further optimization is done. Besides reading the snapshot data sent by the peer and generating the corresponding snapshot file locally, snapshotHandler.ServeHTTP passes the snapshot message to the underlying etcd-raft module through the Raft interface.
// ServeHTTP serves HTTP request to receive and process snapshot message.
//
// If request sender dies without closing underlying TCP connection,
// the handler will keep waiting for the request body until TCP keepalive
// finds out that the connection is broken after several minutes.
// This is acceptable because
// 1. snapshot messages sent through other TCP connections could still be
// received and processed.
// 2. this case should happen rarely, so no further optimization is done.
func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// First, perform a series of checks
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
http.Error(w, err.Error(), http.StatusPreconditionFailed)
snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
return
}
addRemoteFromRequest(h.tr, r) // add the remote peer described by the HTTP request headers
dec := &messageDecoder{r: r.Body}
// let snapshots be very large since they can exceed 512MB for large installations
// decodeLimit reads the length-prefixed raft message at the head of the body and rejects it if its declared size exceeds snapshotLimitByte; the database data that follows is read separately by SaveDBFrom
m, err := dec.decodeLimit(snapshotLimitByte)
from := types.ID(m.From).String()
if err != nil {
msg := fmt.Sprintf("failed to decode raft message (%v)", err)
h.lg.Warn("failed to decode Raft message",zap.String("local-member-id", h.localID.String()),zap.String("remote-snapshot-sender-id", from),zap.Error(err),
)
http.Error(w, msg, http.StatusBadRequest)
recvFailures.WithLabelValues(r.RemoteAddr).Inc()
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
msgSize := m.Size()
receivedBytes.WithLabelValues(from).Add(float64(msgSize))
// Check that the decoded message is a MsgSnap
if m.Type != raftpb.MsgSnap {
h.lg.Warn("unexpected Raft message type",zap.String("local-member-id", h.localID.String()),zap.String("remote-snapshot-sender-id", from),zap.String("message-type", m.Type.String()),
)
http.Error(w, "wrong raft message type", http.StatusBadRequest)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
snapshotReceiveInflights.WithLabelValues(from).Inc()
defer func() {
snapshotReceiveInflights.WithLabelValues(from).Dec()
}()
h.lg.Info("receiving database snapshot",zap.String("local-member-id", h.localID.String()),zap.String("remote-snapshot-sender-id", from),zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),zap.Int("incoming-snapshot-message-size-bytes", msgSize),zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
)
// save incoming database snapshot: the Snapshotter writes the database data remaining in the request body to a local file
n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
if err != nil {
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
h.lg.Warn("failed to save incoming database snapshot",zap.String("local-member-id", h.localID.String()),zap.String("remote-snapshot-sender-id", from),zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),zap.Error(err),)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
return
}
receivedBytes.WithLabelValues(from).Add(float64(n))
downloadTook := time.Since(start)
h.lg.Info("received and saved database snapshot",zap.String("local-member-id", h.localID.String()),zap.String("remote-snapshot-sender-id", from),zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),zap.Int64("incoming-snapshot-size-bytes", n),zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),zap.String("download-took", downloadTook.String()),
)
// Call Raft.Process to deliver the MsgSnap message to the underlying etcd-raft module
if err := h.r.Process(context.TODO(), m); err != nil {
switch v := err.(type) {
// Process may return writerToResponse error when doing some additional checks before calling raft.Node.Step.
case writerToResponse: v.WriteTo(w)
default:
msg := fmt.Sprintf("failed to process raft message (%v)", err)
h.lg.Warn("failed to process Raft message",zap.String("local-member-id", h.localID.String()),zap.String("remote-snapshot-sender-id", from),zap.Error(err),
)
http.Error(w, msg, http.StatusInternalServerError)
snapshotReceiveFailures.WithLabelValues(from).Inc()
}
return
}
// Write StatusNoContent header after the message has been processed by raft, which facilitates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
snapshotReceive.WithLabelValues(from).Inc()
snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
}